Compare commits

...

11 Commits

Author SHA1 Message Date
3829f591e4 Add expires_at to queue-items (unicast & multicast).
This makes it possible to automatically remove items from the queue in
case the expires_at timestamp has reached. This field is optional and
the default remains to never expire queue-items.
2024-09-17 11:54:29 +01:00
2919fb79e5 Fix escaping issue in TOML template.
Closes #486.
2024-09-13 12:59:09 +01:00
61d794b680 Remove OFF, which is not a valid tracing Level. 2024-09-12 14:42:51 +01:00
29b57fb72a Implement importer for lorawan-device-profiles repo. 2024-09-12 14:22:09 +01:00
190d977bf5 lrwn: Update Deserialize trait implementation. 2024-09-12 14:20:59 +01:00
9d04a74a94 Update dependencies. 2024-09-09 13:36:59 +01:00
5f8ddca7b7 api: Re-export pbjson_types and tonic.
Closes #504.
2024-09-09 11:59:51 +01:00
e04515f647 ui: Run make format to format ui code. 2024-08-28 14:44:22 +01:00
d69a0eee75 ui: Fix tooltip date formatting.
With the migration from moment to date-fns (#460) some of the formatting
strings were not updated accordingly.

Fixes #503.
2024-08-28 14:44:07 +01:00
e63296573b Implement support for SQLite database backend. (#418) (#500)
This feature makes it possible to select between PostgreSQL and SQLite as database backend using a compile feature-flag. It is not possible to enable both at the same time.

---------

Co-authored-by: Momo Bel <plopyomomo@gmail.com>
2024-08-26 13:22:35 +01:00
800d7d0efe api: Upgrade io.grpc dependencies in Java API. (#494)
This fixes a compatibility issue with Netty.

---
Co-authored-by: Guillaume Milani <guillaume.milani@sontex.ch>
2024-08-21 14:49:08 +01:00
160 changed files with 5888 additions and 2813 deletions

View File

@ -13,6 +13,13 @@ env:
jobs:
tests:
runs-on: ubuntu-latest
strategy:
matrix:
database:
- postgres
- sqlite
env:
DATABASE: ${{ matrix.database }}
steps:
-
name: Checkout
@ -32,7 +39,7 @@ jobs:
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
key: ${{ runner.os }}-cargo-test-${{ matrix.database }}-${{ hashFiles('**/Cargo.lock') }}
-
name: Start dependency services
run: docker compose up -d

4
.gitignore vendored
View File

@ -11,8 +11,12 @@
# Binary packages
/dist
# SQLite databases
*.sqlite
# Rust target directory
**/target
**/target-sqlite
# Certificates
/chirpstack/configuration/certs/*

617
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,7 @@ dist:
# Install dev dependencies
dev-dependencies:
cargo install cross --version 0.2.5
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres,sqlite
cargo install cargo-deb --version 1.43.1
cargo install cargo-generate-rpm --version 0.12.1

View File

@ -84,7 +84,11 @@ docker compose up -d
Run the following command to run the ChirpStack tests:
```bash
# Test (with PostgresQL database backend)
make test
# Test with SQLite database backend
DATABASE=sqlite make test
```
### Building ChirpStack binaries
@ -109,6 +113,34 @@ make release-amd64
make dist
```
By default the above commands will build ChirpStack with the PostgresQL database
database backend. Set the `DATABASE=sqlite` env. variable to compile ChirpStack
with the SQLite database backend.
### Database migrations
To create a new database migration, execute:
```
make migration-generate NAME=test-migration
```
To apply migrations, execute:
```
make migration-run
```
To revert a migration, execute:
```
make migration-revert
```
By default the above commands will execute the migration commands using the
PostgresQL database backend. To execute migration commands for the SQLite
database backend, set the `DATABASE=sqlite` env. variable.
## License
ChirpStack Network Server is distributed under the MIT license. See also

View File

@ -21,10 +21,10 @@ buildscript {
}
dependencies {
api("io.grpc:grpc-protobuf:1.51.0")
api("io.grpc:grpc-api:1.51.0")
api("io.grpc:grpc-stub:1.51.0")
api("io.grpc:grpc-netty:1.51.0")
api("io.grpc:grpc-protobuf:1.59.1")
api("io.grpc:grpc-api:1.59.1")
api("io.grpc:grpc-stub:1.59.1")
api("io.grpc:grpc-netty:1.59.1")
implementation("javax.annotation:javax.annotation-api:1.3.2")
}

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
}
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse {
// FCntDown.
uint32 f_cnt_down = 1;
}
}

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload.
bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
}
message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter.
F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
}
// Device information.

2
api/rust/Cargo.toml vendored
View File

@ -12,7 +12,6 @@
default = ["api", "json"]
api = ["tonic/transport", "tonic-build/transport", "tokio"]
json = ["pbjson", "pbjson-types", "serde"]
diesel = ["dep:diesel"]
internal = []
[dependencies]
@ -29,7 +28,6 @@
pbjson = { version = "0.7", optional = true }
pbjson-types = { version = "0.7", optional = true }
serde = { version = "1.0", optional = true }
diesel = { version = "2.2", features = ["postgres_backend"], optional = true }
[build-dependencies]
tonic-build = { version = "0.12", features = [

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
}
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse {
// FCntDown.
uint32 f_cnt_down = 1;
}
}

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload.
bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
}
message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter.
F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
}
// Device information.

View File

@ -32,6 +32,7 @@ impl Into<String> for LogCode {
LogCode::DownlinkGateway => "DOWNLINK_GATEWAY",
LogCode::RelayNewEndDevice => "RELAY_NEW_END_DEVICE",
LogCode::FCntDown => "F_CNT_DOWN",
LogCode::Expired => "EXPIRED",
}
.to_string()
}

View File

@ -2,13 +2,6 @@ include!(concat!(env!("OUT_DIR"), "/internal/internal.rs"));
#[cfg(feature = "json")]
include!(concat!(env!("OUT_DIR"), "/internal/internal.serde.rs"));
#[cfg(feature = "diesel")]
use diesel::{backend::Backend, deserialize, serialize, sql_types::Binary};
#[cfg(feature = "diesel")]
use prost::Message;
#[cfg(feature = "diesel")]
use std::io::Cursor;
impl DeviceSession {
pub fn get_a_f_cnt_down(&self) -> u32 {
if self.mac_version().to_string().starts_with("1.0") {
@ -30,28 +23,3 @@ impl DeviceSession {
}
}
}
#[cfg(feature = "diesel")]
impl<ST, DB> deserialize::FromSql<ST, DB> for DeviceSession
where
DB: Backend,
*const [u8]: deserialize::FromSql<ST, DB>,
{
fn from_sql(value: DB::RawValue<'_>) -> deserialize::Result<Self> {
let bytes = <Vec<u8> as deserialize::FromSql<ST, DB>>::from_sql(value)?;
Ok(DeviceSession::decode(&mut Cursor::new(bytes))?)
}
}
#[cfg(feature = "diesel")]
impl serialize::ToSql<Binary, diesel::pg::Pg> for DeviceSession
where
[u8]: serialize::ToSql<Binary, diesel::pg::Pg>,
{
fn to_sql(&self, out: &mut serialize::Output<'_, '_, diesel::pg::Pg>) -> serialize::Result {
<[u8] as serialize::ToSql<Binary, diesel::pg::Pg>>::to_sql(
&self.encode_to_vec(),
&mut out.reborrow(),
)
}
}

5
api/rust/src/lib.rs vendored
View File

@ -1,4 +1,9 @@
#[cfg(feature = "json")]
pub use pbjson_types;
pub use prost;
#[cfg(feature = "api")]
pub use tonic;
#[cfg(feature = "api")]
pub mod api;
pub mod common;

View File

@ -26,20 +26,16 @@
email_address = "0.2"
diesel = { version = "2.2", features = [
"chrono",
"uuid",
"serde_json",
"numeric",
"64-column-tables",
"postgres_backend",
] }
diesel_migrations = { version = "2.2" }
diesel-async = { version = "0.5", features = [
"deadpool",
"postgres",
"async-connection-wrapper",
] }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.12"
tokio-postgres = { version = "0.7", optional = true }
tokio-postgres-rustls = { version = "0.12", optional = true }
bigdecimal = "0.4"
redis = { version = "0.26", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.16", features = ["cluster"] }
@ -53,11 +49,7 @@
], default-features = true }
# ChirpStack API definitions
chirpstack_api = { path = "../api/rust", features = [
"default",
"internal",
"diesel",
] }
chirpstack_api = { path = "../api/rust", features = ["default", "internal"] }
lrwn = { path = "../lrwn", features = [
"serde",
"diesel",
@ -91,7 +83,7 @@
tonic = "0.12"
tonic-web = "0.12"
tonic-reflection = "0.12"
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
prost-types = "0.13"
prost = "0.13"
@ -100,7 +92,7 @@
# gRPC and HTTP multiplexing
axum = "0.7"
axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] }
tower = { version = "0.4" }
tower = { version = "0.5", features = ["util"] }
futures = "0.3"
futures-util = "0.3"
http = "1.1"
@ -116,14 +108,14 @@
# Authentication
pbkdf2 = { version = "0.12", features = ["simple"] }
rand_core = { version = "0.6", features = ["std"] }
jsonwebtoken = "9.2"
jsonwebtoken = "9.3"
rustls = { version = "0.23", default-features = false, features = [
"logging",
"std",
"tls12",
"ring",
] }
rustls-native-certs = "0.7"
rustls-native-certs = "0.8"
rustls-pemfile = "2.1"
pem = "3.0"
x509-parser = "0.16"
@ -161,16 +153,34 @@
petgraph = "0.6"
prometheus-client = "0.22"
pin-project = "1.1"
scoped-futures = { version = "0.1", features = ["std"] }
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
# Development and testing
[dev-dependencies]
httpmock = "0.7.0"
bytes = "1.6"
bytes = "1.7"
dotenv = "0.15"
[features]
default = ["postgres"]
postgres = [
"tokio-postgres",
"tokio-postgres-rustls",
"diesel/postgres_backend",
"diesel/serde_json",
"diesel/uuid",
"diesel-async/postgres",
"lrwn/postgres",
]
sqlite = [
"diesel/sqlite",
"diesel/returning_clauses_for_sqlite_3_35",
"lrwn/sqlite",
"diesel-async/sync-connection-wrapper",
"diesel-async/sqlite",
]
test-all-integrations = [
"test-integration-amqp",
"test-integration-kafka",

View File

@ -1,20 +1,21 @@
.PHONY: dist
PKG_VERSION := $(shell cargo metadata --no-deps --format-version 1 | jq -r '.packages[0].version')
DATABASE ?= postgres
debug-amd64:
cross build --target x86_64-unknown-linux-musl
cross build --target x86_64-unknown-linux-musl --no-default-features --features="$(DATABASE)"
release-amd64:
cross build --target x86_64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
dist:
# Keep these in this order, as aarch64 is based on Debian Buster (older),
# the others on Bullseye. For some build scripts we want to build against
# least recent LIBC.
cross build --target aarch64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release
cross build --target armv7-unknown-linux-musleabihf --release
cross build --target aarch64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target armv7-unknown-linux-musleabihf --release --no-default-features --features="$(DATABASE)"
cargo deb --target x86_64-unknown-linux-musl --no-build --no-strip
cargo deb --target armv7-unknown-linux-musleabihf --no-build --no-strip
@ -40,10 +41,38 @@ dist:
test:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE)"
test-all:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test --features test-all-integrations
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE),test-all-integrations"
migration-generate:
ifeq ($(NAME),)
@echo "You must provide a NAME parameter, e.g. make migration-generate NAME=test-migration"
else
diesel --config-file diesel_$(DATABASE).toml migration --migration-dir migrations_$(DATABASE) generate $(NAME)
endif
migration-run: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres run
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite run
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
migration-revert: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres revert
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite revert
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
chirpstack_test.sqlite:
DATABASE_URL=chirpstack_test.sqlite diesel --config-file diesel_sqlite.toml setup --migration-dir migrations_sqlite

View File

@ -2,4 +2,4 @@
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema.rs"
file = "src/storage/schema_postgres.rs"

View File

@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema_sqlite.rs"

View File

@ -0,0 +1,6 @@
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,6 @@
alter table multicast_group_queue_item
add column expires_at timestamp with time zone null;
alter table device_queue_item
add column expires_at timestamp with time zone null;

View File

@ -0,0 +1,18 @@
drop table relay_gateway;
drop table multicast_group_gateway;
drop table multicast_group_queue_item;
drop table multicast_group_device;
drop table multicast_group;
drop table device_queue_item;
drop table device_keys;
drop table device;
drop table device_profile;
drop table api_key;
drop table application_integration;
drop table application;
drop table gateway;
drop table tenant_user;
drop table tenant;
drop table "user";
drop table relay_device;
drop table device_profile_template;

View File

@ -0,0 +1,392 @@
-- user
create table "user" (
id text not null primary key,
external_id text null,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_active boolean not null,
email text not null,
email_verified boolean not null,
password_hash varchar(200) not null,
note text not null
);
create unique index idx_user_email on "user"(email);
create unique index idx_user_external_id on "user"(external_id);
insert into "user" (
id,
created_at,
updated_at,
is_admin,
is_active,
email,
email_verified,
password_hash,
note
) values (
'05244f12-6daf-4e1f-8315-c66783a0ab56',
datetime('now'),
datetime('now'),
TRUE,
TRUE,
'admin',
FALSE,
'$pbkdf2-sha512$i=1,l=64$l8zGKtxRESq3PA2kFhHRWA$H3lGMxOt55wjwoc+myeOoABofJY9oDpldJa7fhqdjbh700V6FLPML75UmBOt9J5VFNjAL1AvqCozA1HJM0QVGA',
''
);
-- tenant
create table tenant (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
can_have_gateways boolean not null,
max_device_count integer not null,
max_gateway_count integer not null,
private_gateways_up boolean not null,
private_gateways_down boolean not null default FALSE,
tags text not null default '{}'
);
-- sqlite has advanced text search with https://www.sqlite.org/fts5.html
-- but looks like it is for a full table and not specific per column, to investigate
create index idx_tenant_name_trgm on "tenant"(name);
insert into "tenant" (
id,
created_at,
updated_at,
name,
description,
can_have_gateways,
max_device_count,
max_gateway_count,
private_gateways_up
) values (
'52f14cd4-c6f1-4fbd-8f87-4025e1d49242',
datetime('now'),
datetime('now'),
'ChirpStack',
'',
TRUE,
0,
0,
FALSE
);
-- tenant user
create table tenant_user (
tenant_id text not null references tenant on delete cascade,
user_id text not null references "user" on delete cascade,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_device_admin boolean not null,
is_gateway_admin boolean not null,
primary key (tenant_id, user_id)
);
create index idx_tenant_user_user_id on tenant_user (user_id);
-- gateway
create table gateway (
gateway_id blob not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
latitude double precision not null,
longitude double precision not null,
altitude real not null,
stats_interval_secs integer not null,
tls_certificate blob,
tags text not null,
properties text not null
);
create index idx_gateway_tenant_id on gateway (tenant_id);
create index idx_gateway_name_trgm on gateway (name);
create index idx_gateway_id_trgm on gateway (hex(gateway_id));
create index idx_gateway_tags on gateway (tags);
-- application
create table application (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
mqtt_tls_cert blob,
tags text not null default '{}'
);
create index idx_application_tenant_id on application (tenant_id);
create index idx_application_name_trgm on application (name);
-- application integration
create table application_integration (
application_id text not null references application on delete cascade,
kind varchar(20) not null,
created_at datetime not null,
updated_at datetime not null,
configuration text not null,
primary key (application_id, kind)
);
-- api-key
create table api_key (
id text not null primary key,
created_at datetime not null,
name varchar(100) not null,
is_admin boolean not null,
tenant_id text null references tenant on delete cascade
);
create index idx_api_key_tenant_id on api_key (tenant_id);
-- device-profile
create table device_profile (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
payload_codec_script text not null default '',
flush_queue_on_activate boolean not null default FALSE,
description text not null default '',
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE,
region_config_id varchar(100) null,
is_relay boolean not null default FALSE,
is_relay_ed boolean not null default FALSE,
relay_ed_relay_only boolean not null default FALSE,
relay_enabled boolean not null default FALSE,
relay_cad_periodicity smallint not null default 0,
relay_default_channel_index smallint not null default 0,
relay_second_channel_freq bigint not null default 0,
relay_second_channel_dr smallint not null default 0,
relay_second_channel_ack_offset smallint not null default 0,
relay_ed_activation_mode smallint not null default 0,
relay_ed_smart_enable_level smallint not null default 0,
relay_ed_back_off smallint not null default 0,
relay_ed_uplink_limit_bucket_size smallint not null default 0,
relay_ed_uplink_limit_reload_rate smallint not null default 0,
relay_join_req_limit_reload_rate smallint not null default 0,
relay_notify_limit_reload_rate smallint not null default 0,
relay_global_uplink_limit_reload_rate smallint not null default 0,
relay_overall_limit_reload_rate smallint not null default 0,
relay_join_req_limit_bucket_size smallint not null default 0,
relay_notify_limit_bucket_size smallint not null default 0,
relay_global_uplink_limit_bucket_size smallint not null default 0,
relay_overall_limit_bucket_size smallint not null default 0,
allow_roaming boolean not null default TRUE,
rx1_delay smallint not null default 0
);
create index idx_device_profile_tenant_id on device_profile (tenant_id);
create index idx_device_profile_name_trgm on device_profile (name);
create index idx_device_profile_tags on device_profile (tags);
-- device
create table device (
dev_eui blob not null primary key,
application_id text not null references application on delete cascade,
device_profile_id text not null references device_profile on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
scheduler_run_after datetime,
name varchar(100) not null,
description text not null,
external_power_source boolean not null,
battery_level numeric(5, 2),
margin int,
dr smallint,
latitude double precision,
longitude double precision,
altitude real,
dev_addr blob,
enabled_class char(1) not null,
skip_fcnt_check boolean not null,
is_disabled boolean not null,
tags text not null,
variables text not null,
join_eui blob not null default x'00000000',
secondary_dev_addr blob,
device_session blob
);
create index idx_device_application_id on device (application_id);
create index idx_device_device_profile_id on device (device_profile_id);
create index idx_device_name_trgm on device (name);
create index idx_device_dev_eui_trgm on device (hex(dev_eui));
create index idx_device_dev_addr_trgm on device (hex(dev_addr));
create index idx_device_tags on device (tags);
create table device_keys (
dev_eui blob not null primary key references device on delete cascade,
created_at datetime not null,
updated_at datetime not null,
nwk_key blob not null,
app_key blob not null,
dev_nonces text not null,
join_nonce int not null
);
create table device_queue_item (
id text not null primary key,
dev_eui blob references device on delete cascade not null,
created_at datetime not null,
f_port smallint not null,
confirmed boolean not null,
data blob not null,
is_pending boolean not null,
f_cnt_down bigint null,
timeout_after datetime,
is_encrypted boolean default FALSE not null
);
create index idx_device_queue_item_dev_eui on device_queue_item (dev_eui);
create index idx_device_queue_item_created_at on device_queue_item (created_at);
create index idx_device_queue_item_timeout_after on device_queue_item (timeout_after);
-- multicast groups
create table multicast_group (
id text not null primary key,
application_id text not null references application on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mc_addr blob not null,
mc_nwk_s_key blob not null,
mc_app_s_key blob not null,
f_cnt bigint not null,
group_type char(1) not null,
dr smallint not null,
frequency bigint not null,
class_b_ping_slot_nb_k smallint not null,
class_c_scheduling_type varchar(20) not null default 'delay'
);
create index idx_multicast_group_application_id on multicast_group (application_id);
create index idx_multicast_group_name_trgm on multicast_group (name);
create table multicast_group_device (
multicast_group_id text not null references multicast_group on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, dev_eui)
);
create table multicast_group_queue_item (
id text not null primary key,
created_at datetime not null,
scheduler_run_after datetime not null,
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
f_cnt bigint not null,
f_port smallint not null,
data blob not null,
emit_at_time_since_gps_epoch bigint
);
create index idx_multicast_group_queue_item_multicast_group_id on multicast_group_queue_item (multicast_group_id);
create index idx_multicast_group_queue_item_scheduler_run_after on multicast_group_queue_item (scheduler_run_after);
create table device_profile_template (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
vendor varchar(100) not null,
firmware varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
payload_codec_script text not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
flush_queue_on_activate boolean not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE
);
create table multicast_group_gateway (
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, gateway_id)
);
create table relay_device (
relay_dev_eui blob not null references device on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (relay_dev_eui, dev_eui)
);
create index idx_tenant_tags on tenant (tags);
create index idx_application_tags on application (tags);
create index idx_device_dev_addr on device (dev_addr);
create index idx_device_secondary_dev_addr on device (secondary_dev_addr);
-- relay gateway
create table relay_gateway (
tenant_id text not null references tenant on delete cascade,
relay_id blob not null,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
stats_interval_secs integer not null,
region_config_id varchar(100) not null,
primary key (tenant_id, relay_id)
);

View File

@ -0,0 +1,6 @@
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,5 @@
alter table multicast_group_queue_item
add column expires_at datetime null;
alter table device_queue_item
add column expires_at datetime null;

View File

@ -44,7 +44,7 @@ impl ApplicationService for Application {
.await?;
let a = application::Application {
tenant_id,
tenant_id: tenant_id.into(),
name: req_app.name.clone(),
description: req_app.description.clone(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -119,7 +119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update(application::Application {
id: app_id,
id: app_id.into(),
name: req_app.name.to_string(),
description: req_app.description.to_string(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -279,7 +279,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -367,7 +367,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -438,7 +438,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -535,7 +535,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -610,7 +610,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -689,7 +689,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -755,7 +755,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -832,7 +832,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -907,7 +907,7 @@ impl ApplicationService for Application {
};
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1032,7 +1032,7 @@ impl ApplicationService for Application {
};
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1119,7 +1119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1202,7 +1202,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1271,7 +1271,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1354,7 +1354,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1424,7 +1424,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1506,7 +1506,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1574,7 +1574,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1653,7 +1653,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1730,7 +1730,7 @@ impl ApplicationService for Application {
}
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1814,7 +1814,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1945,7 +1945,9 @@ pub mod test {
}),
};
let mut create_req = Request::new(create_req);
create_req.extensions_mut().insert(AuthID::User(u.id));
create_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
let create_resp = create_resp.get_ref();
@ -1954,7 +1956,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -1976,7 +1980,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
//get
@ -1984,7 +1990,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -2004,7 +2012,9 @@ pub mod test {
offset: 0,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -2014,14 +2024,18 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteApplicationRequest {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

File diff suppressed because it is too large Load Diff

View File

@ -64,8 +64,8 @@ impl DeviceService for Device {
let d = device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -191,8 +191,8 @@ impl DeviceService for Device {
// update
let _ = device::update(device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -533,7 +533,7 @@ impl DeviceService for Device {
dp.reset_session_to_boot_params(&mut ds);
let mut device_changeset = device::DeviceChangeset {
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
dev_addr: Some(Some(dev_addr)),
secondary_dev_addr: Some(None),
..Default::default()
@ -1085,7 +1085,7 @@ impl DeviceService for Device {
}
let qi = device_queue::DeviceQueueItem {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
dev_eui,
f_port: req_qi.f_port as i16,
confirmed: req_qi.confirmed,
@ -1095,6 +1095,14 @@ impl DeviceService for Device {
} else {
None
},
expires_at: if let Some(expires_at) = req_qi.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
data,
..Default::default()
};
@ -1169,6 +1177,10 @@ impl DeviceService for Device {
is_pending: qi.is_pending,
f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32,
is_encrypted: qi.is_encrypted,
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
})
.collect(),
});
@ -1539,11 +1551,14 @@ pub mod test {
dev.dev_eui,
&device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_be_bytes([1, 2, 3, 4]))),
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
}
.into(),
)),
..Default::default()
},
)
@ -1568,14 +1583,17 @@ pub mod test {
device::partial_update(
dev.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
}
.into(),
)),
..Default::default()
},
)

View File

@ -45,7 +45,7 @@ impl DeviceProfileService for DeviceProfile {
.await?;
let mut dp = device_profile::DeviceProfile {
tenant_id,
tenant_id: tenant_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),
@ -247,7 +247,7 @@ impl DeviceProfileService for DeviceProfile {
// update
let _ = device_profile::update(device_profile::DeviceProfile {
id: dp_id,
id: dp_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),

View File

@ -58,7 +58,7 @@ impl GatewayService for Gateway {
let gw = gateway::Gateway {
gateway_id: EUI64::from_str(&req_gw.gateway_id).map_err(|e| e.status())?,
tenant_id,
tenant_id: tenant_id.into(),
name: req_gw.name.clone(),
description: req_gw.description.clone(),
latitude: lat,
@ -851,8 +851,8 @@ impl GatewayService for Gateway {
.await?;
let _ = gateway::update_relay_gateway(gateway::RelayGateway {
tenant_id,
relay_id,
tenant_id: tenant_id.into(),
name: req_relay.name.clone(),
description: req_relay.description.clone(),
stats_interval_secs: req_relay.stats_interval as i32,
@ -1028,7 +1028,9 @@ pub mod test {
}),
};
let mut create_req = Request::new(create_req);
create_req.extensions_mut().insert(AuthID::User(u.id));
create_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.create(create_req).await.unwrap();
// get
@ -1036,7 +1038,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1070,7 +1074,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -1078,7 +1084,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1105,7 +1113,9 @@ pub mod test {
..Default::default()
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -1115,14 +1125,18 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteGatewayRequest {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}
@ -1198,7 +1212,9 @@ pub mod test {
aggregation: common::Aggregation::Day.into(),
};
let mut stats_req = Request::new(stats_req);
stats_req.extensions_mut().insert(AuthID::User(u.id));
stats_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let stats_resp = service.get_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1289,7 +1305,7 @@ pub mod test {
end: Some(now_st.into()),
};
let mut stats_req = Request::new(stats_req);
stats_req.extensions_mut().insert(AuthID::User(u.id));
stats_req.extensions_mut().insert(AuthID::User(u.id.into()));
let stats_resp = service.get_duty_cycle_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1363,7 +1379,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1389,7 +1407,9 @@ pub mod test {
}),
};
let mut up_relay_req = Request::new(up_relay_req);
up_relay_req.extensions_mut().insert(AuthID::User(u.id));
up_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let _ = service.update_relay_gateway(up_relay_req).await.unwrap();
// get relay gateway
@ -1398,7 +1418,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1419,7 +1441,9 @@ pub mod test {
offset: 0,
};
let mut list_relay_req = Request::new(list_relay_req);
list_relay_req.extensions_mut().insert(AuthID::User(u.id));
list_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let list_relay_resp = service.list_relay_gateways(list_relay_req).await.unwrap();
assert_eq!(1, list_relay_resp.get_ref().total_count);
assert_eq!(1, list_relay_resp.get_ref().result.len());
@ -1430,7 +1454,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_ok());
@ -1439,7 +1465,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_err());
}

View File

@ -287,7 +287,11 @@ impl InternalService for Internal {
let tenant_id = if req_key.tenant_id.is_empty() {
None
} else {
Some(Uuid::from_str(&req_key.tenant_id).map_err(|e| e.status())?)
Some(
Uuid::from_str(&req_key.tenant_id)
.map_err(|e| e.status())?
.into(),
)
};
if req_key.is_admin && tenant_id.is_some() {
@ -312,7 +316,7 @@ impl InternalService for Internal {
let ak = api_key::ApiKey {
name: req_key.name.clone(),
is_admin: req_key.is_admin,
tenant_id,
tenant_id: tenant_id.map(|u| u.into()),
..Default::default()
};

View File

@ -133,7 +133,7 @@ pub async fn setup() -> Result<()> {
.add_service(
TonicReflectionBuilder::configure()
.register_encoded_file_descriptor_set(chirpstack_api::api::DESCRIPTOR)
.build()
.build_v1()
.unwrap(),
)
.add_service(InternalServiceServer::with_interceptor(

View File

@ -47,7 +47,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let mg = multicast::MulticastGroup {
application_id: app_id,
application_id: app_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -154,7 +154,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let _ = multicast::update(multicast::MulticastGroup {
id: mg_id,
id: mg_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -408,9 +408,17 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let f_cnt = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem {
multicast_group_id: mg_id,
multicast_group_id: mg_id.into(),
f_port: req_enq.f_port as i16,
data: req_enq.data.clone(),
expires_at: if let Some(expires_at) = req_enq.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
..Default::default()
})
.await
@ -478,6 +486,10 @@ impl MulticastGroupService for MulticastGroup {
f_cnt: qi.f_cnt as u32,
f_port: qi.f_port as u32,
data: qi.data.clone(),
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
});
}
}
@ -778,6 +790,7 @@ pub mod test {
f_cnt: 31,
f_port: 10,
data: vec![1, 2, 3],
expires_at: None,
},
list_queue_resp.items[0]
);

View File

@ -122,7 +122,7 @@ impl TenantService for Tenant {
// update
let _ = tenant::update(tenant::Tenant {
id: tenant_id,
id: tenant_id.into(),
name: req_tenant.name.clone(),
description: req_tenant.description.clone(),
can_have_gateways: req_tenant.can_have_gateways,
@ -190,7 +190,7 @@ impl TenantService for Tenant {
let u = user::get(id).await.map_err(|e| e.status())?;
if !u.is_admin {
filters.user_id = Some(u.id);
filters.user_id = Some(u.id.into());
}
}
AuthID::Key(_) => {
@ -258,8 +258,8 @@ impl TenantService for Tenant {
.await?;
let _ = tenant::add_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -342,8 +342,8 @@ impl TenantService for Tenant {
.await?;
tenant::update_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -482,7 +482,9 @@ pub mod test {
}),
};
let mut create_req = Request::new(create_req);
create_req.extensions_mut().insert(AuthID::User(u.id));
create_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -490,7 +492,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -518,7 +522,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -526,7 +532,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -549,7 +557,9 @@ pub mod test {
user_id: "".into(),
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -559,14 +569,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteTenantRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -64,8 +64,8 @@ impl UserService for User {
let tenant_id = Uuid::from_str(&tu.tenant_id).map_err(|e| e.status())?;
tenant::add_user(tenant::TenantUser {
tenant_id,
user_id: u.id,
tenant_id: tenant_id.into(),
user_id: u.id.into(),
is_admin: tu.is_admin,
is_device_admin: tu.is_device_admin,
is_gateway_admin: tu.is_gateway_admin,
@ -138,7 +138,7 @@ impl UserService for User {
// update
let _ = user::update(user::User {
id: user_id,
id: user_id.into(),
is_admin: req_user.is_admin,
is_active: req_user.is_active,
email: req_user.email.clone(),
@ -292,7 +292,9 @@ pub mod test {
}),
};
let mut create_req = Request::new(create_req);
create_req.extensions_mut().insert(AuthID::User(u.id));
create_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -300,7 +302,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -326,7 +330,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -334,7 +340,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -354,7 +362,9 @@ pub mod test {
password: "newpassword".into(),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update_password(up_req).await.unwrap();
// list
@ -363,7 +373,9 @@ pub mod test {
limit: 10,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
// * Admin from migrations
// * User that we created for auth
@ -376,14 +388,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteUserRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
@ -391,7 +407,9 @@ pub mod test {
id: u.id.to_string(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -1,9 +1,10 @@
use handlebars::{no_escape, Handlebars};
use handlebars::Handlebars;
use super::super::config;
pub fn run() {
let template = r#"
let template = vec![
r#"
# Logging configuration
[logging]
@ -15,12 +16,13 @@ pub fn run() {
# * INFO
# * WARN
# * ERROR
# * OFF
level="{{ logging.level }}"
# Log as JSON.
json={{ logging.json }}
"#,
#[cfg(feature = "postgres")]
r#"
# PostgreSQL configuration.
[postgresql]
@ -46,8 +48,36 @@ pub fn run() {
# the server-certificate is not signed by a CA in the platform certificate
# store.
ca_cert="{{ postgresql.ca_cert }}"
"#,
#[cfg(feature = "sqlite")]
r#"
# SQLite configuration.
[sqlite]
# Sqlite DB path.
#
# Format example: sqlite:///<DATABASE>.
#
path="{{ sqlite.path }}"
# Max open connections.
#
# This sets the max. number of open connections that are allowed in the
# SQLite connection pool.
max_open_connections={{ sqlite.max_open_connections }}
# PRAGMAs.
#
# This configures the list of PRAGMAs that are executed to prepare the
# SQLite library. For a full list of available PRAGMAs see:
# https://www.sqlite.org/pragma.html
pragmas=[
{{#each sqlite.pragmas}}
"{{this}}",
{{/each}}
]
"#,
r#"
# Redis configuration.
[redis]
@ -944,6 +974,7 @@ pub fn run() {
kek="{{ this.kek }}"
{{/each}}
# UI configuration.
[ui]
# Tileserver URL.
@ -958,14 +989,14 @@ pub fn run() {
# default tileserver_url (OSM). If you configure a different tile-server, you
# might need to update the map_attribution.
map_attribution="{{ui.map_attribution}}"
"#;
"#].join("\n");
let mut reg = Handlebars::new();
reg.register_escape_fn(no_escape);
reg.register_escape_fn(|s| s.to_string().replace('"', r#"\""#));
let conf = config::get();
println!(
"{}",
reg.render_template(template, &conf)
reg.render_template(&template, &conf)
.expect("render configfile error")
);
}

View File

@ -0,0 +1,259 @@
use std::fs;
use std::path::Path;
use anyhow::Result;
use serde::Deserialize;
use tracing::{info, span, Instrument, Level};
use crate::codec::Codec;
use crate::storage::{self, device_profile_template};
use lrwn::region;
#[derive(Deserialize)]
struct VendorConfig {
pub vendor: Vendor,
}
#[derive(Default, Deserialize)]
#[serde(default)]
pub struct Vendor {
pub slug: String,
pub name: String,
pub id: usize,
pub ouis: Vec<String>,
pub devices: Vec<String>,
}
#[derive(Deserialize)]
pub struct DeviceConfig {
pub device: Device,
}
#[derive(Default, Deserialize)]
#[serde(default)]
pub struct Device {
pub slug: String,
pub name: String,
pub description: String,
pub firmware: Vec<DeviceFirmware>,
}
#[derive(Deserialize)]
pub struct DeviceFirmware {
pub version: String,
pub profiles: Vec<String>,
pub codec: Option<String>,
}
#[derive(Deserialize)]
pub struct ProfileConfig {
pub profile: Profile,
}
#[derive(Deserialize)]
#[serde(default)]
pub struct Profile {
pub region: region::CommonName,
pub mac_version: region::MacVersion,
pub reg_params_revision: region::Revision,
pub supports_otaa: bool,
pub supports_class_b: bool,
pub supports_class_c: bool,
pub max_eirp: usize,
pub abp: ProfileAbp,
pub class_b: ProfileClassB,
pub class_c: ProfileClassC,
}
impl Default for Profile {
fn default() -> Self {
Self {
region: region::CommonName::EU868,
mac_version: region::MacVersion::LORAWAN_1_0_4,
reg_params_revision: region::Revision::RP002_1_0_4,
supports_otaa: false,
supports_class_b: false,
supports_class_c: false,
max_eirp: 0,
abp: ProfileAbp::default(),
class_b: ProfileClassB::default(),
class_c: ProfileClassC::default(),
}
}
}
#[derive(Default, Deserialize)]
pub struct ProfileAbp {
pub rx1_delay: usize,
pub rx1_dr_offset: usize,
pub rx2_dr: usize,
pub rx2_freq: usize,
}
#[derive(Default, Deserialize)]
pub struct ProfileClassB {
pub timeout_secs: usize,
pub ping_slot_nb_k: usize,
pub ping_slot_dr: usize,
pub ping_slot_freq: usize,
}
#[derive(Default, Deserialize)]
pub struct ProfileClassC {
pub timeout_secs: usize,
}
pub async fn run(dir: &Path) -> Result<()> {
storage::setup().await?;
info!(path = ?dir, "Import LoRaWAN device profiles");
let vendors_dir = dir.join("vendors");
let vendors = fs::read_dir(vendors_dir)?;
for vendor in vendors {
if let Ok(vendor) = vendor {
if vendor.file_name() == "example-vendor" {
continue;
}
let span = span!(Level::INFO, "", vendor = ?vendor.file_name());
let vendor_dir = vendor.path();
if vendor_dir.is_dir() {
handle_vendor(&vendor_dir).instrument(span).await?;
}
}
}
Ok(())
}
async fn handle_vendor(dir: &Path) -> Result<()> {
let vendor_conf = dir.join("vendor.toml");
info!(path = ?vendor_conf, "Reading vendor configuration");
let mut vendor_conf: VendorConfig = toml::from_str(&fs::read_to_string(vendor_conf)?)?;
vendor_conf.vendor.slug = dir.file_name().unwrap().to_str().unwrap().to_string();
info!(vendor_name = %vendor_conf.vendor.name, "Vendor loaded");
for device in &vendor_conf.vendor.devices {
let span = span!(Level::INFO, "", device = %device);
handle_device(dir, &vendor_conf.vendor, device)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_device(dir: &Path, vendor: &Vendor, device: &str) -> Result<()> {
let device_conf = dir.join("devices").join(device);
info!(path = ?device_conf, "Reading device configuration");
let mut device_conf: DeviceConfig = toml::from_str(&fs::read_to_string(device_conf)?)?;
device_conf.device.slug = dir
.join("devices")
.join(device)
.file_stem()
.unwrap()
.to_str()
.unwrap()
.to_string();
info!(device_name = %device_conf.device.name, "Device loaded");
for firmware in &device_conf.device.firmware {
let span = span!(Level::INFO, "", firmware = %firmware.version);
handle_firmware(dir, vendor, &device_conf.device, firmware)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_firmware(
dir: &Path,
vendor: &Vendor,
device: &Device,
firmware: &DeviceFirmware,
) -> Result<()> {
let codec = if let Some(codec) = &firmware.codec {
let codec_path = dir.join("codecs").join(codec);
info!(path = ?codec_path, "Reading codec file");
Some(fs::read_to_string(codec_path)?)
} else {
None
};
for profile in &firmware.profiles {
let span = span!(Level::INFO, "", profile = %profile);
handle_profile(dir, vendor, device, firmware, &codec, profile)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_profile(
dir: &Path,
vendor: &Vendor,
device: &Device,
firmware: &DeviceFirmware,
codec: &Option<String>,
profile: &str,
) -> Result<()> {
let profile_path = dir.join("profiles").join(profile);
info!(path = ?profile_path, "Reading profile configuration");
let profile_conf: ProfileConfig = toml::from_str(&fs::read_to_string(profile_path)?)?;
let id_regex = regex::Regex::new(r"[^a-zA-Z0-9\-]+").unwrap();
let id = format!(
"{}-{}-{}-{}",
vendor.slug, device.slug, firmware.version, profile_conf.profile.region
);
let id = id_regex.replace_all(&id, "-").to_string();
let dpt = device_profile_template::DeviceProfileTemplate {
id,
name: device.name.clone(),
description: device.description.clone(),
vendor: vendor.name.clone(),
firmware: firmware.version.clone(),
region: profile_conf.profile.region,
mac_version: profile_conf.profile.mac_version,
reg_params_revision: profile_conf.profile.reg_params_revision,
adr_algorithm_id: "default".into(),
payload_codec_runtime: match codec {
Some(_) => Codec::JS,
None => Codec::NONE,
},
payload_codec_script: match codec {
Some(v) => v.into(),
None => "".into(),
},
uplink_interval: 60 * 60,
device_status_req_interval: 1,
flush_queue_on_activate: true,
supports_otaa: profile_conf.profile.supports_otaa,
supports_class_b: profile_conf.profile.supports_class_b,
supports_class_c: profile_conf.profile.supports_class_c,
class_b_timeout: profile_conf.profile.class_b.timeout_secs as i32,
class_b_ping_slot_nb_k: profile_conf.profile.class_b.ping_slot_nb_k as i32,
class_b_ping_slot_dr: profile_conf.profile.class_b.ping_slot_dr as i16,
class_b_ping_slot_freq: profile_conf.profile.class_b.ping_slot_freq as i64,
class_c_timeout: profile_conf.profile.class_c.timeout_secs as i32,
abp_rx1_delay: profile_conf.profile.abp.rx1_delay as i16,
abp_rx1_dr_offset: profile_conf.profile.abp.rx1_dr_offset as i16,
abp_rx2_dr: profile_conf.profile.abp.rx2_dr as i16,
abp_rx2_freq: profile_conf.profile.abp.rx2_freq as i64,
..Default::default()
};
info!(id = %dpt.id, "Creating or updating device-profile template");
device_profile_template::upsert(dpt).await?;
Ok(())
}

View File

@ -43,7 +43,7 @@ pub async fn run() -> Result<()> {
*dev_eui,
&storage::device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_slice(&ds.dev_addr)?)),
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
..Default::default()
},
)

View File

@ -1,6 +1,7 @@
pub mod configfile;
pub mod create_api_key;
pub mod import_legacy_lorawan_devices_repository;
pub mod import_lorawan_device_profiles;
pub mod migrate_ds_to_pg;
pub mod print_ds;
pub mod root;

View File

@ -5,8 +5,11 @@ use std::str::FromStr;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use diesel::backend::Backend;
#[cfg(feature = "postgres")]
use diesel::pg::Pg;
use diesel::sql_types::Text;
#[cfg(feature = "sqlite")]
use diesel::sqlite::Sqlite;
use diesel::{deserialize, serialize};
use serde::{Deserialize, Serialize};
@ -40,6 +43,7 @@ where
}
}
#[cfg(feature = "postgres")]
impl serialize::ToSql<Text, Pg> for Codec
where
str: serialize::ToSql<Text, Pg>,
@ -49,6 +53,14 @@ where
}
}
#[cfg(feature = "sqlite")]
impl serialize::ToSql<Text, Sqlite> for Codec {
fn to_sql(&self, out: &mut serialize::Output<'_, '_, Sqlite>) -> serialize::Result {
out.set_value(self.to_string());
Ok(serialize::IsNull::No)
}
}
impl FromStr for Codec {
type Err = anyhow::Error;

View File

@ -19,6 +19,7 @@ pub struct Configuration {
pub logging: Logging,
pub postgresql: Postgresql,
pub redis: Redis,
pub sqlite: Sqlite,
pub api: Api,
pub gateway: Gateway,
pub network: Network,
@ -90,6 +91,29 @@ impl Default for Redis {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Sqlite {
pub path: String,
pub pragmas: Vec<String>,
pub max_open_connections: u32,
}
impl Default for Sqlite {
fn default() -> Self {
Sqlite {
path: "sqlite://chirpstack.sqlite".into(),
pragmas: vec![
// Set busy_timeout to avoid manually managing transaction business/contention
"busy_timeout = 1000".to_string(),
// Enable foreign-keys since it is off by default
"foreign_keys = ON".to_string(),
],
max_open_connections: 4,
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Api {

View File

@ -363,7 +363,7 @@ impl Data {
trace!("Selecting downlink gateway");
let gw_down = helpers::select_downlink_gateway(
Some(self.tenant.id),
Some(self.tenant.id.into()),
&self.device.get_device_session()?.region_config_id,
self.network_conf.gateway_prefer_min_margin,
self.device_gateway_rx_info.as_mut().unwrap(),
@ -464,9 +464,11 @@ impl Data {
// The queue item:
// * should fit within the max payload size
// * should not be pending
// * should not be expired
// * in case encrypted, should have a valid FCntDown
if qi.data.len() <= max_payload_size
&& !qi.is_pending
&& !(qi.expires_at.is_some() && qi.expires_at.unwrap() < Utc::now())
&& !(qi.is_encrypted
&& (qi.f_cnt_down.unwrap_or_default() as u32) < ds.get_a_f_cnt_down())
{
@ -519,12 +521,41 @@ impl Data {
},
};
integration::ack_event(self.application.id, &self.device.variables, &pl).await;
integration::ack_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of timeout");
continue;
}
// Handle expired payload.
if let Some(expires_at) = qi.expires_at {
if expires_at < Utc::now() {
device_queue::delete_item(&qi.id)
.await
.context("Delete device queue-item")?;
let pl = integration_pb::LogEvent {
time: Some(Utc::now().into()),
device_info: Some(device_info.clone()),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired"
.to_string(),
context: [("queue_item_id".to_string(), qi.id.to_string())]
.iter()
.cloned()
.collect(),
};
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because it has expired");
continue;
}
}
// Handle payload size.
if qi.data.len() > max_payload_size {
device_queue::delete_item(&qi.id)
@ -549,7 +580,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of max. payload size");
continue;
@ -585,7 +617,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of invalid frame-counter");
continue;
@ -2728,7 +2761,7 @@ mod test {
name: "max payload size error".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
@ -2764,11 +2797,46 @@ mod test {
..Default::default()
}),
},
Test {
name: "item has expired".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
expires_at: Some(Utc::now() - chrono::Duration::seconds(10)),
..Default::default()
}],
expected_queue_item: None,
expected_ack_event: None,
expected_log_event: Some(integration_pb::LogEvent {
device_info: Some(integration_pb::DeviceInfo {
tenant_id: t.id.to_string(),
tenant_name: t.name.clone(),
application_id: app.id.to_string(),
application_name: app.name.clone(),
device_profile_id: dp.id.to_string(),
device_profile_name: dp.name.clone(),
device_name: d.name.clone(),
dev_eui: d.dev_eui.to_string(),
..Default::default()
}),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired".into(),
context: [("queue_item_id".to_string(), qi_id.to_string())]
.iter()
.cloned()
.collect(),
..Default::default()
}),
},
Test {
name: "is pending".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
f_cnt_down: Some(10),
@ -2800,7 +2868,7 @@ mod test {
name: "invalid frame-counter".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2841,14 +2909,14 @@ mod test {
name: "valid payload".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
..Default::default()
}],
expected_queue_item: Some(device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2874,7 +2942,7 @@ mod test {
let d = device::partial_update(
d.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(ds.clone())),
device_session: Some(Some(ds.clone().into())),
..Default::default()
},
)
@ -3418,11 +3486,14 @@ mod test {
dev_addr: Some(*dev_addr),
application_id: app.id,
device_profile_id: dp_ed.id,
device_session: Some(internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}),
device_session: Some(
internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}
.into(),
),
..Default::default()
})
.await
@ -3435,7 +3506,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -3884,7 +3955,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -4015,7 +4086,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4126,7 +4197,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4247,7 +4318,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4504,7 +4575,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)

View File

@ -239,7 +239,7 @@ mod tests {
},
// is_private_down is set, first gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![
@ -262,7 +262,7 @@ mod tests {
},
// is_private_down is set, second gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![

View File

@ -182,7 +182,7 @@ impl JoinAccept<'_> {
trace!("Select downlink gateway");
let gw_down = helpers::select_downlink_gateway(
Some(self.tenant.id),
Some(self.tenant.id.into()),
&self.uplink_frame_set.region_config_id,
self.network_conf.gateway_prefer_min_margin,
self.device_gateway_rx_info.as_mut().unwrap(),

View File

@ -1,6 +1,7 @@
use std::collections::{HashMap, HashSet};
use anyhow::{Context, Result};
use chrono::Utc;
use petgraph::algo::min_spanning_tree;
use petgraph::data::FromElements;
use petgraph::graph::{DefaultIx, Graph, NodeIndex, UnGraph};
@ -55,6 +56,7 @@ impl Multicast {
ctx.get_gateway().await?;
ctx.set_region_config_id()?;
ctx.get_multicast_group().await?;
ctx.validate_expiration().await?;
ctx.validate_payload_size().await?;
ctx.set_tx_info()?;
ctx.set_phy_payload()?;
@ -90,6 +92,22 @@ impl Multicast {
Ok(())
}
async fn validate_expiration(&self) -> Result<()> {
trace!("Validating expires_at");
if let Some(expires_at) = self.multicast_group_queue_item.expires_at {
if Utc::now() > expires_at {
warn!(
expires_at = %expires_at,
"Discarding multicast-group queue item because it has expired"
);
multicast::delete_queue_item(&self.multicast_group_queue_item.id).await?;
return Err(anyhow!("Queue item has expired and has been discarded"));
}
}
Ok(())
}
async fn validate_payload_size(&self) -> Result<()> {
trace!("Validating payload size for DR");
let mg = self.multicast_group.as_ref().unwrap();

View File

@ -434,7 +434,7 @@ impl TxAck {
..Default::default()
};
integration::log_event(app.id, &dev.variables, &pl).await;
integration::log_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}
@ -483,7 +483,7 @@ impl TxAck {
tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(),
};
integration::txack_event(app.id, &dev.variables, &pl).await;
integration::txack_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}
@ -532,7 +532,7 @@ impl TxAck {
tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(),
};
integration::txack_event(app.id, &dev.variables, &pl).await;
integration::txack_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}

View File

@ -8,7 +8,7 @@ use tokio::fs;
// Return root certificates, optionally with the provided ca_file appended.
pub fn get_root_certs(ca_file: Option<String>) -> Result<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs()? {
for cert in rustls_native_certs::load_native_certs().certs {
roots.add(cert)?;
}

View File

@ -11,7 +11,7 @@ use tokio::fs;
// Return root certificates, optionally with the provided ca_file appended.
pub fn get_root_certs(ca_file: Option<String>) -> Result<rustls::RootCertStore> {
let mut roots = rustls::RootCertStore::empty();
for cert in rustls_native_certs::load_native_certs()? {
for cert in rustls_native_certs::load_native_certs().certs {
roots.add(cert)?;
}

View File

@ -28,6 +28,7 @@ pub mod mock;
mod mqtt;
mod mydevices;
mod pilot_things;
#[cfg(feature = "postgres")]
mod postgresql;
mod redis;
mod thingsboard;
@ -54,6 +55,7 @@ pub async fn setup() -> Result<()> {
.context("Setup MQTT integration")?,
));
}
#[cfg(feature = "postgres")]
"postgresql" => integrations.push(Box::new(
postgresql::Integration::new(&conf.integration.postgresql)
.await
@ -533,7 +535,7 @@ async fn handle_down_command(application_id: String, pl: integration::DownlinkCo
// Validate that the application_id from the topic is indeed the application ID to which
// the device belongs.
let dev = device::get(&dev_eui).await?;
if dev.application_id != app_id {
if Into::<Uuid>::into(dev.application_id) != app_id {
return Err(anyhow!(
"Application ID from topic does not match application ID from device"
));
@ -555,8 +557,8 @@ async fn handle_down_command(application_id: String, pl: integration::DownlinkCo
let qi = device_queue::DeviceQueueItem {
id: match pl.id.is_empty() {
true => Uuid::new_v4(),
false => Uuid::from_str(&pl.id)?,
true => Uuid::new_v4().into(),
false => Uuid::from_str(&pl.id)?.into(),
},
f_port: pl.f_port as i16,
confirmed: pl.confirmed,

View File

@ -118,7 +118,7 @@ mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(

View File

@ -277,7 +277,7 @@ mod test {
device::partial_update(
dev.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(tst.device_session_ed.clone())),
device_session: Some(Some(tst.device_session_ed.clone().into())),
..Default::default()
},
)
@ -285,7 +285,7 @@ mod test {
.unwrap();
let mut relay_dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};

View File

@ -1,11 +1,10 @@
use anyhow::Result;
use bigdecimal::BigDecimal;
use chrono::{DateTime, Utc};
use tracing::info;
use crate::api::helpers::ToProto;
use crate::integration;
use crate::storage::{application, device, device_profile, tenant};
use crate::storage::{application, device, device_profile, fields, tenant};
use crate::uplink::{helpers, UplinkFrameSet};
use chirpstack_api::integration as integration_pb;
@ -29,8 +28,8 @@ pub async fn handle(
margin: Some(pl.margin as i32),
external_power_source: Some(pl.battery == 0),
battery_level: Some(if pl.battery > 0 && pl.battery < 255 {
let v: BigDecimal = ((pl.battery as f32) / 254.0 * 100.0).try_into()?;
Some(v.with_scale(2))
let v: fields::BigDecimal = ((pl.battery as f32) / 254.0 * 100.0).try_into()?;
Some(v.with_scale(2).into())
} else {
None
}),
@ -47,7 +46,7 @@ pub async fn handle(
helpers::get_rx_timestamp(&uplink_frame_set.rx_info_set).into();
integration::status_event(
app.id,
app.id.into(),
&dev.variables,
&integration_pb::StatusEvent {
deduplication_id: uplink_frame_set.uplink_set_id.to_string(),
@ -203,7 +202,7 @@ pub mod test {
assert_eq!(Some(10), d.margin);
assert!(!d.external_power_source);
assert_eq!(
Some(BigDecimal::from_str("100.00").unwrap()),
Some(bigdecimal::BigDecimal::from_str("100.00").unwrap().into()),
d.battery_level
);
}

View File

@ -178,7 +178,7 @@ mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(

View File

@ -216,7 +216,7 @@ mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(&mut dev, &tst.filter_list_ans, tst.filter_list_req.as_ref());

View File

@ -361,7 +361,7 @@ pub mod test {
for tst in &tests {
let mut dev = device::Device {
dev_eui: lrwn::EUI64::from_str("0102030405060708").unwrap(),
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let block = lrwn::MACCommandSet::new(vec![lrwn::MACCommand::LinkADRAns(

View File

@ -207,9 +207,12 @@ pub mod test {
let dp: device_profile::DeviceProfile = Default::default();
let mut dev = device::Device {
dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
device_session: Some(internal::DeviceSession {
..Default::default()
}),
device_session: Some(
internal::DeviceSession {
..Default::default()
}
.into(),
),
..Default::default()
};

View File

@ -472,7 +472,7 @@ pub mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};

View File

@ -65,7 +65,7 @@ pub async fn handle(
.collect(),
};
integration::log_event(app.id, &dev.variables, &log_event).await;
integration::log_event(app.id.into(), &dev.variables, &log_event).await;
Ok(None)
}
@ -88,19 +88,19 @@ mod test {
integration::set_mock().await;
let t = tenant::Tenant {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
name: "tenant".to_string(),
..Default::default()
};
let app = application::Application {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
name: "app".to_string(),
..Default::default()
};
let dp = device_profile::DeviceProfile {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
name: "dp".to_string(),
tags: fields::KeyValue::new(
[("dp_tag".to_string(), "dp_value".to_string())]

View File

@ -183,7 +183,7 @@ pub mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(

View File

@ -37,7 +37,7 @@ pub mod test {
#[test]
fn test_handle() {
let mut dev = device::Device {
device_session: Some(internal::DeviceSession::default()),
device_session: Some(internal::DeviceSession::default().into()),
..Default::default()
};
let block = lrwn::MACCommandSet::new(vec![lrwn::MACCommand::PingSlotInfoReq(

View File

@ -161,7 +161,7 @@ pub mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(

View File

@ -180,7 +180,7 @@ mod test {
for tst in &tests {
let mut dev = device::Device {
device_session: Some(tst.device_session.clone()),
device_session: Some(tst.device_session.clone().into()),
..Default::default()
};
let resp = handle(&mut dev, &tst.relay_conf_ans, tst.relay_conf_req.as_ref());

View File

@ -48,21 +48,24 @@ pub mod test {
#[test]
fn test_handle() {
let mut dev = device::Device {
device_session: Some(internal::DeviceSession {
tx_power_index: 3,
min_supported_tx_power_index: 1,
max_supported_tx_power_index: 5,
extra_uplink_channels: [(3, Default::default())].iter().cloned().collect(),
rx1_delay: 3,
rx1_dr_offset: 1,
rx2_dr: 5,
rx2_frequency: 868900000,
enabled_uplink_channel_indices: vec![0, 1],
class_b_ping_slot_dr: 3,
class_b_ping_slot_freq: 868100000,
nb_trans: 3,
..Default::default()
}),
device_session: Some(
internal::DeviceSession {
tx_power_index: 3,
min_supported_tx_power_index: 1,
max_supported_tx_power_index: 5,
extra_uplink_channels: [(3, Default::default())].iter().cloned().collect(),
rx1_delay: 3,
rx1_dr_offset: 1,
rx2_dr: 5,
rx2_frequency: 868900000,
enabled_uplink_channel_indices: vec![0, 1],
class_b_ping_slot_dr: 3,
class_b_ping_slot_freq: 868100000,
nb_trans: 3,
..Default::default()
}
.into(),
),
..Default::default()
};
let dp = device_profile::DeviceProfile {

Some files were not shown because too many files have changed in this diff Show More