Compare commits


18 Commits

Author SHA1 Message Date
da00217a3a Merge branch 'master' into feat_sqlite_support 2024-08-26 12:49:45 +01:00
489a35e0ec Bump version to 4.9.0 2024-08-15 09:06:19 +01:00
1848097e6c Fix missing last_seen_at field update of relay gateway. 2024-08-13 15:54:51 +01:00
76b7ec4881 Add signal handling.
Fixes #480.
2024-08-13 13:04:06 +01:00
3aaa114f46 Update deadpool-redis. 2024-08-08 16:53:07 +01:00
a811ec5c80 Bump version to 4.9.0-test.6 2024-08-07 15:11:09 +01:00
5794f41324 Fix clippy feedback (cargo clippy --fix). 2024-08-07 14:59:25 +01:00
21f07fedb0 Implement support for SQLite database backend. (#418)
This feature makes it possible to select between PostgreSQL and SQLite as database backend using a compile-time feature flag. It is not possible to enable both at the same time.

---------

Co-authored-by: Orne Brocaar <info@brocaar.com>
2024-08-06 16:23:43 +01:00
7158c0c610 ui: Update codec template with JSDoc (#473) 2024-08-05 11:17:13 +01:00
eda6646b18 Update dependencies. 2024-08-05 10:15:23 +01:00
40a2b83bf5 Update redis dependency. 2024-08-05 10:06:48 +01:00
4e0106a4e8 Replace warp with axum.
The warp dependency was causing some issues with upgrading dependencies,
as it depends on http v0.2, whereas other dependencies (e.g. tonic) have
already upgraded to http v1+.
2024-08-01 11:33:57 +01:00
98978135c4 ui: Fix formatting template after merging #460. 2024-07-30 11:18:52 +01:00
6a691c62e2 Update dependencies. 2024-07-24 14:12:50 +01:00
66ab41036b Update lapin crate.
This disables the default features (rustls), because lapin enables the
default rustls features, which pull in the aws-lc-rs dependency in addition
to ring.

Most likely, the next lapin version will fix this by exposing feature
flags to enable either the aws-lc-rs or the ring backend for rustls.
2024-07-24 14:05:46 +01:00
dc57e6fe51 Update rustls to 0.23. 2024-07-23 14:03:09 +01:00
ebc4065ca2 Bump version to 4.9.0-test.5 2024-07-23 11:03:39 +01:00
a23797ddbb Fix updating dependencies.
The previous dependency-update commit contained a dependency that
pulled in the aws-lc-rs crate, which fails to build on ARMv7. See also
370b84cb09f0d55c9cc1d993df2474e579e7fa94.

This commit reverts the updates and only updates part of the crates.

A proper fix will be to update all dependencies to rustls 0.23 such that
we can enable the ring feature flag (which is the 0.22 default).
2024-07-23 10:39:58 +01:00
198 changed files with 6471 additions and 4095 deletions
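The SQLite support commit (#418) above selects the database backend at compile time via Cargo features; the Cargo.toml changes below define `postgres` as the default feature and `sqlite` as the alternative. As a minimal sketch, assuming a checkout of this branch, the manual equivalent of the Makefile targets is:

```bash
# Default build uses the PostgreSQL backend (the `postgres` feature is the default).
cargo build --release

# Build against SQLite instead; the two backends are mutually exclusive,
# so the default feature set has to be disabled first.
cargo build --release --no-default-features --features sqlite
```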


@ -13,6 +13,13 @@ env:
jobs:
tests:
runs-on: ubuntu-latest
strategy:
matrix:
database:
- postgres
- sqlite
env:
DATABASE: ${{ matrix.database }}
steps:
-
name: Checkout
@ -32,7 +39,7 @@ jobs:
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
key: ${{ runner.os }}-cargo-test-${{ matrix.database }}-${{ hashFiles('**/Cargo.lock') }}
-
name: Start dependency services
run: docker compose up -d

.gitignore vendored (4 changes)

@ -11,8 +11,12 @@
# Binary packages
/dist
# SQLite databases
*.sqlite
# Rust target directory
**/target
**/target-sqlite
# Certificates
/chirpstack/configuration/certs/*

Cargo.lock generated (893 changes)

File diff suppressed because it is too large


@ -8,7 +8,7 @@ dist:
# Install dev dependencies
dev-dependencies:
cargo install cross --version 0.2.5
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres,sqlite
cargo install cargo-deb --version 1.43.1
cargo install cargo-generate-rpm --version 0.12.1


@ -84,7 +84,11 @@ docker compose up -d
Run the following command to run the ChirpStack tests:
```bash
# Test (with PostgreSQL database backend)
make test
# Test with SQLite database backend
DATABASE=sqlite make test
```
### Building ChirpStack binaries
@ -109,6 +113,34 @@ make release-amd64
make dist
```
By default the above commands will build ChirpStack with the PostgreSQL
database backend. Set the `DATABASE=sqlite` environment variable to compile
ChirpStack with the SQLite database backend.
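As a hypothetical example, a release build against the SQLite backend (assuming `cross` and the other dev dependencies are installed):

```bash
# DATABASE is read by the Makefile and forwarded to cross as
# --no-default-features --features="sqlite".
DATABASE=sqlite make release-amd64
```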
### Database migrations
To create a new database migration, execute:
```
make migration-generate NAME=test-migration
```
To apply migrations, execute:
```
make migration-run
```
To revert a migration, execute:
```
make migration-revert
```
By default the above commands will execute the migration commands using the
PostgreSQL database backend. To execute migration commands for the SQLite
database backend, set the `DATABASE=sqlite` environment variable.
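For example, with a hypothetical migration name:

```bash
# Generate and apply a migration against the SQLite backend; the Makefile
# routes these through diesel_sqlite.toml and the migrations_sqlite directory.
DATABASE=sqlite make migration-generate NAME=add-example-table
DATABASE=sqlite make migration-run
```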
## License
ChirpStack Network Server is distributed under the MIT license. See also


@ -1,6 +1,6 @@
{
"name": "@chirpstack/chirpstack-api-grpc-web",
"version": "4.9.0-test.4",
"version": "4.9.0",
"description": "Chirpstack gRPC-web API",
"license": "MIT",
"devDependencies": {


@ -8,7 +8,7 @@ plugins {
}
group = "io.chirpstack"
version = "4.9.0-test.4"
version = "4.9.0"
repositories {
mavenCentral()

api/js/package.json vendored (2 changes)

@ -1,6 +1,6 @@
{
"name": "@chirpstack/chirpstack-api",
"version": "4.9.0-test.4",
"version": "4.9.0",
"description": "Chirpstack JS and TS API",
"license": "MIT",
"devDependencies": {


@ -9,7 +9,7 @@ plugins {
}
group = "io.chirpstack"
version = "4.9.0-test.4"
version = "4.9.0"
repositories {
mavenCentral()


@ -3,7 +3,7 @@
"description": "Chirpstack PHP API",
"license": "MIT",
"type": "library",
"version": "4.9.0-test.4",
"version": "4.9.0",
"require": {
"php": ">=7.0.0",
"grpc/grpc": "^v1.57.0",


@ -18,7 +18,7 @@ CLASSIFIERS = [
setup(
name='chirpstack-api',
version = "4.9.0-test.4",
version = "4.9.0",
url='https://github.com/brocaar/chirpstack-api',
author='Orne Brocaar',
author_email='info@brocaar.com',

api/rust/Cargo.toml vendored (18 changes)

@ -1,7 +1,7 @@
[package]
name = "chirpstack_api"
description = "ChirpStack Protobuf / gRPC API definitions."
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
license = "MIT"
homepage = "https://www.chirpstack.io"
@ -12,27 +12,25 @@
default = ["api", "json"]
api = ["tonic/transport", "tonic-build/transport", "tokio"]
json = ["pbjson", "pbjson-types", "serde"]
diesel = ["dep:diesel"]
internal = []
[dependencies]
prost = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-types = "0.13"
hex = "0.4"
rand = "0.8"
tonic = { version = "0.11", features = [
tonic = { version = "0.12", features = [
"codegen",
"prost",
], default-features = false, optional = true }
tokio = { version = "1.38", features = ["macros"], optional = true }
pbjson = { version = "0.6", optional = true }
pbjson-types = { version = "0.6", optional = true }
pbjson = { version = "0.7", optional = true }
pbjson-types = { version = "0.7", optional = true }
serde = { version = "1.0", optional = true }
diesel = { version = "2.2", features = ["postgres_backend"], optional = true }
[build-dependencies]
tonic-build = { version = "0.11", features = [
tonic-build = { version = "0.12", features = [
"prost",
], default-features = false }
pbjson-build = "0.6"
pbjson-build = "0.7"

api/rust/src/gw.rs vendored (24 changes)

@ -95,7 +95,7 @@ impl UplinkFrame {
})
}
uplink_tx_info_legacy::ModulationInfo::FskModulationInfo(info) => {
modulation::Parameters::Fsk(info.clone())
modulation::Parameters::Fsk(*info)
}
uplink_tx_info_legacy::ModulationInfo::LrFhssModulationInfo(info) => {
modulation::Parameters::LrFhss(LrFhssModulationInfo {
@ -120,9 +120,9 @@ impl UplinkFrame {
self.rx_info = Some(UplinkRxInfo {
gateway_id: hex::encode(&rx_info.gateway_id),
uplink_id: rng.gen::<u32>(),
gw_time: rx_info.time.clone(),
gw_time: rx_info.time,
ns_time: None,
time_since_gps_epoch: rx_info.time_since_gps_epoch.clone(),
time_since_gps_epoch: rx_info.time_since_gps_epoch,
fine_time_since_gps_epoch: None,
rssi: rx_info.rssi,
snr: rx_info.lora_snr as f32,
@ -130,7 +130,7 @@ impl UplinkFrame {
rf_chain: rx_info.rf_chain,
board: rx_info.board,
antenna: rx_info.antenna,
location: rx_info.location.clone(),
location: rx_info.location,
context: rx_info.context.clone(),
metadata: rx_info.metadata.clone(),
crc_status: rx_info.crc_status,
@ -195,23 +195,19 @@ impl DownlinkFrame {
Some(timing::Parameters::Immediately(v)) => {
tx_info_legacy.timing = DownlinkTiming::Immediately.into();
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::ImmediatelyTimingInfo(
v.clone(),
),
downlink_tx_info_legacy::TimingInfo::ImmediatelyTimingInfo(*v),
);
}
Some(timing::Parameters::Delay(v)) => {
tx_info_legacy.timing = DownlinkTiming::Delay.into();
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::DelayTimingInfo(v.clone()),
);
tx_info_legacy.timing_info =
Some(downlink_tx_info_legacy::TimingInfo::DelayTimingInfo(*v));
}
Some(timing::Parameters::GpsEpoch(v)) => {
tx_info_legacy.timing = DownlinkTiming::GpsEpoch.into();
tx_info_legacy.timing_info =
Some(downlink_tx_info_legacy::TimingInfo::GpsEpochTimingInfo(
v.clone(),
));
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::GpsEpochTimingInfo(*v),
);
}
_ => {}
}


@ -2,13 +2,6 @@ include!(concat!(env!("OUT_DIR"), "/internal/internal.rs"));
#[cfg(feature = "json")]
include!(concat!(env!("OUT_DIR"), "/internal/internal.serde.rs"));
#[cfg(feature = "diesel")]
use diesel::{backend::Backend, deserialize, serialize, sql_types::Binary};
#[cfg(feature = "diesel")]
use prost::Message;
#[cfg(feature = "diesel")]
use std::io::Cursor;
impl DeviceSession {
pub fn get_a_f_cnt_down(&self) -> u32 {
if self.mac_version().to_string().starts_with("1.0") {
@ -30,28 +23,3 @@ impl DeviceSession {
}
}
}
#[cfg(feature = "diesel")]
impl<ST, DB> deserialize::FromSql<ST, DB> for DeviceSession
where
DB: Backend,
*const [u8]: deserialize::FromSql<ST, DB>,
{
fn from_sql(value: DB::RawValue<'_>) -> deserialize::Result<Self> {
let bytes = <Vec<u8> as deserialize::FromSql<ST, DB>>::from_sql(value)?;
Ok(DeviceSession::decode(&mut Cursor::new(bytes))?)
}
}
#[cfg(feature = "diesel")]
impl serialize::ToSql<Binary, diesel::pg::Pg> for DeviceSession
where
[u8]: serialize::ToSql<Binary, diesel::pg::Pg>,
{
fn to_sql(&self, out: &mut serialize::Output<'_, '_, diesel::pg::Pg>) -> serialize::Result {
<[u8] as serialize::ToSql<Binary, diesel::pg::Pg>>::to_sql(
&self.encode_to_vec(),
&mut out.reborrow(),
)
}
}


@ -1,6 +1,6 @@
[package]
name = "backend"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2018"
publish = false


@ -3,14 +3,14 @@
description = "Library for building external ChirpStack integrations"
homepage = "https://www.chirpstack.io/"
license = "MIT"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2021"
repository = "https://github.com/chirpstack/chirpstack"
[dependencies]
chirpstack_api = { path = "../api/rust", version = "4.9.0-test.1" }
redis = { version = "0.25", features = [
redis = { version = "0.26", features = [
"cluster-async",
"tokio-rustls-comp",
] }
@ -23,7 +23,7 @@
], default-features = true }
async-trait = "0.1.79"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
lazy_static = "1.5"
tokio = { version = "1.36", features = ["macros", "rt-multi-thread"] }
lazy_static = "1.4"
serde_json = "1.0"
toml = "0.8"


@ -215,7 +215,7 @@ impl Integration {
info!(key = %k, "Event received from Redis stream");
match k.as_ref() {
"up" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::UplinkEvent::decode(
&mut Cursor::new(b),
)?;
@ -223,21 +223,21 @@ impl Integration {
}
}
"join" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::JoinEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(join_event(pl));
}
}
"ack" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::AckEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(ack_event(pl));
}
}
"txack" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::TxAckEvent::decode(
&mut Cursor::new(b),
)?;
@ -245,7 +245,7 @@ impl Integration {
}
}
"status" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::StatusEvent::decode(
&mut Cursor::new(b),
)?;
@ -253,14 +253,14 @@ impl Integration {
}
}
"log" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::LogEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(log_event(pl));
}
}
"location" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::LocationEvent::decode(
&mut Cursor::new(b),
)?;
@ -268,7 +268,7 @@ impl Integration {
}
}
"integration" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::IntegrationEvent::decode(
&mut Cursor::new(b),
)?;


@ -3,7 +3,7 @@
description = "ChirpStack is an open-source LoRaWAN(TM) Network Server"
repository = "https://github.com/chirpstack/chirpstack"
homepage = "https://www.chirpstack.io/"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2021"
publish = false
@ -26,23 +26,19 @@
email_address = "0.2"
diesel = { version = "2.2", features = [
"chrono",
"uuid",
"serde_json",
"numeric",
"64-column-tables",
"postgres_backend",
] }
diesel_migrations = { version = "2.2" }
diesel-async = { version = "0.5", features = [
"deadpool",
"postgres",
"async-connection-wrapper",
] }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.11"
tokio-postgres = { version = "0.7", optional = true }
tokio-postgres-rustls = { version = "0.12", optional = true }
bigdecimal = "0.4"
redis = { version = "0.25.2", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.15", features = ["cluster"] }
redis = { version = "0.26", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.16", features = ["cluster"] }
# Logging
tracing = "0.1"
@ -53,11 +49,7 @@
], default-features = true }
# ChirpStack API definitions
chirpstack_api = { path = "../api/rust", features = [
"default",
"internal",
"diesel",
] }
chirpstack_api = { path = "../api/rust", features = ["default", "internal"] }
lrwn = { path = "../lrwn", features = [
"serde",
"diesel",
@ -78,8 +70,8 @@
sha2 = "0.10"
urlencoding = "2.1"
geohash = "0.13"
gcp_auth = "0.11"
lapin = "2.3"
gcp_auth = "0.12"
lapin = { version = "2.5", default-features = false }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
rdkafka = { version = "0.36", default-features = false, features = [
@ -88,26 +80,26 @@
] }
# gRPC and Protobuf
tonic = "0.11"
tonic-web = "0.11"
tonic-reflection = "0.11"
tonic = "0.12"
tonic-web = "0.12"
tonic-reflection = "0.12"
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
prost-types = "0.12"
prost = "0.12"
pbjson-types = "0.6"
prost-types = "0.13"
prost = "0.13"
pbjson-types = "0.7"
# gRPC and HTTP multiplexing
warp = { version = "0.3", features = ["tls"], default-features = false }
hyper = "0.14"
tower = "0.4"
axum = "0.7"
axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] }
tower = { version = "0.4" }
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-body = "0.4"
http = "1.1"
http-body = "1.0"
rust-embed = "8.5"
mime_guess = "2.0"
tower-http = { version = "0.4", features = ["trace", "auth"] }
tower-http = { version = "0.5", features = ["trace", "auth"] }
# Error handling
thiserror = "1.0"
@ -117,7 +109,12 @@
pbkdf2 = { version = "0.12", features = ["simple"] }
rand_core = { version = "0.6", features = ["std"] }
jsonwebtoken = "9.2"
rustls = "0.22"
rustls = { version = "0.23", default-features = false, features = [
"logging",
"std",
"tls12",
"ring",
] }
rustls-native-certs = "0.7"
rustls-pemfile = "2.1"
pem = "3.0"
@ -151,11 +148,14 @@
aes = "0.8"
rand = "0.8"
base64 = "0.22"
async-recursion = "1.0"
async-recursion = "1.1"
regex = "1.10"
petgraph = "0.6"
prometheus-client = "0.22"
pin-project = "1.1"
scoped-futures = { version = "0.1", features = ["std"] }
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
# Development and testing
[dev-dependencies]
@ -164,6 +164,23 @@
dotenv = "0.15"
[features]
default = ["postgres"]
postgres = [
"tokio-postgres",
"tokio-postgres-rustls",
"diesel/postgres_backend",
"diesel/serde_json",
"diesel/uuid",
"diesel-async/postgres",
"lrwn/postgres",
]
sqlite = [
"diesel/sqlite",
"diesel/returning_clauses_for_sqlite_3_35",
"lrwn/sqlite",
"diesel-async/sync-connection-wrapper",
"diesel-async/sqlite",
]
test-all-integrations = [
"test-integration-amqp",
"test-integration-kafka",


@ -1,20 +1,21 @@
.PHONY: dist
PKG_VERSION := $(shell cargo metadata --no-deps --format-version 1 | jq -r '.packages[0].version')
DATABASE ?= postgres
debug-amd64:
cross build --target x86_64-unknown-linux-musl
cross build --target x86_64-unknown-linux-musl --no-default-features --features="$(DATABASE)"
release-amd64:
cross build --target x86_64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
dist:
# Keep these in this order, as aarch64 is based on Debian Buster (older),
# the others on Bullseye. For some build scripts we want to build against
# the least recent libc.
cross build --target aarch64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release
cross build --target armv7-unknown-linux-musleabihf --release
cross build --target aarch64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target armv7-unknown-linux-musleabihf --release --no-default-features --features="$(DATABASE)"
cargo deb --target x86_64-unknown-linux-musl --no-build --no-strip
cargo deb --target armv7-unknown-linux-musleabihf --no-build --no-strip
@ -40,10 +41,38 @@ dist:
test:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE)"
test-all:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test --features test-all-integrations
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE),test-all-integrations"
migration-generate:
ifeq ($(NAME),)
@echo "You must provide a NAME parameter, e.g. make migration-generate NAME=test-migration"
else
diesel --config-file diesel_$(DATABASE).toml migration --migration-dir migrations_$(DATABASE) generate $(NAME)
endif
migration-run: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres run
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite run
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
migration-revert: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres revert
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite revert
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
chirpstack_test.sqlite:
DATABASE_URL=chirpstack_test.sqlite diesel --config-file diesel_sqlite.toml setup --migration-dir migrations_sqlite


@ -2,4 +2,4 @@
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema.rs"
file = "src/storage/schema_postgres.rs"


@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema_sqlite.rs"


@ -0,0 +1,18 @@
drop table relay_gateway;
drop table multicast_group_gateway;
drop table multicast_group_queue_item;
drop table multicast_group_device;
drop table multicast_group;
drop table device_queue_item;
drop table device_keys;
drop table device;
drop table device_profile;
drop table api_key;
drop table application_integration;
drop table application;
drop table gateway;
drop table tenant_user;
drop table tenant;
drop table "user";
drop table relay_device;
drop table device_profile_template;


@ -0,0 +1,392 @@
-- user
create table "user" (
id text not null primary key,
external_id text null,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_active boolean not null,
email text not null,
email_verified boolean not null,
password_hash varchar(200) not null,
note text not null
);
create unique index idx_user_email on "user"(email);
create unique index idx_user_external_id on "user"(external_id);
insert into "user" (
id,
created_at,
updated_at,
is_admin,
is_active,
email,
email_verified,
password_hash,
note
) values (
'05244f12-6daf-4e1f-8315-c66783a0ab56',
datetime('now'),
datetime('now'),
TRUE,
TRUE,
'admin',
FALSE,
'$pbkdf2-sha512$i=1,l=64$l8zGKtxRESq3PA2kFhHRWA$H3lGMxOt55wjwoc+myeOoABofJY9oDpldJa7fhqdjbh700V6FLPML75UmBOt9J5VFNjAL1AvqCozA1HJM0QVGA',
''
);
-- tenant
create table tenant (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
can_have_gateways boolean not null,
max_device_count integer not null,
max_gateway_count integer not null,
private_gateways_up boolean not null,
private_gateways_down boolean not null default FALSE,
tags text not null default '{}'
);
-- sqlite has advanced full-text search (https://www.sqlite.org/fts5.html),
-- but it appears to work on a full table rather than a specific column; to investigate
create index idx_tenant_name_trgm on "tenant"(name);
insert into "tenant" (
id,
created_at,
updated_at,
name,
description,
can_have_gateways,
max_device_count,
max_gateway_count,
private_gateways_up
) values (
'52f14cd4-c6f1-4fbd-8f87-4025e1d49242',
datetime('now'),
datetime('now'),
'ChirpStack',
'',
TRUE,
0,
0,
FALSE
);
-- tenant user
create table tenant_user (
tenant_id text not null references tenant on delete cascade,
user_id text not null references "user" on delete cascade,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_device_admin boolean not null,
is_gateway_admin boolean not null,
primary key (tenant_id, user_id)
);
create index idx_tenant_user_user_id on tenant_user (user_id);
-- gateway
create table gateway (
gateway_id blob not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
latitude double precision not null,
longitude double precision not null,
altitude real not null,
stats_interval_secs integer not null,
tls_certificate blob,
tags text not null,
properties text not null
);
create index idx_gateway_tenant_id on gateway (tenant_id);
create index idx_gateway_name_trgm on gateway (name);
create index idx_gateway_id_trgm on gateway (hex(gateway_id));
create index idx_gateway_tags on gateway (tags);
-- application
create table application (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
mqtt_tls_cert blob,
tags text not null default '{}'
);
create index idx_application_tenant_id on application (tenant_id);
create index idx_application_name_trgm on application (name);
-- application integration
create table application_integration (
application_id text not null references application on delete cascade,
kind varchar(20) not null,
created_at datetime not null,
updated_at datetime not null,
configuration text not null,
primary key (application_id, kind)
);
-- api-key
create table api_key (
id text not null primary key,
created_at datetime not null,
name varchar(100) not null,
is_admin boolean not null,
tenant_id text null references tenant on delete cascade
);
create index idx_api_key_tenant_id on api_key (tenant_id);
-- device-profile
create table device_profile (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
payload_codec_script text not null default '',
flush_queue_on_activate boolean not null default FALSE,
description text not null default '',
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE,
region_config_id varchar(100) null,
is_relay boolean not null default FALSE,
is_relay_ed boolean not null default FALSE,
relay_ed_relay_only boolean not null default FALSE,
relay_enabled boolean not null default FALSE,
relay_cad_periodicity smallint not null default 0,
relay_default_channel_index smallint not null default 0,
relay_second_channel_freq bigint not null default 0,
relay_second_channel_dr smallint not null default 0,
relay_second_channel_ack_offset smallint not null default 0,
relay_ed_activation_mode smallint not null default 0,
relay_ed_smart_enable_level smallint not null default 0,
relay_ed_back_off smallint not null default 0,
relay_ed_uplink_limit_bucket_size smallint not null default 0,
relay_ed_uplink_limit_reload_rate smallint not null default 0,
relay_join_req_limit_reload_rate smallint not null default 0,
relay_notify_limit_reload_rate smallint not null default 0,
relay_global_uplink_limit_reload_rate smallint not null default 0,
relay_overall_limit_reload_rate smallint not null default 0,
relay_join_req_limit_bucket_size smallint not null default 0,
relay_notify_limit_bucket_size smallint not null default 0,
relay_global_uplink_limit_bucket_size smallint not null default 0,
relay_overall_limit_bucket_size smallint not null default 0,
allow_roaming boolean not null default TRUE,
rx1_delay smallint not null default 0
);
create index idx_device_profile_tenant_id on device_profile (tenant_id);
create index idx_device_profile_name_trgm on device_profile (name);
create index idx_device_profile_tags on device_profile (tags);
-- device
create table device (
dev_eui blob not null primary key,
application_id text not null references application on delete cascade,
device_profile_id text not null references device_profile on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
scheduler_run_after datetime,
name varchar(100) not null,
description text not null,
external_power_source boolean not null,
battery_level numeric(5, 2),
margin int,
dr smallint,
latitude double precision,
longitude double precision,
altitude real,
dev_addr blob,
enabled_class char(1) not null,
skip_fcnt_check boolean not null,
is_disabled boolean not null,
tags text not null,
variables text not null,
join_eui blob not null default x'00000000',
secondary_dev_addr blob,
device_session blob
);
create index idx_device_application_id on device (application_id);
create index idx_device_device_profile_id on device (device_profile_id);
create index idx_device_name_trgm on device (name);
create index idx_device_dev_eui_trgm on device (hex(dev_eui));
create index idx_device_dev_addr_trgm on device (hex(dev_addr));
create index idx_device_tags on device (tags);
create table device_keys (
dev_eui blob not null primary key references device on delete cascade,
created_at datetime not null,
updated_at datetime not null,
nwk_key blob not null,
app_key blob not null,
dev_nonces text not null,
join_nonce int not null
);
create table device_queue_item (
id text not null primary key,
dev_eui blob references device on delete cascade not null,
created_at datetime not null,
f_port smallint not null,
confirmed boolean not null,
data blob not null,
is_pending boolean not null,
f_cnt_down bigint null,
timeout_after datetime,
is_encrypted boolean default FALSE not null
);
create index idx_device_queue_item_dev_eui on device_queue_item (dev_eui);
create index idx_device_queue_item_created_at on device_queue_item (created_at);
create index idx_device_queue_item_timeout_after on device_queue_item (timeout_after);
-- multicast groups
create table multicast_group (
id text not null primary key,
application_id text not null references application on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mc_addr blob not null,
mc_nwk_s_key blob not null,
mc_app_s_key blob not null,
f_cnt bigint not null,
group_type char(1) not null,
dr smallint not null,
frequency bigint not null,
class_b_ping_slot_nb_k smallint not null,
class_c_scheduling_type varchar(20) not null default 'delay'
);
create index idx_multicast_group_application_id on multicast_group (application_id);
create index idx_multicast_group_name_trgm on multicast_group (name);
create table multicast_group_device (
multicast_group_id text not null references multicast_group on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, dev_eui)
);
create table multicast_group_queue_item (
id text not null primary key,
created_at datetime not null,
scheduler_run_after datetime not null,
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
f_cnt bigint not null,
f_port smallint not null,
data blob not null,
emit_at_time_since_gps_epoch bigint
);
create index idx_multicast_group_queue_item_multicast_group_id on multicast_group_queue_item (multicast_group_id);
create index idx_multicast_group_queue_item_scheduler_run_after on multicast_group_queue_item (scheduler_run_after);
create table device_profile_template (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
vendor varchar(100) not null,
firmware varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
payload_codec_script text not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
flush_queue_on_activate boolean not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE
);
create table multicast_group_gateway (
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, gateway_id)
);
create table relay_device (
relay_dev_eui blob not null references device on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (relay_dev_eui, dev_eui)
);
create index idx_tenant_tags on tenant (tags);
create index idx_application_tags on application (tags);
create index idx_device_dev_addr on device (dev_addr);
create index idx_device_secondary_dev_addr on device (secondary_dev_addr);
-- relay gateway
create table relay_gateway (
tenant_id text not null references tenant on delete cascade,
relay_id blob not null,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
stats_interval_secs integer not null,
region_config_id varchar(100) not null,
primary key (tenant_id, relay_id)
);
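A quick sanity check of the schema above, assuming the `sqlite3` CLI is installed and the development database has been created (e.g. via `make migration-run` with `DATABASE=sqlite`):

```bash
# List the tables created by this initial SQLite migration.
sqlite3 chirpstack_test.sqlite '.tables'
```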


@ -44,7 +44,7 @@ impl ApplicationService for Application {
.await?;
let a = application::Application {
tenant_id,
tenant_id: tenant_id.into(),
name: req_app.name.clone(),
description: req_app.description.clone(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -119,7 +119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update(application::Application {
id: app_id,
id: app_id.into(),
name: req_app.name.to_string(),
description: req_app.description.to_string(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -279,7 +279,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -367,7 +367,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -438,7 +438,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -535,7 +535,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -610,7 +610,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -689,7 +689,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -755,7 +755,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -832,7 +832,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -907,7 +907,7 @@ impl ApplicationService for Application {
};
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1032,7 +1032,7 @@ impl ApplicationService for Application {
};
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1119,7 +1119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1202,7 +1202,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1271,7 +1271,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1354,7 +1354,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1424,7 +1424,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1506,7 +1506,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1574,7 +1574,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1653,7 +1653,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1730,7 +1730,7 @@ impl ApplicationService for Application {
}
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1814,7 +1814,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1947,7 +1947,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
let create_resp = create_resp.get_ref();
@ -1956,7 +1956,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -1978,7 +1980,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
//get
@ -1986,7 +1990,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -2006,7 +2012,9 @@ pub mod test {
offset: 0,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -2016,14 +2024,18 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteApplicationRequest {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}
@ -2038,7 +2050,7 @@ pub mod test {
.unwrap();
application::create(application::Application {
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-app".into(),
..Default::default()
})
@ -2059,7 +2071,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}


@ -95,14 +95,11 @@ pub mod test {
assert_eq!(claim, decoded);
// different key
assert_eq!(
true,
AuthClaim::decode(&token, other_secret.as_ref()).is_err()
);
assert!(AuthClaim::decode(&token, other_secret.as_ref()).is_err());
// expired
claim.exp = Some(exp.timestamp() as usize);
let token = claim.encode(secrect.as_ref()).unwrap();
assert_eq!(true, AuthClaim::decode(&token, secrect.as_ref()).is_err());
assert!(AuthClaim::decode(&token, secrect.as_ref()).is_err());
}
}


@ -6,7 +6,7 @@ pub mod claims;
pub mod error;
pub mod validator;
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum AuthID {
None,
User(Uuid),

File diff suppressed because it is too large


@ -5,18 +5,28 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use axum::{
body::Bytes,
response::{IntoResponse, Json, Response},
Router,
};
use chrono::Utc;
use http::StatusCode;
use redis::streams::StreamReadReply;
use rustls::{
server::{NoClientAuth, WebPkiClientVerifier},
ServerConfig,
};
use serde::Serialize;
use tokio::sync::oneshot;
use tokio::task;
use tracing::{error, info, span, warn, Instrument, Level};
use uuid::Uuid;
use warp::{http::StatusCode, Filter, Reply};
use crate::backend::{joinserver, keywrap, roaming};
use crate::downlink::data_fns;
use crate::helpers::errors::PrintFullError;
use crate::helpers::tls::{get_root_certs, load_cert, load_key};
use crate::storage::{
device, error::Error as StorageError, get_async_redis_conn, passive_roaming, redis_key,
};
@ -39,47 +49,47 @@ pub async fn setup() -> Result<()> {
let addr: SocketAddr = conf.backend_interfaces.bind.parse()?;
info!(bind = %conf.backend_interfaces.bind, "Setting up backend interfaces API");
let routes = warp::post()
.and(warp::body::content_length_limit(1024 * 16))
.and(warp::body::aggregate())
.then(handle_request);
let app = Router::new().fallback(handle_request);
if !conf.backend_interfaces.ca_cert.is_empty()
|| !conf.backend_interfaces.tls_cert.is_empty()
|| !conf.backend_interfaces.tls_key.is_empty()
{
let mut w = warp::serve(routes).tls();
if !conf.backend_interfaces.ca_cert.is_empty() {
w = w.client_auth_required_path(&conf.backend_interfaces.ca_cert);
}
if !conf.backend_interfaces.tls_cert.is_empty() {
w = w.cert_path(&conf.backend_interfaces.tls_cert);
}
if !conf.backend_interfaces.tls_key.is_empty() {
w = w.key_path(&conf.backend_interfaces.tls_key);
}
w.run(addr).await;
let mut server_config = ServerConfig::builder()
.with_client_cert_verifier(if conf.backend_interfaces.ca_cert.is_empty() {
Arc::new(NoClientAuth)
} else {
let root_certs = get_root_certs(Some(conf.backend_interfaces.ca_cert.clone()))?;
WebPkiClientVerifier::builder(root_certs.into()).build()?
})
.with_single_cert(
load_cert(&conf.backend_interfaces.tls_cert).await?,
load_key(&conf.backend_interfaces.tls_key).await?,
)?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
axum_server::bind_rustls(
addr,
axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(server_config)),
)
.serve(app.into_make_service())
.await?;
} else {
warp::serve(routes).run(addr).await;
axum_server::bind(addr)
.serve(app.into_make_service())
.await?;
}
Ok(())
}
pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::Body> {
let mut b: Vec<u8> = vec![];
while body.has_remaining() {
b.extend_from_slice(body.chunk());
let cnt = body.chunk().len();
body.advance(cnt);
}
pub async fn handle_request(b: Bytes) -> Response {
let b: Vec<u8> = b.into();
let bp: BasePayload = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(e) => {
return warp::reply::with_status(e.to_string(), StatusCode::BAD_REQUEST)
.into_response();
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
}
};
@ -87,7 +97,7 @@ pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::B
_handle_request(bp, b).instrument(span).await
}
pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hyper::Body> {
pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> Response {
info!("Request received");
let sender_client = {
@ -100,7 +110,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -111,7 +121,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
}
} else if bp.sender_id.len() == 3 {
@ -123,7 +133,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -134,7 +144,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
}
} else {
@ -145,7 +155,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
"Invalid SenderID length",
);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -156,7 +166,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
error!(error = %e.full(), "Handle async answer error");
}
});
return warp::reply::with_status("", StatusCode::OK).into_response();
return (StatusCode::OK, "").into_response();
}
match bp.message_type {
@ -165,11 +175,11 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
MessageType::XmitDataReq => handle_xmit_data_req(sender_client, bp, &b).await,
MessageType::HomeNSReq => handle_home_ns_req(sender_client, bp, &b).await,
// Unknown message
_ => warp::reply::with_status(
"Handler for {:?} is not implemented",
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Handler for {:?} is not implemented", bp.message_type),
)
.into_response(),
.into_response(),
}
}
@ -201,7 +211,7 @@ async fn handle_pr_start_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
if sender_client.is_async() {
let b = b.to_vec();
task::spawn(async move {
@ -222,18 +232,17 @@ async fn handle_pr_start_req(
error!(error = %e.full(), transaction_id = bp.transaction_id, "Send async PRStartAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_pr_start_req(b).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -363,7 +372,7 @@ async fn handle_pr_stop_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
if sender_client.is_async() {
let b = b.to_vec();
task::spawn(async move {
@ -383,18 +392,17 @@ async fn handle_pr_stop_req(
error!(error = %e.full(), "Send async PRStopAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_pr_stop_req(b).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -430,13 +438,13 @@ async fn handle_xmit_data_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
let pl: backend::XmitDataReqPayload = match serde_json::from_slice(b) {
Ok(v) => v,
Err(e) => {
let ans = err_to_response(anyhow::Error::new(e), &bp);
log_request_response(&bp, b, &ans).await;
return warp::reply::json(&ans).into_response();
return Json(&ans).into_response();
}
};
@ -465,18 +473,17 @@ async fn handle_xmit_data_req(
error!(error = %e.full(), "Send async XmitDataAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_xmit_data_req(pl).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -529,13 +536,13 @@ async fn handle_home_ns_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
let pl: backend::HomeNSReqPayload = match serde_json::from_slice(b) {
Ok(v) => v,
Err(e) => {
let ans = err_to_response(anyhow::Error::new(e), &bp);
log_request_response(&bp, b, &ans).await;
return warp::reply::json(&ans).into_response();
return Json(&ans).into_response();
}
};
@ -560,17 +567,17 @@ async fn handle_home_ns_req(
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_home_ns_req(pl).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -587,7 +594,7 @@ async fn _handle_home_ns_req(pl: backend::HomeNSReqPayload) -> Result<backend::H
})
}
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<hyper::Body>> {
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<Response> {
let transaction_id = bp.transaction_id;
let key = redis_key(format!("backend:async:{}", transaction_id));
@ -609,7 +616,7 @@ async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<h
.query_async(&mut get_async_redis_conn().await?)
.await?;
Ok(warp::reply().into_response())
Ok((StatusCode::OK, "").into_response())
}
pub async fn get_async_receiver(
@ -651,7 +658,7 @@ pub async fn get_async_receiver(
for (k, v) in &stream_id.map {
match k.as_ref() {
"pl" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let _ = tx.send(b.to_vec());
return;
}


@ -64,8 +64,8 @@ impl DeviceService for Device {
let d = device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -191,8 +191,8 @@ impl DeviceService for Device {
// update
let _ = device::update(device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -533,7 +533,7 @@ impl DeviceService for Device {
dp.reset_session_to_boot_params(&mut ds);
let mut device_changeset = device::DeviceChangeset {
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
dev_addr: Some(Some(dev_addr)),
secondary_dev_addr: Some(None),
..Default::default()
@ -695,20 +695,18 @@ impl DeviceService for Device {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -819,20 +817,18 @@ impl DeviceService for Device {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -1089,7 +1085,7 @@ impl DeviceService for Device {
}
let qi = device_queue::DeviceQueueItem {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
dev_eui,
f_port: req_qi.f_port as i16,
confirmed: req_qi.confirmed,
@ -1256,7 +1252,7 @@ pub mod test {
// create application
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -1265,7 +1261,7 @@ pub mod test {
// create device-profile
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -1421,12 +1417,10 @@ pub mod test {
);
// flush dev nonces
let _ = device_keys::set_dev_nonces(
&EUI64::from_str("0102030405060708").unwrap(),
&vec![1, 2, 3],
)
.await
.unwrap();
let _ =
device_keys::set_dev_nonces(&EUI64::from_str("0102030405060708").unwrap(), &[1, 2, 3])
.await
.unwrap();
let flush_dev_nonces_req = get_request(
&u.id,
api::FlushDevNoncesRequest {
@ -1545,11 +1539,14 @@ pub mod test {
dev.dev_eui,
&device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_be_bytes([1, 2, 3, 4]))),
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
}
.into(),
)),
..Default::default()
},
)
@ -1574,14 +1571,17 @@ pub mod test {
device::partial_update(
dev.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
}
.into(),
)),
..Default::default()
},
)
@ -1618,7 +1618,7 @@ pub mod test {
.await
.unwrap();
let dev_addr = DevAddr::from_str(&get_random_dev_addr_resp.get_ref().dev_addr).unwrap();
let mut dev_addr_copy = dev_addr.clone();
let mut dev_addr_copy = dev_addr;
dev_addr_copy.set_dev_addr_prefix(NetID::from_str("000000").unwrap().dev_addr_prefix());
assert_eq!(dev_addr, dev_addr_copy);
@ -1666,10 +1666,10 @@ pub mod test {
assert_eq!(2, get_queue_resp.total_count);
assert_eq!(2, get_queue_resp.result.len());
assert_eq!(vec![3, 2, 1], get_queue_resp.result[0].data);
assert_eq!(false, get_queue_resp.result[0].is_encrypted);
assert!(!get_queue_resp.result[0].is_encrypted);
assert_eq!(0, get_queue_resp.result[0].f_cnt_down);
assert_eq!(vec![1, 2, 3], get_queue_resp.result[1].data);
assert_eq!(true, get_queue_resp.result[1].is_encrypted);
assert!(get_queue_resp.result[1].is_encrypted);
assert_eq!(10, get_queue_resp.result[1].f_cnt_down);
// get next FCntDown (from queue)
@ -1726,7 +1726,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}
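
A note on two patterns that recur in this file and the ones below. First, the SQLite work wraps types such as `internal::DeviceSession` and `uuid::Uuid` in newtypes implementing diesel's traits for both backends, so plain values now pass through `.into()` at every construction site. Second, clippy fixes replace `.as_ref()...clone()` on Copy types such as `prost_types::Timestamp` with a plain dereference (`*req.start`). A toy sketch of the newtype half, where `Inner` stands in for a protobuf struct and `Wrapped` for the hypothetical diesel-aware field type:

// Hedged sketch: a newtype lets one field carry backend-specific diesel
// impls while call sites convert with a cheap .into().
#[derive(Clone, Default)]
struct Inner {
    dev_addr: Vec<u8>,
}

#[derive(Clone, Default)]
struct Wrapped(Inner);

impl From<Inner> for Wrapped {
    fn from(v: Inner) -> Self {
        Wrapped(v)
    }
}

impl std::ops::Deref for Wrapped {
    type Target = Inner;
    fn deref(&self) -> &Inner {
        &self.0
    }
}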

View File

@ -45,7 +45,7 @@ impl DeviceProfileService for DeviceProfile {
.await?;
let mut dp = device_profile::DeviceProfile {
tenant_id,
tenant_id: tenant_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),
@ -247,7 +247,7 @@ impl DeviceProfileService for DeviceProfile {
// update
let _ = device_profile::update(device_profile::DeviceProfile {
id: dp_id,
id: dp_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),
@ -600,7 +600,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -444,7 +444,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -58,7 +58,7 @@ impl GatewayService for Gateway {
let gw = gateway::Gateway {
gateway_id: EUI64::from_str(&req_gw.gateway_id).map_err(|e| e.status())?,
tenant_id,
tenant_id: tenant_id.into(),
name: req_gw.name.clone(),
description: req_gw.description.clone(),
latitude: lat,
@ -345,20 +345,18 @@ impl GatewayService for Gateway {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -634,20 +632,18 @@ impl GatewayService for Gateway {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -855,8 +851,8 @@ impl GatewayService for Gateway {
.await?;
let _ = gateway::update_relay_gateway(gateway::RelayGateway {
tenant_id,
relay_id,
tenant_id: tenant_id.into(),
name: req_relay.name.clone(),
description: req_relay.description.clone(),
stats_interval_secs: req_relay.stats_interval as i32,
@ -1034,7 +1030,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.create(create_req).await.unwrap();
// get
@ -1042,7 +1038,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1076,7 +1074,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -1084,7 +1084,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1111,7 +1113,9 @@ pub mod test {
..Default::default()
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -1121,14 +1125,18 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteGatewayRequest {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}
@ -1160,7 +1168,7 @@ pub mod test {
// create gateway
let _ = gateway::create(gateway::Gateway {
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-gw".into(),
..Default::default()
})
@ -1172,7 +1180,7 @@ pub mod test {
// insert stats
let mut m = metrics::Record {
kind: metrics::Kind::ABSOLUTE,
time: now.into(),
time: now,
metrics: HashMap::new(),
};
@ -1206,7 +1214,7 @@ pub mod test {
let mut stats_req = Request::new(stats_req);
stats_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let stats_resp = service.get_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1257,7 +1265,7 @@ pub mod test {
// create gateway
let _ = gateway::create(gateway::Gateway {
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-gw".into(),
..Default::default()
})
@ -1269,7 +1277,7 @@ pub mod test {
// insert stats
let mut m = metrics::Record {
kind: metrics::Kind::COUNTER,
time: now.into(),
time: now,
metrics: HashMap::new(),
};
@ -1297,9 +1305,7 @@ pub mod test {
end: Some(now_st.into()),
};
let mut stats_req = Request::new(stats_req);
stats_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
stats_req.extensions_mut().insert(AuthID::User(u.id.into()));
let stats_resp = service.get_duty_cycle_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1373,7 +1379,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1399,7 +1407,9 @@ pub mod test {
}),
};
let mut up_relay_req = Request::new(up_relay_req);
up_relay_req.extensions_mut().insert(AuthID::User(u.id));
up_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let _ = service.update_relay_gateway(up_relay_req).await.unwrap();
// get relay gateway
@ -1408,7 +1418,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1429,7 +1441,9 @@ pub mod test {
offset: 0,
};
let mut list_relay_req = Request::new(list_relay_req);
list_relay_req.extensions_mut().insert(AuthID::User(u.id));
list_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let list_relay_resp = service.list_relay_gateways(list_relay_req).await.unwrap();
assert_eq!(1, list_relay_resp.get_ref().total_count);
assert_eq!(1, list_relay_resp.get_ref().result.len());
@ -1440,7 +1454,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_ok());
@ -1449,7 +1465,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_err());
}

View File

@ -0,0 +1,141 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::ready;
use http::{header::CONTENT_TYPE, Request, Response};
use http_body::Body;
use pin_project::pin_project;
use tower::{Layer, Service};
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[pin_project(project = GrpcMultiplexFutureEnumProj)]
enum GrpcMultiplexFutureEnum<FS, FO> {
Grpc {
#[pin]
future: FS,
},
Other {
#[pin]
future: FO,
},
}
#[pin_project]
pub struct GrpcMultiplexFuture<FS, FO> {
#[pin]
future: GrpcMultiplexFutureEnum<FS, FO>,
}
impl<ResBody, FS, FO, ES, EO> Future for GrpcMultiplexFuture<FS, FO>
where
ResBody: Body,
FS: Future<Output = Result<Response<ResBody>, ES>>,
FO: Future<Output = Result<Response<ResBody>, EO>>,
ES: Into<BoxError> + Send,
EO: Into<BoxError> + Send,
{
type Output = Result<Response<ResBody>, Box<dyn std::error::Error + Send + Sync + 'static>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.project() {
GrpcMultiplexFutureEnumProj::Grpc { future } => future.poll(cx).map_err(Into::into),
GrpcMultiplexFutureEnumProj::Other { future } => future.poll(cx).map_err(Into::into),
}
}
}
#[derive(Debug, Clone)]
pub struct GrpcMultiplexService<S, O> {
grpc: S,
other: O,
grpc_ready: bool,
other_ready: bool,
}
impl<ReqBody, ResBody, S, O> Service<Request<ReqBody>> for GrpcMultiplexService<S, O>
where
ResBody: Body,
S: Service<Request<ReqBody>, Response = Response<ResBody>>,
O: Service<Request<ReqBody>, Response = Response<ResBody>>,
S::Error: Into<BoxError> + Send,
O::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future = GrpcMultiplexFuture<S::Future, O::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match (self.grpc_ready, self.other_ready) {
(true, true) => {
return Ok(()).into();
}
(false, _) => {
ready!(self.grpc.poll_ready(cx)).map_err(Into::into)?;
self.grpc_ready = true;
}
(_, false) => {
ready!(self.other.poll_ready(cx)).map_err(Into::into)?;
self.other_ready = true;
}
}
}
}
fn call(&mut self, request: Request<ReqBody>) -> Self::Future {
assert!(self.grpc_ready);
assert!(self.other_ready);
if is_grpc_request(&request) {
GrpcMultiplexFuture {
future: GrpcMultiplexFutureEnum::Grpc {
future: self.grpc.call(request),
},
}
} else {
GrpcMultiplexFuture {
future: GrpcMultiplexFutureEnum::Other {
future: self.other.call(request),
},
}
}
}
}
#[derive(Debug, Clone)]
pub struct GrpcMultiplexLayer<O> {
other: O,
}
impl<O> GrpcMultiplexLayer<O> {
pub fn new(other: O) -> Self {
Self { other }
}
}
impl<S, O> Layer<S> for GrpcMultiplexLayer<O>
where
O: Clone,
{
type Service = GrpcMultiplexService<S, O>;
fn layer(&self, grpc: S) -> Self::Service {
GrpcMultiplexService {
grpc,
other: self.other.clone(),
grpc_ready: false,
other_ready: false,
}
}
}
fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}
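
The whole multiplexer hinges on the content-type check above: any request whose Content-Type starts with `application/grpc` (which also covers `application/grpc+proto` and the grpc-web variants) is routed to tonic, and everything else falls through to the axum router. A small sketch exercising that predicate (request construction is illustrative only):

// Hedged sketch: exercising the starts_with check from is_grpc_request.
fn demo_is_grpc() {
    let grpc = http::Request::builder()
        .header(http::header::CONTENT_TYPE, "application/grpc+proto")
        .body(())
        .unwrap();
    assert!(is_grpc_request(&grpc));

    let other = http::Request::builder().body(()).unwrap();
    assert!(!is_grpc_request(&other));
}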

View File

@ -287,7 +287,11 @@ impl InternalService for Internal {
let tenant_id = if req_key.tenant_id.is_empty() {
None
} else {
Some(Uuid::from_str(&req_key.tenant_id).map_err(|e| e.status())?)
Some(
Uuid::from_str(&req_key.tenant_id)
.map_err(|e| e.status())?
.into(),
)
};
if req_key.is_admin && tenant_id.is_some() {
@ -312,7 +316,7 @@ impl InternalService for Internal {
let ak = api_key::ApiKey {
name: req_key.name.clone(),
is_admin: req_key.is_admin,
tenant_id,
tenant_id: tenant_id.map(|u| u.into()),
..Default::default()
};

View File

@ -1,4 +1,3 @@
use std::convert::Infallible;
use std::time::{Duration, Instant};
use std::{
future::Future,
@ -7,23 +6,27 @@ use std::{
};
use anyhow::Result;
use futures::future::{self, Either, TryFutureExt};
use hyper::{service::make_service_fn, Server};
use axum::{response::IntoResponse, routing::get, Router};
use http::{
header::{self, HeaderMap, HeaderValue},
Request, StatusCode, Uri,
};
use pin_project::pin_project;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::Histogram;
use rust_embed::RustEmbed;
use tokio::{task, try_join};
use tokio::task;
use tokio::try_join;
use tonic::transport::Server as TonicServer;
use tonic::Code;
use tonic_reflection::server::Builder as TonicReflectionBuilder;
use tonic_web::GrpcWebLayer;
use tower::{Service, ServiceBuilder};
use tower::util::ServiceExt;
use tower::Service;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use warp::{http::header::HeaderValue, path::Tail, reply::Response, Filter, Rejection, Reply};
use chirpstack_api::api::application_service_server::ApplicationServiceServer;
use chirpstack_api::api::device_profile_service_server::DeviceProfileServiceServer;
@ -51,6 +54,7 @@ pub mod device_profile;
pub mod device_profile_template;
pub mod error;
pub mod gateway;
mod grpc_multiplex;
pub mod helpers;
pub mod internal;
pub mod monitoring;
@ -89,210 +93,125 @@ lazy_static! {
};
}
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(RustEmbed)]
#[folder = "../ui/build"]
struct Asset;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub async fn setup() -> Result<()> {
let conf = config::get();
let addr = conf.api.bind.parse()?;
let bind = conf.api.bind.parse()?;
info!(bind = %conf.api.bind, "Setting up API interface");
info!(bind = %bind, "Setting up API interface");
// Taken from the tonic hyper_warp_multiplex example:
// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp_multiplex/server.rs#L101
let service = make_service_fn(move |_| {
// tonic gRPC service
let tonic_service = TonicServer::builder()
.accept_http1(true)
.layer(GrpcWebLayer::new())
.add_service(
TonicReflectionBuilder::configure()
.register_encoded_file_descriptor_set(chirpstack_api::api::DESCRIPTOR)
.build()
.unwrap(),
)
.add_service(InternalServiceServer::with_interceptor(
internal::Internal::new(
validator::RequestValidator::new(),
conf.api.secret.clone(),
),
auth::auth_interceptor,
))
.add_service(ApplicationServiceServer::with_interceptor(
application::Application::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileServiceServer::with_interceptor(
device_profile::DeviceProfile::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileTemplateServiceServer::with_interceptor(
device_profile_template::DeviceProfileTemplate::new(
validator::RequestValidator::new(),
),
auth::auth_interceptor,
))
.add_service(TenantServiceServer::with_interceptor(
tenant::Tenant::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceServiceServer::with_interceptor(
device::Device::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(UserServiceServer::with_interceptor(
user::User::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(GatewayServiceServer::with_interceptor(
gateway::Gateway::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(MulticastGroupServiceServer::with_interceptor(
multicast::MulticastGroup::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(RelayServiceServer::with_interceptor(
relay::Relay::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.into_service();
let mut tonic_service = ServiceBuilder::new()
.layer(
TraceLayer::new_for_grpc()
.make_span_with(|req: &http::Request<hyper::Body>| {
tracing::info_span!(
"gRPC",
uri = %req.uri().path(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.layer(ApiLogger {})
.service(tonic_service);
let web = Router::new()
.route("/auth/oidc/login", get(oidc::login_handler))
.route("/auth/oidc/callback", get(oidc::callback_handler))
.route("/auth/oauth2/login", get(oauth2::login_handler))
.route("/auth/oauth2/callback", get(oauth2::callback_handler))
.fallback(service_static_handler)
.into_service()
.map_response(|r| r.map(tonic::body::boxed));
// HTTP service
let warp_service = warp::service(
warp::path!("auth" / "oidc" / "login")
.and_then(oidc::login_handler)
.or(warp::path!("auth" / "oidc" / "callback")
.and(warp::query::<oidc::CallbackArgs>())
.and_then(oidc::callback_handler))
.or(warp::path!("auth" / "oauth2" / "login").and_then(oauth2::login_handler))
.or(warp::path!("auth" / "oauth2" / "callback")
.and(warp::query::<oauth2::CallbackArgs>())
.and_then(oauth2::callback_handler))
.or(warp::path::tail().and_then(http_serve)),
);
let mut warp_service = ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(|req: &http::Request<hyper::Body>| {
tracing::info_span!(
"http",
method = req.method().as_str(),
uri = %req.uri().path(),
version = ?req.version(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.service(warp_service);
future::ok::<_, Infallible>(tower::service_fn(
move |req: hyper::Request<hyper::Body>| match req.method() {
&hyper::Method::GET => Either::Left(
warp_service
.call(req)
.map_ok(|res| res.map(EitherBody::Right))
.map_err(Error::from),
),
_ => Either::Right(
tonic_service
.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
),
},
let grpc = TonicServer::builder()
.accept_http1(true)
.layer(
TraceLayer::new_for_grpc()
.make_span_with(|req: &Request<_>| {
tracing::info_span!(
"gRPC",
uri = %req.uri().path(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.layer(grpc_multiplex::GrpcMultiplexLayer::new(web))
.layer(ApiLoggerLayer {})
.layer(GrpcWebLayer::new())
.add_service(
TonicReflectionBuilder::configure()
.register_encoded_file_descriptor_set(chirpstack_api::api::DESCRIPTOR)
.build()
.unwrap(),
)
.add_service(InternalServiceServer::with_interceptor(
internal::Internal::new(validator::RequestValidator::new(), conf.api.secret.clone()),
auth::auth_interceptor,
))
});
.add_service(ApplicationServiceServer::with_interceptor(
application::Application::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileServiceServer::with_interceptor(
device_profile::DeviceProfile::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileTemplateServiceServer::with_interceptor(
device_profile_template::DeviceProfileTemplate::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(TenantServiceServer::with_interceptor(
tenant::Tenant::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceServiceServer::with_interceptor(
device::Device::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(UserServiceServer::with_interceptor(
user::User::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(GatewayServiceServer::with_interceptor(
gateway::Gateway::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(MulticastGroupServiceServer::with_interceptor(
multicast::MulticastGroup::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(RelayServiceServer::with_interceptor(
relay::Relay::new(validator::RequestValidator::new()),
auth::auth_interceptor,
));
let backend_handle = tokio::spawn(backend::setup());
let monitoring_handle = tokio::spawn(monitoring::setup());
let api_handle = tokio::spawn(Server::bind(&addr).serve(service));
let grpc_handle = tokio::spawn(grpc.serve(bind));
let _ = try_join!(api_handle, backend_handle, monitoring_handle)?;
tokio::spawn(async move {
if let Err(e) = try_join!(grpc_handle, backend_handle, monitoring_handle) {
error!(error = %e, "Setup API error");
std::process::exit(-1);
}
});
Ok(())
}
enum EitherBody<A, B> {
Left(A),
Right(B),
}
impl<A, B> http_body::Body for EitherBody<A, B>
where
A: http_body::Body + Send + Unpin,
B: http_body::Body<Data = A::Data> + Send + Unpin,
A::Error: Into<Error>,
B::Error: Into<Error>,
{
type Data = A::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
}
fn map_option_err<T, U: Into<Error>>(err: Option<Result<T, U>>) -> Option<Result<T, Error>> {
err.map(|e| e.map_err(Into::into))
}
async fn http_serve(path: Tail) -> Result<impl Reply, Rejection> {
let mut path = path.as_str();
async fn service_static_handler(uri: Uri) -> impl IntoResponse {
let mut path = {
let mut chars = uri.path().chars();
chars.next();
chars.as_str()
};
if path.is_empty() {
path = "index.html";
}
let asset = Asset::get(path).ok_or_else(warp::reject::not_found)?;
let mime = mime_guess::from_path(path).first_or_octet_stream();
let mut res = Response::new(asset.data.into());
res.headers_mut().insert(
"content-type",
HeaderValue::from_str(mime.as_ref()).unwrap(),
);
Ok(res)
if let Some(asset) = Asset::get(path) {
let mime = mime_guess::from_path(path).first_or_octet_stream();
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_str(mime.as_ref()).unwrap(),
);
(StatusCode::OK, headers, asset.data.into())
} else {
(StatusCode::NOT_FOUND, HeaderMap::new(), vec![])
}
}
#[derive(Debug, Clone)]
@ -320,13 +239,14 @@ struct GrpcLabels {
status_code: String,
}
struct ApiLogger {}
#[derive(Debug, Clone)]
struct ApiLoggerLayer {}
impl<S> tower::Layer<S> for ApiLogger {
impl<S> tower::Layer<S> for ApiLoggerLayer {
type Service = ApiLoggerService<S>;
fn layer(&self, service: S) -> Self::Service {
ApiLoggerService { inner: service }
fn layer(&self, inner: S) -> Self::Service {
ApiLoggerService { inner }
}
}
@ -335,15 +255,15 @@ struct ApiLoggerService<S> {
inner: S,
}
impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for ApiLoggerService<S>
impl<ReqBody, ResBody, S> Service<http::Request<ReqBody>> for ApiLoggerService<S>
where
S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
ReqBody: http_body::Body,
ResBody: http_body::Body,
S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
S::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = ApiLoggerResponseFuture<S::Future>;
type Future = ApiLoggerFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
@ -352,10 +272,10 @@ where
fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
let uri = request.uri().path().to_string();
let uri_parts: Vec<&str> = uri.split('/').collect();
let response_future = self.inner.call(request);
let future = self.inner.call(request);
let start = Instant::now();
ApiLoggerResponseFuture {
response_future,
ApiLoggerFuture {
future,
start,
service: uri_parts.get(1).map(|v| v.to_string()).unwrap_or_default(),
method: uri_parts.get(2).map(|v| v.to_string()).unwrap_or_default(),
@ -364,25 +284,26 @@ where
}
#[pin_project]
struct ApiLoggerResponseFuture<F> {
struct ApiLoggerFuture<F> {
#[pin]
response_future: F,
future: F,
start: Instant,
service: String,
method: String,
}
impl<F, ResBody, Error> Future for ApiLoggerResponseFuture<F>
impl<ResBody, F, E> Future for ApiLoggerFuture<F>
where
F: Future<Output = Result<http::Response<ResBody>, Error>>,
ResBody: http_body::Body,
F: Future<Output = Result<http::Response<ResBody>, E>>,
E: Into<BoxError> + Send,
{
type Output = Result<http::Response<ResBody>, Error>;
type Output = Result<http::Response<ResBody>, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.response_future.poll(cx) {
match this.future.poll(cx) {
Poll::Ready(result) => {
if let Ok(response) = &result {
let status_code: i32 = match response.headers().get("grpc-status") {

View File

@ -1,51 +1,49 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use anyhow::{Context, Result};
use axum::{
response::{IntoResponse, Response},
routing::get,
Router,
};
use diesel_async::RunQueryDsl;
use http::StatusCode;
use tracing::info;
use warp::{http::Response, http::StatusCode, Filter};
use crate::config;
use crate::monitoring::prometheus;
use crate::storage::{get_async_db_conn, get_async_redis_conn};
pub async fn setup() {
pub async fn setup() -> Result<()> {
let conf = config::get();
if conf.monitoring.bind.is_empty() {
return;
return Ok(());
}
let addr: SocketAddr = conf.monitoring.bind.parse().unwrap();
info!(bind = %conf.monitoring.bind, "Setting up monitoring endpoint");
let prom_endpoint = warp::get()
.and(warp::path!("metrics"))
.and_then(prometheus_handler);
let app = Router::new()
.route("/metrics", get(prometheus_handler))
.route("/health", get(health_handler));
let health_endpoint = warp::get()
.and(warp::path!("health"))
.and_then(health_handler);
let routes = prom_endpoint.or(health_endpoint);
warp::serve(routes).run(addr).await;
axum_server::bind(addr)
.serve(app.into_make_service())
.await?;
Ok(())
}
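
The monitoring endpoint now builds a plain axum `Router` and serves it with `axum_server`. A reduced, self-contained sketch of that shape (the address and handler bodies are hypothetical):

// Hedged sketch: a two-route axum service like the one above.
use axum::{routing::get, Router};

async fn serve_example() -> anyhow::Result<()> {
    let app = Router::new()
        .route("/metrics", get(|| async { "metrics body" }))
        .route("/health", get(|| async { "OK" }));
    axum_server::bind("127.0.0.1:9090".parse()?)
        .serve(app.into_make_service())
        .await?;
    Ok(())
}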
async fn prometheus_handler() -> Result<impl warp::Reply, Infallible> {
async fn prometheus_handler() -> Response {
let body = prometheus::encode_to_string().unwrap_or_default();
Ok(Response::builder().body(body))
body.into_response()
}
async fn health_handler() -> Result<impl warp::Reply, Infallible> {
async fn health_handler() -> Response {
if let Err(e) = _health_handler().await {
return Ok(warp::reply::with_status(
e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
));
(StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
} else {
(StatusCode::OK, "".to_string()).into_response()
}
Ok(warp::reply::with_status("OK".to_string(), StatusCode::OK))
}
async fn _health_handler() -> Result<()> {

View File

@ -47,7 +47,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let mg = multicast::MulticastGroup {
application_id: app_id,
application_id: app_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -154,7 +154,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let _ = multicast::update(multicast::MulticastGroup {
id: mg_id,
id: mg_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -408,7 +408,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let f_cnt = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem {
multicast_group_id: mg_id,
multicast_group_id: mg_id.into(),
f_port: req_enq.f_port as i16,
data: req_enq.data.clone(),
..Default::default()
@ -550,7 +550,7 @@ pub mod test {
// create application
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -559,7 +559,7 @@ pub mod test {
// create device-profile
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -871,7 +871,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -1,7 +1,10 @@
use std::str::FromStr;
use anyhow::{Context, Result};
use axum::{
extract::Query,
response::{IntoResponse, Redirect, Response},
};
use chrono::Duration;
use http::StatusCode;
use oauth2::basic::BasicClient;
use oauth2::reqwest;
use oauth2::{
@ -11,7 +14,6 @@ use oauth2::{
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use tracing::{error, trace};
use warp::{Rejection, Reply};
use crate::config;
use crate::helpers::errors::PrintFullError;
@ -26,7 +28,7 @@ struct ClerkUserinfo {
pub user_id: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Deserialize)]
pub struct CallbackArgs {
pub code: String,
pub state: String,
@ -39,16 +41,12 @@ pub struct User {
pub external_id: String,
}
pub async fn login_handler() -> Result<impl Reply, Rejection> {
pub async fn login_handler() -> Response {
let client = match get_client() {
Ok(v) => v,
Err(e) => {
error!(error = %e.full(), "Get OAuth2 client error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
};
@ -64,26 +62,16 @@ pub async fn login_handler() -> Result<impl Reply, Rejection> {
if let Err(e) = store_verifier(&csrf_token, &pkce_verifier).await {
error!(error = %e.full(), "Store verifier error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
Ok(
warp::redirect::found(warp::http::Uri::from_str(auth_url.as_str()).unwrap())
.into_response(),
)
Redirect::temporary(auth_url.as_str()).into_response()
}
pub async fn callback_handler(args: CallbackArgs) -> Result<impl Reply, Rejection> {
// warp::redirect does not work with '#'.
Ok(warp::reply::with_header(
warp::http::StatusCode::PERMANENT_REDIRECT,
warp::http::header::LOCATION,
format!("/#/login?code={}&state={}", args.code, args.state),
))
pub async fn callback_handler(args: Query<CallbackArgs>) -> Response {
let args: CallbackArgs = args.0;
Redirect::permanent(&format!("/#/login?code={}&state={}", args.code, args.state))
.into_response()
}
fn get_client() -> Result<Client> {
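
The old warp version set the Location header by hand because `warp::redirect` could not carry a '#' fragment; axum's `Redirect` writes the target into Location verbatim, so the fragment survives. A compact sketch of the extractor-plus-redirect shape used above (the `Params` type is hypothetical):

// Hedged sketch: axum Query extraction feeding a permanent (308) redirect.
use axum::extract::Query;
use axum::response::{IntoResponse, Redirect, Response};
use serde::Deserialize;

#[derive(Deserialize)]
struct Params {
    code: String,
    state: String,
}

async fn callback(Query(p): Query<Params>) -> Response {
    Redirect::permanent(&format!("/#/login?code={}&state={}", p.code, p.state)).into_response()
}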

View File

@ -1,8 +1,12 @@
use std::collections::HashMap;
use std::str::FromStr;
use anyhow::{Context, Result};
use axum::{
extract::Query,
response::{IntoResponse, Redirect, Response},
};
use chrono::Duration;
use http::StatusCode;
use openidconnect::core::{
CoreClient, CoreGenderClaim, CoreIdTokenClaims, CoreIdTokenVerifier, CoreProviderMetadata,
CoreResponseType,
@ -15,7 +19,6 @@ use openidconnect::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{error, trace};
use warp::{Rejection, Reply};
use crate::config;
use crate::helpers::errors::PrintFullError;
@ -40,22 +43,18 @@ pub struct CustomClaims {
impl AdditionalClaims for CustomClaims {}
#[derive(Serialize, Deserialize)]
#[derive(Deserialize)]
pub struct CallbackArgs {
pub code: String,
pub state: String,
}
pub async fn login_handler() -> Result<impl Reply, Rejection> {
pub async fn login_handler() -> Response {
let client = match get_client().await {
Ok(v) => v,
Err(e) => {
error!(error = %e.full(), "Get OIDC client error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
};
@ -72,26 +71,16 @@ pub async fn login_handler() -> Result<impl Reply, Rejection> {
if let Err(e) = store_nonce(&csrf_state, &nonce).await {
error!(error = %e.full(), "Store nonce error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
Ok(
warp::redirect::found(warp::http::Uri::from_str(auth_url.as_str()).unwrap())
.into_response(),
)
Redirect::temporary(auth_url.as_str()).into_response()
}
pub async fn callback_handler(args: CallbackArgs) -> Result<impl Reply, Rejection> {
// warp::redirect does not work with '#'.
Ok(warp::reply::with_header(
warp::http::StatusCode::PERMANENT_REDIRECT,
warp::http::header::LOCATION,
format!("/#/login?code={}&state={}", args.code, args.state),
))
pub async fn callback_handler(args: Query<CallbackArgs>) -> Response {
let args: CallbackArgs = args.0;
Redirect::permanent(&format!("/#/login?code={}&state={}", args.code, args.state))
.into_response()
}
pub async fn get_user(code: &str, state: &str) -> Result<User> {

View File

@ -315,7 +315,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -122,7 +122,7 @@ impl TenantService for Tenant {
// update
let _ = tenant::update(tenant::Tenant {
id: tenant_id,
id: tenant_id.into(),
name: req_tenant.name.clone(),
description: req_tenant.description.clone(),
can_have_gateways: req_tenant.can_have_gateways,
@ -190,7 +190,7 @@ impl TenantService for Tenant {
let u = user::get(id).await.map_err(|e| e.status())?;
if !u.is_admin {
filters.user_id = Some(u.id);
filters.user_id = Some(u.id.into());
}
}
AuthID::Key(_) => {
@ -258,8 +258,8 @@ impl TenantService for Tenant {
.await?;
let _ = tenant::add_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -342,8 +342,8 @@ impl TenantService for Tenant {
.await?;
tenant::update_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -484,7 +484,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -492,7 +492,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -520,7 +522,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -528,7 +532,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -551,7 +557,9 @@ pub mod test {
user_id: "".into(),
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -561,14 +569,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteTenantRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -64,8 +64,8 @@ impl UserService for User {
let tenant_id = Uuid::from_str(&tu.tenant_id).map_err(|e| e.status())?;
tenant::add_user(tenant::TenantUser {
tenant_id,
user_id: u.id,
tenant_id: tenant_id.into(),
user_id: u.id.into(),
is_admin: tu.is_admin,
is_device_admin: tu.is_device_admin,
is_gateway_admin: tu.is_gateway_admin,
@ -138,7 +138,7 @@ impl UserService for User {
// update
let _ = user::update(user::User {
id: user_id,
id: user_id.into(),
is_admin: req_user.is_admin,
is_active: req_user.is_active,
email: req_user.email.clone(),
@ -294,7 +294,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -302,7 +302,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -328,7 +330,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -336,7 +340,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -356,7 +362,9 @@ pub mod test {
password: "newpassword".into(),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update_password(up_req).await.unwrap();
// list
@ -365,7 +373,9 @@ pub mod test {
limit: 10,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
// * Admin from migrations
// * User that we created for auth
@ -378,14 +388,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteUserRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
@ -393,7 +407,9 @@ pub mod test {
id: u.id.to_string(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -3,7 +3,8 @@ use handlebars::{no_escape, Handlebars};
use super::super::config;
pub fn run() {
let template = r#"
let template = vec![
r#"
# Logging configuration
[logging]
@ -20,7 +21,9 @@ pub fn run() {
# Log as JSON.
json={{ logging.json }}
"#,
#[cfg(feature = "postgres")]
r#"
# PostgreSQL configuration.
[postgresql]
@ -46,8 +49,36 @@ pub fn run() {
# the server-certificate is not signed by a CA in the platform certificate
# store.
ca_cert="{{ postgresql.ca_cert }}"
"#,
#[cfg(feature = "sqlite")]
r#"
# SQLite configuration.
[sqlite]
# SQLite DB path.
#
# Format example: sqlite:///<DATABASE>.
#
path="{{ sqlite.path }}"
# Max open connections.
#
# This sets the max. number of open connections that are allowed in the
# SQLite connection pool.
max_open_connections={{ sqlite.max_open_connections }}
# PRAGMAs.
#
# This configures the list of PRAGMAs that are executed to initialize each
# SQLite connection. For a full list of available PRAGMAs see:
# https://www.sqlite.org/pragma.html
pragmas=[
{{#each sqlite.pragmas}}
"{{this}}",
{{/each}}
]
"#,
r#"
# Redis configuration.
[redis]
@ -944,6 +975,7 @@ pub fn run() {
kek="{{ this.kek }}"
{{/each}}
# UI configuration.
[ui]
# Tileserver URL.
@ -958,14 +990,14 @@ pub fn run() {
# default tileserver_url (OSM). If you configure a different tile-server, you
# might need to update the map_attribution.
map_attribution="{{ui.map_attribution}}"
"#;
"#].join("\n");
let mut reg = Handlebars::new();
reg.register_escape_fn(no_escape);
let conf = config::get();
println!(
"{}",
reg.render_template(template, &conf)
reg.render_template(&template, &conf)
.expect("render configfile error")
);
}
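
The template rewrite relies on `#[cfg(...)]` being legal on individual `vec![]` elements, so the PostgreSQL and SQLite sections compile in or out with the backend feature flags and the surviving segments are joined at runtime. A toy sketch of the mechanism (segment contents are made up):

// Hedged sketch: feature-gated template segments joined into one string.
fn build_template() -> String {
    let parts: Vec<&str> = vec![
        "# common header",
        #[cfg(feature = "postgres")]
        "# postgres section",
        #[cfg(feature = "sqlite")]
        "# sqlite section",
        "# common footer",
    ];
    parts.join("\n")
}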

View File

@ -43,7 +43,7 @@ pub async fn run() -> Result<()> {
*dev_eui,
&storage::device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_slice(&ds.dev_addr)?)),
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
..Default::default()
},
)

View File

@ -1,5 +1,8 @@
use anyhow::Result;
use tracing::info;
use futures::stream::StreamExt;
use signal_hook::consts::signal::{SIGINT, SIGTERM};
use signal_hook_tokio::Signals;
use tracing::{info, warn};
use crate::gateway;
use crate::{adr, api, backend, downlink, integration, region, storage};
@ -20,5 +23,10 @@ pub async fn run() -> Result<()> {
downlink::setup().await;
api::setup().await?;
let mut signals = Signals::new([SIGINT, SIGTERM]).unwrap();
if let Some(signal) = signals.next().await {
warn!(signal = ?signal, "Signal received, terminating process");
}
Ok(())
}
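
The stream blocks `run()` until SIGINT or SIGTERM arrives, letting the process exit cleanly instead of being killed mid-task. For comparison, an equivalent sketch using tokio's own signal support rather than signal-hook (this assumes tokio's `signal` feature and is shown only as an alternative shape, not what the commit does):

// Hedged sketch: waiting for SIGINT or SIGTERM with tokio::signal.
async fn wait_for_shutdown() {
    use tokio::signal::unix::{signal, SignalKind};
    let mut term = signal(SignalKind::terminate()).expect("install SIGTERM handler");
    tokio::select! {
        _ = tokio::signal::ctrl_c() => {}
        _ = term.recv() => {}
    }
}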

View File

@ -5,8 +5,11 @@ use std::str::FromStr;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use diesel::backend::Backend;
#[cfg(feature = "postgres")]
use diesel::pg::Pg;
use diesel::sql_types::Text;
#[cfg(feature = "sqlite")]
use diesel::sqlite::Sqlite;
use diesel::{deserialize, serialize};
use serde::{Deserialize, Serialize};
@ -40,6 +43,7 @@ where
}
}
#[cfg(feature = "postgres")]
impl serialize::ToSql<Text, Pg> for Codec
where
str: serialize::ToSql<Text, Pg>,
@ -49,6 +53,14 @@ where
}
}
#[cfg(feature = "sqlite")]
impl serialize::ToSql<Text, Sqlite> for Codec {
fn to_sql(&self, out: &mut serialize::Output<'_, '_, Sqlite>) -> serialize::Result {
out.set_value(self.to_string());
Ok(serialize::IsNull::No)
}
}
impl FromStr for Codec {
type Err = anyhow::Error;

View File

@ -19,6 +19,7 @@ pub struct Configuration {
pub logging: Logging,
pub postgresql: Postgresql,
pub redis: Redis,
pub sqlite: Sqlite,
pub api: Api,
pub gateway: Gateway,
pub network: Network,
@ -90,6 +91,29 @@ impl Default for Redis {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Sqlite {
pub path: String,
pub pragmas: Vec<String>,
pub max_open_connections: u32,
}
impl Default for Sqlite {
fn default() -> Self {
Sqlite {
path: "sqlite://chirpstack.sqlite".into(),
pragmas: vec![
// Set busy_timeout to avoid manually managing transaction busyness/contention
"busy_timeout = 1000".to_string(),
// Enable foreign-key enforcement, since it is off by default
"foreign_keys = ON".to_string(),
],
max_open_connections: 4,
}
}
}
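
Because the struct carries `#[serde(default)]`, a partial `[sqlite]` table deserializes cleanly, with missing fields falling back to the defaults above. A hedged round-trip sketch, assuming the `toml` crate is available as it is for the rest of the configuration:

// Hedged sketch: parsing an [sqlite] table body into the Sqlite struct.
fn parse_sqlite_section() -> Sqlite {
    let raw = r#"
        path = "sqlite://chirpstack.sqlite"
        pragmas = ["busy_timeout = 1000", "foreign_keys = ON"]
        max_open_connections = 4
    "#;
    toml::from_str(raw).expect("parse sqlite configuration")
}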
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Api {

View File

@ -127,8 +127,8 @@ pub mod test {
for _ in 0..100000 {
let offset = get_ping_offset(beacon_ts, &dev_addr, ping_nb).unwrap();
assert!(offset <= ping_period - 1);
beacon_ts = beacon_ts + *BEACON_PERIOD;
assert!(offset < ping_period);
beacon_ts += *BEACON_PERIOD;
}
}
}

View File

@ -363,7 +363,7 @@ impl Data {
trace!("Selecting downlink gateway");
let gw_down = helpers::select_downlink_gateway(
Some(self.tenant.id),
Some(self.tenant.id.into()),
&self.device.get_device_session()?.region_config_id,
self.network_conf.gateway_prefer_min_margin,
self.device_gateway_rx_info.as_mut().unwrap(),
@ -519,7 +519,8 @@ impl Data {
},
};
integration::ack_event(self.application.id, &self.device.variables, &pl).await;
integration::ack_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of timeout");
continue;
@ -549,7 +550,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of max. payload size");
continue;
@ -585,7 +587,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of invalid frame-counter");
continue;
@ -1349,7 +1352,7 @@ impl Data {
ds.last_device_status_request = Some(Utc::now().into());
}
Some(ts) => {
let ts: DateTime<Utc> = ts.clone().try_into().map_err(anyhow::Error::msg)?;
let ts: DateTime<Utc> = (*ts).try_into().map_err(anyhow::Error::msg)?;
let req_interval = Duration::from_secs(60 * 60 * 24)
/ self.device_profile.device_status_req_interval as u32;
@ -1711,8 +1714,7 @@ impl Data {
match &rd.w_f_cnt_last_request {
Some(v) => {
let last_req: DateTime<Utc> =
v.clone().try_into().map_err(anyhow::Error::msg)?;
let last_req: DateTime<Utc> = (*v).try_into().map_err(anyhow::Error::msg)?;
if last_req
< Utc::now()
.checked_sub_signed(chrono::Duration::try_hours(24).unwrap())
@ -2729,7 +2731,7 @@ mod test {
name: "max payload size error".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
@ -2769,7 +2771,7 @@ mod test {
name: "is pending".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
f_cnt_down: Some(10),
@ -2801,7 +2803,7 @@ mod test {
name: "invalid frame-counter".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2842,14 +2844,14 @@ mod test {
name: "valid payload".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
..Default::default()
}],
expected_queue_item: Some(device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2875,7 +2877,7 @@ mod test {
let d = device::partial_update(
d.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(ds.clone())),
device_session: Some(Some(ds.clone().into())),
..Default::default()
},
)
@ -3419,11 +3421,14 @@ mod test {
dev_addr: Some(*dev_addr),
application_id: app.id,
device_profile_id: dp_ed.id,
device_session: Some(internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}),
device_session: Some(
internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}
.into(),
),
..Default::default()
})
.await
@ -3436,7 +3441,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -3885,7 +3890,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -4016,7 +4021,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4127,7 +4132,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4248,7 +4253,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4505,7 +4510,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)

View File

@ -239,7 +239,7 @@ mod tests {
},
// is_private_down is set, first gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![
@ -262,7 +262,7 @@ mod tests {
},
// is_private_down is set, second gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![
@ -319,7 +319,7 @@ mod tests {
for _ in 0..100 {
let out = select_downlink_gateway(
test.tenant_id,
&"eu868",
"eu868",
test.min_snr_margin,
&mut rx_info,
)
@ -328,8 +328,7 @@ mod tests {
}
assert_eq!(test.expected_gws.len(), gw_map.len());
assert_eq!(
true,
assert!(
expected_gws.keys().all(|k| gw_map.contains_key(k)),
"Expected: {:?}, got: {:?}",
expected_gws,

View File

@ -182,7 +182,7 @@ impl JoinAccept<'_> {
trace!("Select downlink gateway");
let gw_down = helpers::select_downlink_gateway(
Some(self.tenant.id),
Some(self.tenant.id.into()),
&self.uplink_frame_set.region_config_id,
self.network_conf.gateway_prefer_min_margin,
self.device_gateway_rx_info.as_mut().unwrap(),

View File

@ -434,7 +434,7 @@ impl TxAck {
..Default::default()
};
integration::log_event(app.id, &dev.variables, &pl).await;
integration::log_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}
@ -483,7 +483,7 @@ impl TxAck {
tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(),
};
integration::txack_event(app.id, &dev.variables, &pl).await;
integration::txack_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}
@ -532,7 +532,7 @@ impl TxAck {
tx_info: self.downlink_frame_item.as_ref().unwrap().tx_info.clone(),
};
integration::txack_event(app.id, &dev.variables, &pl).await;
integration::txack_event(app.id.into(), &dev.variables, &pl).await;
Ok(())
}
@ -649,7 +649,7 @@ impl TxAck {
}
let dfl = stream_pb::DownlinkFrameLog {
time: dfl.time.clone(),
time: dfl.time,
phy_payload: phy.to_vec()?,
tx_info: dfl.tx_info.clone(),
downlink_id: dfl.downlink_id,

View File

@ -23,7 +23,7 @@ use tracing::{error, info, trace};
use super::GatewayBackend;
use crate::config::GatewayBackendMqtt;
use crate::helpers::tls::{get_root_certs, load_cert, load_key};
use crate::helpers::tls22::{get_root_certs, load_cert, load_key};
use crate::monitoring::prometheus;
use crate::{downlink, uplink};
use lrwn::region::CommonName;

View File

@ -1,2 +1,3 @@
pub mod errors;
pub mod tls;
pub mod tls22; // rustls 0.22

View File

@@ -0,0 +1,88 @@
+use std::fs::File;
+use std::io::BufReader;
+
+use anyhow::{Context, Result};
+use rumqttc::tokio_rustls::rustls::{
+    self,
+    pki_types::{CertificateDer, PrivateKeyDer},
+};
+use tokio::fs;
+
+// Return root certificates, optionally with the provided ca_file appended.
+pub fn get_root_certs(ca_file: Option<String>) -> Result<rustls::RootCertStore> {
+    let mut roots = rustls::RootCertStore::empty();
+    for cert in rustls_native_certs::load_native_certs()? {
+        roots.add(cert)?;
+    }
+
+    if let Some(ca_file) = &ca_file {
+        let f = File::open(ca_file).context("Open CA certificate")?;
+        let mut reader = BufReader::new(f);
+        let certs = rustls_pemfile::certs(&mut reader);
+        for cert in certs.flatten() {
+            roots.add(cert)?;
+        }
+    }
+
+    Ok(roots)
+}
+
+pub async fn load_cert(cert_file: &str) -> Result<Vec<CertificateDer<'static>>> {
+    let cert_s = fs::read_to_string(cert_file)
+        .await
+        .context("Read TLS certificate")?;
+    let mut cert_b = cert_s.as_bytes();
+    let certs = rustls_pemfile::certs(&mut cert_b);
+    let mut out = Vec::new();
+    for cert in certs {
+        out.push(cert?.into_owned());
+    }
+    Ok(out)
+}
+
+pub async fn load_key(key_file: &str) -> Result<PrivateKeyDer<'static>> {
+    let key_s = fs::read_to_string(key_file)
+        .await
+        .context("Read private key")?;
+    let key_s = private_key_to_pkcs8(&key_s)?;
+    let mut key_b = key_s.as_bytes();
+    let mut keys = rustls_pemfile::pkcs8_private_keys(&mut key_b);
+    if let Some(key) = keys.next() {
+        match key {
+            Ok(v) => return Ok(PrivateKeyDer::Pkcs8(v.clone_key())),
+            Err(e) => {
+                return Err(anyhow!("Error parsing private key, error: {}", e));
+            }
+        }
+    }
+
+    Err(anyhow!("No private key found"))
+}
+
+pub fn private_key_to_pkcs8(pem: &str) -> Result<String> {
+    if pem.contains("RSA PRIVATE KEY") {
+        use rsa::{
+            pkcs1::DecodeRsaPrivateKey,
+            pkcs8::{EncodePrivateKey, LineEnding},
+            RsaPrivateKey,
+        };
+
+        let pkey = RsaPrivateKey::from_pkcs1_pem(pem).context("Read RSA PKCS#1")?;
+        let pkcs8_pem = pkey.to_pkcs8_pem(LineEnding::default())?;
+        Ok(pkcs8_pem.as_str().to_owned())
+    } else if pem.contains("EC PRIVATE KEY") {
+        use elliptic_curve::{
+            pkcs8::{EncodePrivateKey, LineEnding},
+            SecretKey,
+        };
+
+        // We assume it is a P256 based secret-key, which is the most popular curve.
+        // Attempting to decode it as P256 is still better than just failing to read it.
+        let pkey: SecretKey<p256::NistP256> =
+            SecretKey::from_sec1_pem(pem).context("Read EC SEC1")?;
+        let pkcs8_pem = pkey.to_pkcs8_pem(LineEnding::default())?;
+        Ok(pkcs8_pem.as_str().to_owned())
+    } else {
+        Ok(pem.to_string())
+    }
+}
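A sketch of how these helpers fit together at a call site (an assumption based on rustls 0.22's builder API as re-exported by rumqttc; the actual MQTT client wiring lives in the gateway backend and integration modules):

    use rumqttc::tokio_rustls::rustls;

    // Hypothetical helper: build a client TLS config from the loaders above.
    async fn client_tls_config(
        ca_file: Option<String>,
        cert_file: &str,
        key_file: &str,
    ) -> anyhow::Result<rustls::ClientConfig> {
        let roots = get_root_certs(ca_file)?;
        let config = rustls::ClientConfig::builder()
            .with_root_certificates(roots)
            .with_client_auth_cert(load_cert(cert_file).await?, load_key(key_file).await?)?;
        Ok(config)
    }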

View File

@@ -4,7 +4,7 @@ use std::time::Duration;
 use anyhow::{Context, Result};
 use async_trait::async_trait;
 use base64::{engine::general_purpose, Engine as _};
-use gcp_auth::{AuthenticationManager, CustomServiceAccount};
+use gcp_auth::{CustomServiceAccount, TokenProvider};
 use prost::Message;
 use reqwest::header::{HeaderMap, AUTHORIZATION, CONTENT_TYPE};
 use reqwest::Client;
@@ -20,7 +20,7 @@ pub struct Integration {
     json: bool,
     project_id: String,
     topic_name: String,
-    auth_manager: gcp_auth::AuthenticationManager,
+    service_account: gcp_auth::CustomServiceAccount,
     timeout: Duration,
 }
@@ -46,7 +46,6 @@ impl Integration {
     pub async fn new(conf: &GcpPubSubConfiguration) -> Result<Integration> {
         trace!("Initializing GCP Pub-Sub integration");
         let service_account = CustomServiceAccount::from_json(&conf.credentials_file)?;
-        let auth_manager = AuthenticationManager::try_from(service_account)?;

         Ok(Integration {
             json: match Encoding::try_from(conf.encoding)
@@ -57,7 +56,7 @@ impl Integration {
             },
             project_id: conf.project_id.clone(),
             topic_name: conf.topic_name.clone(),
-            auth_manager,
+            service_account,
             timeout: Duration::from_secs(5),
         })
     }
@@ -89,8 +88,8 @@ impl Integration {
         let pl = serde_json::to_string(&pl)?;

         let token = self
-            .auth_manager
-            .get_token(&["https://www.googleapis.com/auth/pubsub"])
+            .service_account
+            .token(&["https://www.googleapis.com/auth/pubsub"])
             .await
             .context("Get GCP bearer token")?;
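For context (hedged; based on my reading of the gcp_auth 0.12 changes): AuthenticationManager was removed and CustomServiceAccount now implements the TokenProvider trait directly, so the intermediate object disappears. The new call shape, reduced to a sketch:

    use gcp_auth::{CustomServiceAccount, TokenProvider};

    // Hypothetical sketch: fetch a Pub/Sub bearer token straight from the
    // service account; `token()` comes from the TokenProvider trait.
    async fn bearer(credentials_json: &str) -> anyhow::Result<String> {
        let sa = CustomServiceAccount::from_json(credentials_json)?;
        let token = sa.token(&["https://www.googleapis.com/auth/pubsub"]).await?;
        Ok(format!("Bearer {}", token.as_str()))
    }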

View File

@@ -43,7 +43,7 @@ pub async fn get_geoloc_buffer(
                 None => {
                     return false;
                 }
-                Some(v) => match v.clone().try_into() {
+                Some(v) => match (*v).try_into() {
                     Ok(v) => v,
                     Err(_) => {
                         return false;

View File

@@ -44,11 +44,7 @@ impl Integration {
         let di = pl.device_info.as_ref().unwrap();
         info!(dev_eui = %di.dev_eui, "Forwarding join notification");

-        let ts: DateTime<Utc> = pl
-            .time
-            .as_ref()
-            .unwrap()
-            .clone()
+        let ts: DateTime<Utc> = (*pl.time.as_ref().unwrap())
             .try_into()
             .map_err(anyhow::Error::msg)?;
         let dev_eui = EUI64::from_str(&di.dev_eui)?;
@@ -74,11 +70,7 @@ impl Integration {
         let di = pl.device_info.as_ref().unwrap();
         info!(dev_eui = %di.dev_eui, "Forwarding updf message");

-        let ts: DateTime<Utc> = pl
-            .time
-            .as_ref()
-            .unwrap()
-            .clone()
+        let ts: DateTime<Utc> = (*pl.time.as_ref().unwrap())
             .try_into()
             .map_err(anyhow::Error::msg)?;
         let dev_eui = EUI64::from_str(&di.dev_eui)?;
@@ -150,11 +142,7 @@ impl Integration {
     ) -> Result<()> {
         let di = pl.device_info.as_ref().unwrap();
         info!(dev_eui = %di.dev_eui, "Forwarding uplink meta-data");
-        let ts: DateTime<Utc> = pl
-            .time
-            .as_ref()
-            .unwrap()
-            .clone()
+        let ts: DateTime<Utc> = (*pl.time.as_ref().unwrap())
             .try_into()
             .map_err(anyhow::Error::msg)?;
         let dev_eui = EUI64::from_str(&di.dev_eui)?;
@@ -242,11 +230,7 @@ impl Integration {
         }
         let di = pl.device_info.as_ref().unwrap();

-        let ts: DateTime<Utc> = pl
-            .time
-            .as_ref()
-            .unwrap()
-            .clone()
+        let ts: DateTime<Utc> = (*pl.time.as_ref().unwrap())
             .try_into()
             .map_err(anyhow::Error::msg)?;
         let dev_eui = EUI64::from_str(&di.dev_eui)?;
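These hunks (and the geolocation one above) are all the same clippy clone_on_copy fix: the protobuf Timestamp used by chirpstack_api is a small Copy struct, so dereferencing the &Timestamp out of the Option is enough, and the .clone() chain only obscured that. A self-contained sketch of the conversion, with a stand-in Copy timestamp type:

    use chrono::{DateTime, Utc};

    // Stand-in for the generated protobuf timestamp; the real type is Copy,
    // which is what makes `(*pl.time.as_ref().unwrap())` legal at all.
    #[derive(Clone, Copy)]
    struct Timestamp {
        seconds: i64,
        nanos: i32,
    }

    fn to_datetime(t: Option<&Timestamp>) -> anyhow::Result<DateTime<Utc>> {
        let t = *t.ok_or_else(|| anyhow::anyhow!("time is missing"))?; // deref, no clone
        DateTime::from_timestamp(t.seconds, t.nanos as u32)
            .ok_or_else(|| anyhow::anyhow!("timestamp out of range"))
    }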

View File

@@ -28,6 +28,7 @@ pub mod mock;
 mod mqtt;
 mod mydevices;
 mod pilot_things;
+#[cfg(feature = "postgres")]
 mod postgresql;
 mod redis;
 mod thingsboard;
@@ -54,6 +55,7 @@ pub async fn setup() -> Result<()> {
                 .context("Setup MQTT integration")?,
             ));
         }
+        #[cfg(feature = "postgres")]
         "postgresql" => integrations.push(Box::new(
             postgresql::Integration::new(&conf.integration.postgresql)
                 .await
@@ -533,7 +535,7 @@ async fn handle_down_command(application_id: String, pl: integration::DownlinkCo
     // Validate that the application_id from the topic is indeed the application ID to which
     // the device belongs.
     let dev = device::get(&dev_eui).await?;
-    if dev.application_id != app_id {
+    if Into::<Uuid>::into(dev.application_id) != app_id {
         return Err(anyhow!(
             "Application ID from topic does not match application ID from device"
         ));
@@ -555,8 +557,8 @@ async fn handle_down_command(application_id: String, pl: integration::DownlinkCo
     let qi = device_queue::DeviceQueueItem {
         id: match pl.id.is_empty() {
-            true => Uuid::new_v4(),
-            false => Uuid::from_str(&pl.id)?,
+            true => Uuid::new_v4().into(),
+            false => Uuid::from_str(&pl.id)?.into(),
         },
         f_port: pl.f_port as i16,
         confirmed: pl.confirmed,
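This is the compile-time backend selection from #418 in miniature: when ChirpStack is built without the postgres feature, the PostgreSQL event integration must vanish entirely, so both the module declaration and the corresponding match arm are cfg-gated (attributes on match arms are stable Rust). A reduced, self-contained sketch of the same shape, with illustrative names:

    #[cfg(feature = "postgres")]
    mod postgresql_sketch {
        pub fn setup() -> &'static str {
            "postgresql integration enabled"
        }
    }

    fn setup_integration(kind: &str) -> &'static str {
        match kind {
            #[cfg(feature = "postgres")]
            "postgresql" => postgresql_sketch::setup(),
            _ => "unknown or disabled integration",
        }
    }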

View File

@@ -19,7 +19,7 @@ use tracing::{error, info, trace, warn};
 use super::Integration as IntegrationTrait;
 use crate::config::MqttIntegration as Config;
-use crate::helpers::tls::{get_root_certs, load_cert, load_key};
+use crate::helpers::tls22::{get_root_certs, load_cert, load_key};

 use chirpstack_api::integration;

 pub struct Integration<'a> {

View File

@@ -268,11 +268,7 @@ impl IntegrationTrait for Integration {
         let e = EventUp {
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -314,11 +310,7 @@ impl IntegrationTrait for Integration {
         let e = EventJoin {
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -352,11 +344,7 @@ impl IntegrationTrait for Integration {
         let e = EventAck {
             queue_item_id: Uuid::from_str(&pl.queue_item_id)?,
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -391,11 +379,7 @@ impl IntegrationTrait for Integration {
         let e = EventTxAck {
             queue_item_id: Uuid::from_str(&pl.queue_item_id)?,
             downlink_id: pl.downlink_id as i64,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -429,11 +413,7 @@ impl IntegrationTrait for Integration {
         info!(dev_eui = %di.dev_eui, event = "log", "Inserting event");
         let e = EventLog {
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -469,11 +449,7 @@ impl IntegrationTrait for Integration {
         let e = EventStatus {
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -510,11 +486,7 @@ impl IntegrationTrait for Integration {
         let e = EventLocation {
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,
@@ -551,11 +523,7 @@ impl IntegrationTrait for Integration {
         let e = EventIntegration {
             deduplication_id: Uuid::from_str(&pl.deduplication_id)?,
-            time: pl
-                .time
-                .as_ref()
-                .unwrap()
-                .clone()
+            time: (*pl.time.as_ref().unwrap())
                 .try_into()
                 .map_err(anyhow::Error::msg)?,
             tenant_id: Uuid::from_str(&di.tenant_id)?,

View File

@@ -240,10 +240,10 @@ pub mod test {
     async fn assert_reply(last_id: &str, event: &str, b: &[u8]) -> String {
         let srr: StreamReadReply = redis::cmd("XREAD")
             .arg("COUNT")
-            .arg(1 as usize)
+            .arg(1_usize)
             .arg("STREAMS")
             .arg("device:stream:event")
-            .arg(&last_id)
+            .arg(last_id)
             .query_async(&mut get_async_redis_conn().await.unwrap())
             .await
             .unwrap();
@@ -254,7 +254,7 @@ pub mod test {
         let stream_id = &stream_key.ids[0];
         let v = stream_id.map.get(event).unwrap();

-        assert_eq!(&redis::Value::Data(b.to_vec()), v);
+        assert_eq!(&redis::Value::BulkString(b.to_vec()), v);

        stream_id.id.clone()
    }
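Hedged context for the assertion change: newer releases of the redis crate reworked the Value enum for RESP3, renaming Value::Data to Value::BulkString (and Value::Status to Value::SimpleString), so only the variant name changes here, not the payload. Matching on the new variant looks like:

    // Sketch against the reworked redis::Value enum.
    fn bulk_to_string(v: &redis::Value) -> Option<String> {
        match v {
            redis::Value::BulkString(b) => String::from_utf8(b.clone()).ok(),
            _ => None,
        }
    }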

View File

@@ -118,7 +118,7 @@ mod test {
         for tst in &tests {
             let mut dev = device::Device {
-                device_session: Some(tst.device_session.clone()),
+                device_session: Some(tst.device_session.clone().into()),
                 ..Default::default()
             };

             let resp = handle(
@@ -128,10 +128,10 @@ mod test {
             );

             if let Some(e) = &tst.expected_error {
-                assert_eq!(true, resp.is_err(), "{}", tst.name);
+                assert!(resp.is_err(), "{}", tst.name);
                 assert_eq!(e, &format!("{}", resp.err().unwrap()), "{}", tst.name);
             } else {
-                assert_eq!(true, resp.unwrap().is_none());
+                assert!(resp.unwrap().is_none());
             }

             assert_eq!(

View File

@@ -277,7 +277,7 @@ mod test {
         device::partial_update(
             dev.dev_eui,
             &device::DeviceChangeset {
-                device_session: Some(Some(tst.device_session_ed.clone())),
+                device_session: Some(Some(tst.device_session_ed.clone().into())),
                 ..Default::default()
             },
         )
@@ -285,7 +285,7 @@ mod test {
         .unwrap();

         let mut relay_dev = device::Device {
-            device_session: Some(tst.device_session.clone()),
+            device_session: Some(tst.device_session.clone().into()),
             ..Default::default()
         };
@@ -297,10 +297,10 @@ mod test {
         .await;

         if let Some(e) = &tst.expected_error {
-            assert_eq!(true, resp.is_err(), "{}", tst.name);
+            assert!(resp.is_err(), "{}", tst.name);
             assert_eq!(e, &format!("{}", resp.err().unwrap()), "{}", tst.name);
         } else {
-            assert_eq!(true, resp.unwrap().is_none());
+            assert!(resp.unwrap().is_none());
         }

         let d = device::get(&dev.dev_eui).await.unwrap();
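The Some(Some(...)) in the changeset above is the usual Diesel idiom for partial updates of nullable columns: the outer Option encodes "should this column be updated at all", the inner Option encodes the column's own nullability. A reduced, hypothetical version:

    // Hypothetical reduced changeset: None = leave the column untouched,
    // Some(None) = set it to NULL, Some(Some(v)) = set it to v.
    #[derive(Default)]
    struct Changeset {
        device_session: Option<Option<Vec<u8>>>,
        margin: Option<Option<i32>>,
    }

    fn set_session(bytes: Vec<u8>) -> Changeset {
        Changeset {
            device_session: Some(Some(bytes)),
            ..Default::default()
        }
    }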

Some files were not shown because too many files have changed in this diff.