Compare commits

...

27 Commits

Author SHA1 Message Date
3829f591e4 Add expires_at to queue-items (unicast & multicast).
This makes it possible to automatically remove items from the queue in
case the expires_at timestamp has reached. This field is optional and
the default remains to never expire queue-items.
2024-09-17 11:54:29 +01:00
2919fb79e5 Fix escaping issue in TOML template.
Closes #486.
2024-09-13 12:59:09 +01:00
61d794b680 Remove OFF, which is not a valid tracing Level. 2024-09-12 14:42:51 +01:00
29b57fb72a Implement importer for lorawan-device-profiles repo. 2024-09-12 14:22:09 +01:00
190d977bf5 lrwn: Update Deserialize trait implementation. 2024-09-12 14:20:59 +01:00
9d04a74a94 Update dependencies. 2024-09-09 13:36:59 +01:00
5f8ddca7b7 api: Re-export pbjson_types and tonic.
Closes #504.
2024-09-09 11:59:51 +01:00
e04515f647 ui: Run make format to format ui code. 2024-08-28 14:44:22 +01:00
d69a0eee75 ui: Fix tooltip date formatting.
With the migration from moment to date-fns (#460) some of the formatting
strings were not updated accordingly.

Fixes #503.
2024-08-28 14:44:07 +01:00
e63296573b Implement support for SQLite database backend. (#418) (#500)
This feature makes it possible to select between PostgreSQL and SQLite as database backend using a compile feature-flag. It is not possible to enable both at the same time.

---------

Co-authored-by: Momo Bel <plopyomomo@gmail.com>
2024-08-26 13:22:35 +01:00
800d7d0efe api: Upgrade io.grpc dependencies in Java API. (#494)
This fixes a compatibility issue with Netty.

---
Co-authored-by: Guillaume Milani <guillaume.milani@sontex.ch>
2024-08-21 14:49:08 +01:00
489a35e0ec Bump version to 4.9.0 2024-08-15 09:06:19 +01:00
1848097e6c Fix missing last_seen_at field update of relay gateway. 2024-08-13 15:54:51 +01:00
76b7ec4881 Add signal handling.
Fixes #480.
2024-08-13 13:04:06 +01:00
3aaa114f46 Update deadpool-redis. 2024-08-08 16:53:07 +01:00
a811ec5c80 Bump version to 4.9.0-test.6 2024-08-07 15:11:09 +01:00
5794f41324 Fix clippy feedback (cargo clippy --fix). 2024-08-07 14:59:25 +01:00
7158c0c610 ui: Update codec template with JSDoc (#473) 2024-08-05 11:17:13 +01:00
eda6646b18 Update dependencies. 2024-08-05 10:15:23 +01:00
40a2b83bf5 Update redis dependency. 2024-08-05 10:06:48 +01:00
4e0106a4e8 Replace warp with axum.
The warp dependency was causing some issues with upgrading dependencies
as it depends on http v0.2, where other dependencies (e.g. tonic) have
already upgraded to http v1+.
2024-08-01 11:33:57 +01:00
98978135c4 ui: Fix formatting template after merging #460. 2024-07-30 11:18:52 +01:00
6a691c62e2 Update dependencies. 2024-07-24 14:12:50 +01:00
66ab41036b Update lapin crate.
This disables the default features (rustls), because lapin enables the
default rustls features, which pulls in the aws-lc-rs dependency besides
ring.

Most likely, the next lapin version will fix this by exposing feature
flags to either enable aws-lc-rs or ring backend for rustls.
2024-07-24 14:05:46 +01:00
dc57e6fe51 Update rustls to 0.23. 2024-07-23 14:03:09 +01:00
ebc4065ca2 Bump version to 4.9.0-test.5 2024-07-23 11:03:39 +01:00
a23797ddbb Fix updating dependencies.
The previous update dependencies commit contained a dependency that
pulled in the aws-lc-rs crate, which fails to build on ARMv7. See also
370b84cb09f0d55c9cc1d993df2474e579e7fa94.

This commit reverts the updates and only updates part of the crates.

A proper fix will be to update all dependencies to rustls 0.23 such that
we can enable the ring feature flag (which is the 0.22 default).
2024-07-23 10:39:58 +01:00
216 changed files with 7250 additions and 4353 deletions

View File

@ -13,6 +13,13 @@ env:
jobs:
tests:
runs-on: ubuntu-latest
strategy:
matrix:
database:
- postgres
- sqlite
env:
DATABASE: ${{ matrix.database }}
steps:
-
name: Checkout
@ -32,7 +39,7 @@ jobs:
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-test-${{ hashFiles('**/Cargo.lock') }}
key: ${{ runner.os }}-cargo-test-${{ matrix.database }}-${{ hashFiles('**/Cargo.lock') }}
-
name: Start dependency services
run: docker compose up -d

4
.gitignore vendored
View File

@ -11,8 +11,12 @@
# Binary packages
/dist
# SQLite databases
*.sqlite
# Rust target directory
**/target
**/target-sqlite
# Certificates
/chirpstack/configuration/certs/*

1368
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -8,7 +8,7 @@ dist:
# Install dev dependencies
dev-dependencies:
cargo install cross --version 0.2.5
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres
cargo install diesel_cli --version 2.2.1 --no-default-features --features postgres,sqlite
cargo install cargo-deb --version 1.43.1
cargo install cargo-generate-rpm --version 0.12.1

View File

@ -84,7 +84,11 @@ docker compose up -d
Run the following command to run the ChirpStack tests:
```bash
# Test (with PostgresQL database backend)
make test
# Test with SQLite database backend
DATABASE=sqlite make test
```
### Building ChirpStack binaries
@ -109,6 +113,34 @@ make release-amd64
make dist
```
By default the above commands will build ChirpStack with the PostgresQL database
database backend. Set the `DATABASE=sqlite` env. variable to compile ChirpStack
with the SQLite database backend.
### Database migrations
To create a new database migration, execute:
```
make migration-generate NAME=test-migration
```
To apply migrations, execute:
```
make migration-run
```
To revert a migration, execute:
```
make migration-revert
```
By default the above commands will execute the migration commands using the
PostgresQL database backend. To execute migration commands for the SQLite
database backend, set the `DATABASE=sqlite` env. variable.
## License
ChirpStack Network Server is distributed under the MIT license. See also

View File

@ -1,6 +1,6 @@
{
"name": "@chirpstack/chirpstack-api-grpc-web",
"version": "4.9.0-test.4",
"version": "4.9.0",
"description": "Chirpstack gRPC-web API",
"license": "MIT",
"devDependencies": {

View File

@ -8,7 +8,7 @@ plugins {
}
group = "io.chirpstack"
version = "4.9.0-test.4"
version = "4.9.0"
repositories {
mavenCentral()
@ -21,10 +21,10 @@ buildscript {
}
dependencies {
api("io.grpc:grpc-protobuf:1.51.0")
api("io.grpc:grpc-api:1.51.0")
api("io.grpc:grpc-stub:1.51.0")
api("io.grpc:grpc-netty:1.51.0")
api("io.grpc:grpc-protobuf:1.59.1")
api("io.grpc:grpc-api:1.59.1")
api("io.grpc:grpc-stub:1.59.1")
api("io.grpc:grpc-netty:1.59.1")
implementation("javax.annotation:javax.annotation-api:1.3.2")
}

2
api/js/package.json vendored
View File

@ -1,6 +1,6 @@
{
"name": "@chirpstack/chirpstack-api",
"version": "4.9.0-test.4",
"version": "4.9.0",
"description": "Chirpstack JS and TS API",
"license": "MIT",
"devDependencies": {

View File

@ -9,7 +9,7 @@ plugins {
}
group = "io.chirpstack"
version = "4.9.0-test.4"
version = "4.9.0"
repositories {
mavenCentral()

View File

@ -3,7 +3,7 @@
"description": "Chirpstack PHP API",
"license": "MIT",
"type": "library",
"version": "4.9.0-test.4",
"version": "4.9.0",
"require": {
"php": ">=7.0.0",
"grpc/grpc": "^v1.57.0",

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
}
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse {
// FCntDown.
uint32 f_cnt_down = 1;
}
}

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload.
bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
}
message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter.
F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
}
// Device information.

View File

@ -18,7 +18,7 @@ CLASSIFIERS = [
setup(
name='chirpstack-api',
version = "4.9.0-test.4",
version = "4.9.0",
url='https://github.com/brocaar/chirpstack-api',
author='Orne Brocaar',
author_email='info@brocaar.com',

18
api/rust/Cargo.toml vendored
View File

@ -1,7 +1,7 @@
[package]
name = "chirpstack_api"
description = "ChirpStack Protobuf / gRPC API definitions."
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
license = "MIT"
homepage = "https://www.chirpstack.io"
@ -12,27 +12,25 @@
default = ["api", "json"]
api = ["tonic/transport", "tonic-build/transport", "tokio"]
json = ["pbjson", "pbjson-types", "serde"]
diesel = ["dep:diesel"]
internal = []
[dependencies]
prost = "0.12"
prost-types = "0.12"
prost = "0.13"
prost-types = "0.13"
hex = "0.4"
rand = "0.8"
tonic = { version = "0.11", features = [
tonic = { version = "0.12", features = [
"codegen",
"prost",
], default-features = false, optional = true }
tokio = { version = "1.38", features = ["macros"], optional = true }
pbjson = { version = "0.6", optional = true }
pbjson-types = { version = "0.6", optional = true }
pbjson = { version = "0.7", optional = true }
pbjson-types = { version = "0.7", optional = true }
serde = { version = "1.0", optional = true }
diesel = { version = "2.2", features = ["postgres_backend"], optional = true }
[build-dependencies]
tonic-build = { version = "0.11", features = [
tonic-build = { version = "0.12", features = [
"prost",
], default-features = false }
pbjson-build = "0.6"
pbjson-build = "0.7"

View File

@ -539,6 +539,10 @@ message DeviceQueueItem {
// the data payload. In this case, the f_cnt_down field must be set to
// the corresponding frame-counter which has been used during the encryption.
bool is_encrypted = 9;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 10;
}
message EnqueueDeviceQueueItemRequest { DeviceQueueItem queue_item = 1; }
@ -582,4 +586,4 @@ message GetDeviceNextFCntDownRequest {
message GetDeviceNextFCntDownResponse {
// FCntDown.
uint32 f_cnt_down = 1;
}
}

View File

@ -302,6 +302,10 @@ message MulticastGroupQueueItem {
// Payload.
bytes data = 4;
// Expires at (optional).
// Expired queue-items will be automatically removed from the queue.
google.protobuf.Timestamp expires_at = 5;
}
message EnqueueMulticastGroupQueueItemRequest {

View File

@ -60,6 +60,9 @@ enum LogCode {
// Downlink frame-counter.
F_CNT_DOWN = 10;
// Downlink has expired.
EXPIRED = 11;
}
// Device information.

24
api/rust/src/gw.rs vendored
View File

@ -95,7 +95,7 @@ impl UplinkFrame {
})
}
uplink_tx_info_legacy::ModulationInfo::FskModulationInfo(info) => {
modulation::Parameters::Fsk(info.clone())
modulation::Parameters::Fsk(*info)
}
uplink_tx_info_legacy::ModulationInfo::LrFhssModulationInfo(info) => {
modulation::Parameters::LrFhss(LrFhssModulationInfo {
@ -120,9 +120,9 @@ impl UplinkFrame {
self.rx_info = Some(UplinkRxInfo {
gateway_id: hex::encode(&rx_info.gateway_id),
uplink_id: rng.gen::<u32>(),
gw_time: rx_info.time.clone(),
gw_time: rx_info.time,
ns_time: None,
time_since_gps_epoch: rx_info.time_since_gps_epoch.clone(),
time_since_gps_epoch: rx_info.time_since_gps_epoch,
fine_time_since_gps_epoch: None,
rssi: rx_info.rssi,
snr: rx_info.lora_snr as f32,
@ -130,7 +130,7 @@ impl UplinkFrame {
rf_chain: rx_info.rf_chain,
board: rx_info.board,
antenna: rx_info.antenna,
location: rx_info.location.clone(),
location: rx_info.location,
context: rx_info.context.clone(),
metadata: rx_info.metadata.clone(),
crc_status: rx_info.crc_status,
@ -195,23 +195,19 @@ impl DownlinkFrame {
Some(timing::Parameters::Immediately(v)) => {
tx_info_legacy.timing = DownlinkTiming::Immediately.into();
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::ImmediatelyTimingInfo(
v.clone(),
),
downlink_tx_info_legacy::TimingInfo::ImmediatelyTimingInfo(*v),
);
}
Some(timing::Parameters::Delay(v)) => {
tx_info_legacy.timing = DownlinkTiming::Delay.into();
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::DelayTimingInfo(v.clone()),
);
tx_info_legacy.timing_info =
Some(downlink_tx_info_legacy::TimingInfo::DelayTimingInfo(*v));
}
Some(timing::Parameters::GpsEpoch(v)) => {
tx_info_legacy.timing = DownlinkTiming::GpsEpoch.into();
tx_info_legacy.timing_info =
Some(downlink_tx_info_legacy::TimingInfo::GpsEpochTimingInfo(
v.clone(),
));
tx_info_legacy.timing_info = Some(
downlink_tx_info_legacy::TimingInfo::GpsEpochTimingInfo(*v),
);
}
_ => {}
}

View File

@ -32,6 +32,7 @@ impl Into<String> for LogCode {
LogCode::DownlinkGateway => "DOWNLINK_GATEWAY",
LogCode::RelayNewEndDevice => "RELAY_NEW_END_DEVICE",
LogCode::FCntDown => "F_CNT_DOWN",
LogCode::Expired => "EXPIRED",
}
.to_string()
}

View File

@ -2,13 +2,6 @@ include!(concat!(env!("OUT_DIR"), "/internal/internal.rs"));
#[cfg(feature = "json")]
include!(concat!(env!("OUT_DIR"), "/internal/internal.serde.rs"));
#[cfg(feature = "diesel")]
use diesel::{backend::Backend, deserialize, serialize, sql_types::Binary};
#[cfg(feature = "diesel")]
use prost::Message;
#[cfg(feature = "diesel")]
use std::io::Cursor;
impl DeviceSession {
pub fn get_a_f_cnt_down(&self) -> u32 {
if self.mac_version().to_string().starts_with("1.0") {
@ -30,28 +23,3 @@ impl DeviceSession {
}
}
}
#[cfg(feature = "diesel")]
impl<ST, DB> deserialize::FromSql<ST, DB> for DeviceSession
where
DB: Backend,
*const [u8]: deserialize::FromSql<ST, DB>,
{
fn from_sql(value: DB::RawValue<'_>) -> deserialize::Result<Self> {
let bytes = <Vec<u8> as deserialize::FromSql<ST, DB>>::from_sql(value)?;
Ok(DeviceSession::decode(&mut Cursor::new(bytes))?)
}
}
#[cfg(feature = "diesel")]
impl serialize::ToSql<Binary, diesel::pg::Pg> for DeviceSession
where
[u8]: serialize::ToSql<Binary, diesel::pg::Pg>,
{
fn to_sql(&self, out: &mut serialize::Output<'_, '_, diesel::pg::Pg>) -> serialize::Result {
<[u8] as serialize::ToSql<Binary, diesel::pg::Pg>>::to_sql(
&self.encode_to_vec(),
&mut out.reborrow(),
)
}
}

5
api/rust/src/lib.rs vendored
View File

@ -1,4 +1,9 @@
#[cfg(feature = "json")]
pub use pbjson_types;
pub use prost;
#[cfg(feature = "api")]
pub use tonic;
#[cfg(feature = "api")]
pub mod api;
pub mod common;

View File

@ -1,6 +1,6 @@
[package]
name = "backend"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2018"
publish = false

View File

@ -3,14 +3,14 @@
description = "Library for building external ChirpStack integrations"
homepage = "https://www.chirpstack.io/"
license = "MIT"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2021"
repository = "https://github.com/chirpstack/chirpstack"
[dependencies]
chirpstack_api = { path = "../api/rust", version = "4.9.0-test.1" }
redis = { version = "0.25", features = [
redis = { version = "0.26", features = [
"cluster-async",
"tokio-rustls-comp",
] }
@ -23,7 +23,7 @@
], default-features = true }
async-trait = "0.1.79"
serde = { version = "1.0", features = ["derive"] }
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
lazy_static = "1.5"
tokio = { version = "1.36", features = ["macros", "rt-multi-thread"] }
lazy_static = "1.4"
serde_json = "1.0"
toml = "0.8"

View File

@ -215,7 +215,7 @@ impl Integration {
info!(key = %k, "Event received from Redis stream");
match k.as_ref() {
"up" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::UplinkEvent::decode(
&mut Cursor::new(b),
)?;
@ -223,21 +223,21 @@ impl Integration {
}
}
"join" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::JoinEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(join_event(pl));
}
}
"ack" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::AckEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(ack_event(pl));
}
}
"txack" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::TxAckEvent::decode(
&mut Cursor::new(b),
)?;
@ -245,7 +245,7 @@ impl Integration {
}
}
"status" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::StatusEvent::decode(
&mut Cursor::new(b),
)?;
@ -253,14 +253,14 @@ impl Integration {
}
}
"log" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl =
integration_pb::LogEvent::decode(&mut Cursor::new(b))?;
tokio::spawn(log_event(pl));
}
}
"location" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::LocationEvent::decode(
&mut Cursor::new(b),
)?;
@ -268,7 +268,7 @@ impl Integration {
}
}
"integration" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let pl = integration_pb::IntegrationEvent::decode(
&mut Cursor::new(b),
)?;

View File

@ -3,7 +3,7 @@
description = "ChirpStack is an open-source LoRaWAN(TM) Network Server"
repository = "https://github.com/chirpstack/chirpstack"
homepage = "https://www.chirpstack.io/"
version = "4.9.0-test.4"
version = "4.9.0"
authors = ["Orne Brocaar <info@brocaar.com>"]
edition = "2021"
publish = false
@ -26,23 +26,19 @@
email_address = "0.2"
diesel = { version = "2.2", features = [
"chrono",
"uuid",
"serde_json",
"numeric",
"64-column-tables",
"postgres_backend",
] }
diesel_migrations = { version = "2.2" }
diesel-async = { version = "0.5", features = [
"deadpool",
"postgres",
"async-connection-wrapper",
] }
tokio-postgres = "0.7"
tokio-postgres-rustls = "0.11"
tokio-postgres = { version = "0.7", optional = true }
tokio-postgres-rustls = { version = "0.12", optional = true }
bigdecimal = "0.4"
redis = { version = "0.25.2", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.15", features = ["cluster"] }
redis = { version = "0.26", features = ["tls-rustls", "tokio-rustls-comp"] }
deadpool-redis = { version = "0.16", features = ["cluster"] }
# Logging
tracing = "0.1"
@ -53,11 +49,7 @@
], default-features = true }
# ChirpStack API definitions
chirpstack_api = { path = "../api/rust", features = [
"default",
"internal",
"diesel",
] }
chirpstack_api = { path = "../api/rust", features = ["default", "internal"] }
lrwn = { path = "../lrwn", features = [
"serde",
"diesel",
@ -78,8 +70,8 @@
sha2 = "0.10"
urlencoding = "2.1"
geohash = "0.13"
gcp_auth = "0.11"
lapin = "2.3"
gcp_auth = "0.12"
lapin = { version = "2.5", default-features = false }
tokio-executor-trait = "2.1"
tokio-reactor-trait = "1.1"
rdkafka = { version = "0.36", default-features = false, features = [
@ -88,26 +80,26 @@
] }
# gRPC and Protobuf
tonic = "0.11"
tonic-web = "0.11"
tonic-reflection = "0.11"
tokio = { version = "1.38", features = ["macros", "rt-multi-thread"] }
tonic = "0.12"
tonic-web = "0.12"
tonic-reflection = "0.12"
tokio = { version = "1.40", features = ["macros", "rt-multi-thread"] }
tokio-stream = "0.1"
prost-types = "0.12"
prost = "0.12"
pbjson-types = "0.6"
prost-types = "0.13"
prost = "0.13"
pbjson-types = "0.7"
# gRPC and HTTP multiplexing
warp = { version = "0.3", features = ["tls"], default-features = false }
hyper = "0.14"
tower = "0.4"
axum = "0.7"
axum-server = { version = "0.7.1", features = ["tls-rustls-no-provider"] }
tower = { version = "0.5", features = ["util"] }
futures = "0.3"
futures-util = "0.3"
http = "0.2"
http-body = "0.4"
http = "1.1"
http-body = "1.0"
rust-embed = "8.5"
mime_guess = "2.0"
tower-http = { version = "0.4", features = ["trace", "auth"] }
tower-http = { version = "0.5", features = ["trace", "auth"] }
# Error handling
thiserror = "1.0"
@ -116,9 +108,14 @@
# Authentication
pbkdf2 = { version = "0.12", features = ["simple"] }
rand_core = { version = "0.6", features = ["std"] }
jsonwebtoken = "9.2"
rustls = "0.22"
rustls-native-certs = "0.7"
jsonwebtoken = "9.3"
rustls = { version = "0.23", default-features = false, features = [
"logging",
"std",
"tls12",
"ring",
] }
rustls-native-certs = "0.8"
rustls-pemfile = "2.1"
pem = "3.0"
x509-parser = "0.16"
@ -151,19 +148,39 @@
aes = "0.8"
rand = "0.8"
base64 = "0.22"
async-recursion = "1.0"
async-recursion = "1.1"
regex = "1.10"
petgraph = "0.6"
prometheus-client = "0.22"
pin-project = "1.1"
scoped-futures = { version = "0.1", features = ["std"] }
signal-hook = "0.3"
signal-hook-tokio = { version = "0.3", features = ["futures-v0_3"] }
# Development and testing
[dev-dependencies]
httpmock = "0.7.0"
bytes = "1.6"
bytes = "1.7"
dotenv = "0.15"
[features]
default = ["postgres"]
postgres = [
"tokio-postgres",
"tokio-postgres-rustls",
"diesel/postgres_backend",
"diesel/serde_json",
"diesel/uuid",
"diesel-async/postgres",
"lrwn/postgres",
]
sqlite = [
"diesel/sqlite",
"diesel/returning_clauses_for_sqlite_3_35",
"lrwn/sqlite",
"diesel-async/sync-connection-wrapper",
"diesel-async/sqlite",
]
test-all-integrations = [
"test-integration-amqp",
"test-integration-kafka",

View File

@ -1,20 +1,21 @@
.PHONY: dist
PKG_VERSION := $(shell cargo metadata --no-deps --format-version 1 | jq -r '.packages[0].version')
DATABASE ?= postgres
debug-amd64:
cross build --target x86_64-unknown-linux-musl
cross build --target x86_64-unknown-linux-musl --no-default-features --features="$(DATABASE)"
release-amd64:
cross build --target x86_64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
dist:
# Keep these in this order, as aarch64 is based on Debian Buster (older),
# the others on Bullseye. For some build scripts we want to build against
# least recent LIBC.
cross build --target aarch64-unknown-linux-musl --release
cross build --target x86_64-unknown-linux-musl --release
cross build --target armv7-unknown-linux-musleabihf --release
cross build --target aarch64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target x86_64-unknown-linux-musl --release --no-default-features --features="$(DATABASE)"
cross build --target armv7-unknown-linux-musleabihf --release --no-default-features --features="$(DATABASE)"
cargo deb --target x86_64-unknown-linux-musl --no-build --no-strip
cargo deb --target armv7-unknown-linux-musleabihf --no-build --no-strip
@ -40,10 +41,38 @@ dist:
test:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE)"
test-all:
cargo fmt --check
cargo clippy --no-deps
TZ=UTC cargo test --features test-all-integrations
cargo clippy --no-deps --no-default-features --features="$(DATABASE)"
TZ=UTC cargo test --no-default-features --features="$(DATABASE),test-all-integrations"
migration-generate:
ifeq ($(NAME),)
@echo "You must provide a NAME parameter, e.g. make migration-generate NAME=test-migration"
else
diesel --config-file diesel_$(DATABASE).toml migration --migration-dir migrations_$(DATABASE) generate $(NAME)
endif
migration-run: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres run
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite run
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
migration-revert: chirpstack_test.sqlite
ifeq ($(DATABASE),postgres)
diesel --config-file diesel_postgres.toml migration --migration-dir migrations_postgres revert
endif
ifeq ($(DATABASE),sqlite)
DATABASE_URL="chirpstack_test.sqlite" diesel --config-file diesel_sqlite.toml migration --migration-dir migrations_sqlite revert
sed -i 's/Timestamp/TimestamptzSqlite/g' src/storage/schema_sqlite.rs
endif
chirpstack_test.sqlite:
DATABASE_URL=chirpstack_test.sqlite diesel --config-file diesel_sqlite.toml setup --migration-dir migrations_sqlite

View File

@ -2,4 +2,4 @@
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema.rs"
file = "src/storage/schema_postgres.rs"

View File

@ -0,0 +1,5 @@
# For documentation on how to configure this file,
# see diesel.rs/guides/configuring-diesel-cli
[print_schema]
file = "src/storage/schema_sqlite.rs"

View File

@ -0,0 +1,6 @@
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,6 @@
alter table multicast_group_queue_item
add column expires_at timestamp with time zone null;
alter table device_queue_item
add column expires_at timestamp with time zone null;

View File

@ -0,0 +1,18 @@
drop table relay_gateway;
drop table multicast_group_gateway;
drop table multicast_group_queue_item;
drop table multicast_group_device;
drop table multicast_group;
drop table device_queue_item;
drop table device_keys;
drop table device;
drop table device_profile;
drop table api_key;
drop table application_integration;
drop table application;
drop table gateway;
drop table tenant_user;
drop table tenant;
drop table "user";
drop table relay_device;
drop table device_profile_template;

View File

@ -0,0 +1,392 @@
-- user
create table "user" (
id text not null primary key,
external_id text null,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_active boolean not null,
email text not null,
email_verified boolean not null,
password_hash varchar(200) not null,
note text not null
);
create unique index idx_user_email on "user"(email);
create unique index idx_user_external_id on "user"(external_id);
insert into "user" (
id,
created_at,
updated_at,
is_admin,
is_active,
email,
email_verified,
password_hash,
note
) values (
'05244f12-6daf-4e1f-8315-c66783a0ab56',
datetime('now'),
datetime('now'),
TRUE,
TRUE,
'admin',
FALSE,
'$pbkdf2-sha512$i=1,l=64$l8zGKtxRESq3PA2kFhHRWA$H3lGMxOt55wjwoc+myeOoABofJY9oDpldJa7fhqdjbh700V6FLPML75UmBOt9J5VFNjAL1AvqCozA1HJM0QVGA',
''
);
-- tenant
create table tenant (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
can_have_gateways boolean not null,
max_device_count integer not null,
max_gateway_count integer not null,
private_gateways_up boolean not null,
private_gateways_down boolean not null default FALSE,
tags text not null default '{}'
);
-- sqlite has advanced text search with https://www.sqlite.org/fts5.html
-- but looks like it is for a full table and not specific per column, to investigate
create index idx_tenant_name_trgm on "tenant"(name);
insert into "tenant" (
id,
created_at,
updated_at,
name,
description,
can_have_gateways,
max_device_count,
max_gateway_count,
private_gateways_up
) values (
'52f14cd4-c6f1-4fbd-8f87-4025e1d49242',
datetime('now'),
datetime('now'),
'ChirpStack',
'',
TRUE,
0,
0,
FALSE
);
-- tenant user
create table tenant_user (
tenant_id text not null references tenant on delete cascade,
user_id text not null references "user" on delete cascade,
created_at datetime not null,
updated_at datetime not null,
is_admin boolean not null,
is_device_admin boolean not null,
is_gateway_admin boolean not null,
primary key (tenant_id, user_id)
);
create index idx_tenant_user_user_id on tenant_user (user_id);
-- gateway
create table gateway (
gateway_id blob not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
latitude double precision not null,
longitude double precision not null,
altitude real not null,
stats_interval_secs integer not null,
tls_certificate blob,
tags text not null,
properties text not null
);
create index idx_gateway_tenant_id on gateway (tenant_id);
create index idx_gateway_name_trgm on gateway (name);
create index idx_gateway_id_trgm on gateway (hex(gateway_id));
create index idx_gateway_tags on gateway (tags);
-- application
create table application (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
mqtt_tls_cert blob,
tags text not null default '{}'
);
create index idx_application_tenant_id on application (tenant_id);
create index idx_application_name_trgm on application (name);
-- application integration
create table application_integration (
application_id text not null references application on delete cascade,
kind varchar(20) not null,
created_at datetime not null,
updated_at datetime not null,
configuration text not null,
primary key (application_id, kind)
);
-- api-key
create table api_key (
id text not null primary key,
created_at datetime not null,
name varchar(100) not null,
is_admin boolean not null,
tenant_id text null references tenant on delete cascade
);
create index idx_api_key_tenant_id on api_key (tenant_id);
-- device-profile
create table device_profile (
id text not null primary key,
tenant_id text not null references tenant on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
payload_codec_script text not null default '',
flush_queue_on_activate boolean not null default FALSE,
description text not null default '',
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE,
region_config_id varchar(100) null,
is_relay boolean not null default FALSE,
is_relay_ed boolean not null default FALSE,
relay_ed_relay_only boolean not null default FALSE,
relay_enabled boolean not null default FALSE,
relay_cad_periodicity smallint not null default 0,
relay_default_channel_index smallint not null default 0,
relay_second_channel_freq bigint not null default 0,
relay_second_channel_dr smallint not null default 0,
relay_second_channel_ack_offset smallint not null default 0,
relay_ed_activation_mode smallint not null default 0,
relay_ed_smart_enable_level smallint not null default 0,
relay_ed_back_off smallint not null default 0,
relay_ed_uplink_limit_bucket_size smallint not null default 0,
relay_ed_uplink_limit_reload_rate smallint not null default 0,
relay_join_req_limit_reload_rate smallint not null default 0,
relay_notify_limit_reload_rate smallint not null default 0,
relay_global_uplink_limit_reload_rate smallint not null default 0,
relay_overall_limit_reload_rate smallint not null default 0,
relay_join_req_limit_bucket_size smallint not null default 0,
relay_notify_limit_bucket_size smallint not null default 0,
relay_global_uplink_limit_bucket_size smallint not null default 0,
relay_overall_limit_bucket_size smallint not null default 0,
allow_roaming boolean not null default TRUE,
rx1_delay smallint not null default 0
);
create index idx_device_profile_tenant_id on device_profile (tenant_id);
create index idx_device_profile_name_trgm on device_profile (name);
create index idx_device_profile_tags on device_profile (tags);
-- device
create table device (
dev_eui blob not null primary key,
application_id text not null references application on delete cascade,
device_profile_id text not null references device_profile on delete cascade,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
scheduler_run_after datetime,
name varchar(100) not null,
description text not null,
external_power_source boolean not null,
battery_level numeric(5, 2),
margin int,
dr smallint,
latitude double precision,
longitude double precision,
altitude real,
dev_addr blob,
enabled_class char(1) not null,
skip_fcnt_check boolean not null,
is_disabled boolean not null,
tags text not null,
variables text not null,
join_eui blob not null default x'00000000',
secondary_dev_addr blob,
device_session blob
);
create index idx_device_application_id on device (application_id);
create index idx_device_device_profile_id on device (device_profile_id);
create index idx_device_name_trgm on device (name);
create index idx_device_dev_eui_trgm on device (hex(dev_eui));
create index idx_device_dev_addr_trgm on device (hex(dev_addr));
create index idx_device_tags on device (tags);
create table device_keys (
dev_eui blob not null primary key references device on delete cascade,
created_at datetime not null,
updated_at datetime not null,
nwk_key blob not null,
app_key blob not null,
dev_nonces text not null,
join_nonce int not null
);
create table device_queue_item (
id text not null primary key,
dev_eui blob references device on delete cascade not null,
created_at datetime not null,
f_port smallint not null,
confirmed boolean not null,
data blob not null,
is_pending boolean not null,
f_cnt_down bigint null,
timeout_after datetime,
is_encrypted boolean default FALSE not null
);
create index idx_device_queue_item_dev_eui on device_queue_item (dev_eui);
create index idx_device_queue_item_created_at on device_queue_item (created_at);
create index idx_device_queue_item_timeout_after on device_queue_item (timeout_after);
-- multicast groups
create table multicast_group (
id text not null primary key,
application_id text not null references application on delete cascade,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
region varchar(10) not null,
mc_addr blob not null,
mc_nwk_s_key blob not null,
mc_app_s_key blob not null,
f_cnt bigint not null,
group_type char(1) not null,
dr smallint not null,
frequency bigint not null,
class_b_ping_slot_nb_k smallint not null,
class_c_scheduling_type varchar(20) not null default 'delay'
);
create index idx_multicast_group_application_id on multicast_group (application_id);
create index idx_multicast_group_name_trgm on multicast_group (name);
create table multicast_group_device (
multicast_group_id text not null references multicast_group on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, dev_eui)
);
create table multicast_group_queue_item (
id text not null primary key,
created_at datetime not null,
scheduler_run_after datetime not null,
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
f_cnt bigint not null,
f_port smallint not null,
data blob not null,
emit_at_time_since_gps_epoch bigint
);
create index idx_multicast_group_queue_item_multicast_group_id on multicast_group_queue_item (multicast_group_id);
create index idx_multicast_group_queue_item_scheduler_run_after on multicast_group_queue_item (scheduler_run_after);
create table device_profile_template (
id text not null primary key,
created_at datetime not null,
updated_at datetime not null,
name varchar(100) not null,
description text not null,
vendor varchar(100) not null,
firmware varchar(100) not null,
region varchar(10) not null,
mac_version varchar(10) not null,
reg_params_revision varchar(20) not null,
adr_algorithm_id varchar(100) not null,
payload_codec_runtime varchar(20) not null,
payload_codec_script text not null,
uplink_interval integer not null,
device_status_req_interval integer not null,
flush_queue_on_activate boolean not null,
supports_otaa boolean not null,
supports_class_b boolean not null,
supports_class_c boolean not null,
class_b_timeout integer not null,
class_b_ping_slot_nb_k integer not null,
class_b_ping_slot_dr smallint not null,
class_b_ping_slot_freq bigint not null,
class_c_timeout integer not null,
abp_rx1_delay smallint not null,
abp_rx1_dr_offset smallint not null,
abp_rx2_dr smallint not null,
abp_rx2_freq bigint not null,
tags text not null,
measurements text not null default '{}',
auto_detect_measurements boolean not null default TRUE
);
create table multicast_group_gateway (
multicast_group_id text not null references multicast_group on delete cascade,
gateway_id blob not null references gateway on delete cascade,
created_at datetime not null,
primary key (multicast_group_id, gateway_id)
);
create table relay_device (
relay_dev_eui blob not null references device on delete cascade,
dev_eui blob not null references device on delete cascade,
created_at datetime not null,
primary key (relay_dev_eui, dev_eui)
);
create index idx_tenant_tags on tenant (tags);
create index idx_application_tags on application (tags);
create index idx_device_dev_addr on device (dev_addr);
create index idx_device_secondary_dev_addr on device (secondary_dev_addr);
-- relay gateway
create table relay_gateway (
tenant_id text not null references tenant on delete cascade,
relay_id blob not null,
created_at datetime not null,
updated_at datetime not null,
last_seen_at datetime,
name varchar(100) not null,
description text not null,
stats_interval_secs integer not null,
region_config_id varchar(100) not null,
primary key (tenant_id, relay_id)
);

View File

@ -0,0 +1,6 @@
alter table device_queue_item
drop column expires_at;
alter table multicast_group_queue_item
drop column expires_at;

View File

@ -0,0 +1,5 @@
alter table multicast_group_queue_item
add column expires_at datetime null;
alter table device_queue_item
add column expires_at datetime null;

View File

@ -44,7 +44,7 @@ impl ApplicationService for Application {
.await?;
let a = application::Application {
tenant_id,
tenant_id: tenant_id.into(),
name: req_app.name.clone(),
description: req_app.description.clone(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -119,7 +119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update(application::Application {
id: app_id,
id: app_id.into(),
name: req_app.name.to_string(),
description: req_app.description.to_string(),
tags: fields::KeyValue::new(req_app.tags.clone()),
@ -279,7 +279,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -367,7 +367,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Http,
configuration: application::IntegrationConfiguration::Http(
application::HttpConfiguration {
@ -438,7 +438,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -535,7 +535,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::InfluxDb,
configuration: application::IntegrationConfiguration::InfluxDb(
application::InfluxDbConfiguration {
@ -610,7 +610,7 @@ impl ApplicationService for Application {
.await?;
let i = application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -689,7 +689,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::ThingsBoard,
configuration: application::IntegrationConfiguration::ThingsBoard(
application::ThingsBoardConfiguration {
@ -755,7 +755,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -832,7 +832,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::MyDevices,
configuration: application::IntegrationConfiguration::MyDevices(
application::MyDevicesConfiguration {
@ -907,7 +907,7 @@ impl ApplicationService for Application {
};
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1032,7 +1032,7 @@ impl ApplicationService for Application {
};
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::LoraCloud,
configuration: application::IntegrationConfiguration::LoraCloud(
application::LoraCloudConfiguration {
@ -1119,7 +1119,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1202,7 +1202,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::GcpPubSub,
configuration: application::IntegrationConfiguration::GcpPubSub(
application::GcpPubSubConfiguration {
@ -1271,7 +1271,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1354,7 +1354,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AwsSns,
configuration: application::IntegrationConfiguration::AwsSns(
application::AwsSnsConfiguration {
@ -1424,7 +1424,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1506,7 +1506,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::AzureServiceBus,
configuration: application::IntegrationConfiguration::AzureServiceBus(
application::AzureServiceBusConfiguration {
@ -1574,7 +1574,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1653,7 +1653,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::PilotThings,
configuration: application::IntegrationConfiguration::PilotThings(
application::PilotThingsConfiguration {
@ -1730,7 +1730,7 @@ impl ApplicationService for Application {
}
let _ = application::create_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1814,7 +1814,7 @@ impl ApplicationService for Application {
.await?;
let _ = application::update_integration(application::Integration {
application_id: app_id,
application_id: app_id.into(),
kind: application::IntegrationKind::Ifttt,
configuration: application::IntegrationConfiguration::Ifttt(
application::IftttConfiguration {
@ -1947,7 +1947,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
let create_resp = create_resp.get_ref();
@ -1956,7 +1956,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -1978,7 +1980,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
//get
@ -1986,7 +1990,9 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Application {
@ -2006,7 +2012,9 @@ pub mod test {
offset: 0,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -2016,14 +2024,18 @@ pub mod test {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteApplicationRequest {
id: create_resp.id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}
@ -2038,7 +2050,7 @@ pub mod test {
.unwrap();
application::create(application::Application {
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-app".into(),
..Default::default()
})
@ -2059,7 +2071,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}

View File

@ -95,14 +95,11 @@ pub mod test {
assert_eq!(claim, decoded);
// different key
assert_eq!(
true,
AuthClaim::decode(&token, other_secret.as_ref()).is_err()
);
assert!(AuthClaim::decode(&token, other_secret.as_ref()).is_err());
// expired
claim.exp = Some(exp.timestamp() as usize);
let token = claim.encode(secrect.as_ref()).unwrap();
assert_eq!(true, AuthClaim::decode(&token, secrect.as_ref()).is_err());
assert!(AuthClaim::decode(&token, secrect.as_ref()).is_err());
}
}

View File

@ -6,7 +6,7 @@ pub mod claims;
pub mod error;
pub mod validator;
#[derive(PartialEq, Eq, Debug)]
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum AuthID {
None,
User(Uuid),

File diff suppressed because it is too large Load Diff

View File

@ -5,18 +5,28 @@ use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use axum::{
body::Bytes,
response::{IntoResponse, Json, Response},
Router,
};
use chrono::Utc;
use http::StatusCode;
use redis::streams::StreamReadReply;
use rustls::{
server::{NoClientAuth, WebPkiClientVerifier},
ServerConfig,
};
use serde::Serialize;
use tokio::sync::oneshot;
use tokio::task;
use tracing::{error, info, span, warn, Instrument, Level};
use uuid::Uuid;
use warp::{http::StatusCode, Filter, Reply};
use crate::backend::{joinserver, keywrap, roaming};
use crate::downlink::data_fns;
use crate::helpers::errors::PrintFullError;
use crate::helpers::tls::{get_root_certs, load_cert, load_key};
use crate::storage::{
device, error::Error as StorageError, get_async_redis_conn, passive_roaming, redis_key,
};
@ -39,47 +49,47 @@ pub async fn setup() -> Result<()> {
let addr: SocketAddr = conf.backend_interfaces.bind.parse()?;
info!(bind = %conf.backend_interfaces.bind, "Setting up backend interfaces API");
let routes = warp::post()
.and(warp::body::content_length_limit(1024 * 16))
.and(warp::body::aggregate())
.then(handle_request);
let app = Router::new().fallback(handle_request);
if !conf.backend_interfaces.ca_cert.is_empty()
|| !conf.backend_interfaces.tls_cert.is_empty()
|| !conf.backend_interfaces.tls_key.is_empty()
{
let mut w = warp::serve(routes).tls();
if !conf.backend_interfaces.ca_cert.is_empty() {
w = w.client_auth_required_path(&conf.backend_interfaces.ca_cert);
}
if !conf.backend_interfaces.tls_cert.is_empty() {
w = w.cert_path(&conf.backend_interfaces.tls_cert);
}
if !conf.backend_interfaces.tls_key.is_empty() {
w = w.key_path(&conf.backend_interfaces.tls_key);
}
w.run(addr).await;
let mut server_config = ServerConfig::builder()
.with_client_cert_verifier(if conf.backend_interfaces.ca_cert.is_empty() {
Arc::new(NoClientAuth)
} else {
let root_certs = get_root_certs(Some(conf.backend_interfaces.ca_cert.clone()))?;
WebPkiClientVerifier::builder(root_certs.into()).build()?
})
.with_single_cert(
load_cert(&conf.backend_interfaces.tls_cert).await?,
load_key(&conf.backend_interfaces.tls_key).await?,
)?;
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
axum_server::bind_rustls(
addr,
axum_server::tls_rustls::RustlsConfig::from_config(Arc::new(server_config)),
)
.serve(app.into_make_service())
.await?;
} else {
warp::serve(routes).run(addr).await;
axum_server::bind(addr)
.serve(app.into_make_service())
.await?;
}
Ok(())
}
pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::Body> {
let mut b: Vec<u8> = vec![];
while body.has_remaining() {
b.extend_from_slice(body.chunk());
let cnt = body.chunk().len();
body.advance(cnt);
}
pub async fn handle_request(b: Bytes) -> Response {
let b: Vec<u8> = b.into();
let bp: BasePayload = match serde_json::from_slice(&b) {
Ok(v) => v,
Err(e) => {
return warp::reply::with_status(e.to_string(), StatusCode::BAD_REQUEST)
.into_response();
return (StatusCode::BAD_REQUEST, e.to_string()).into_response();
}
};
@ -87,7 +97,7 @@ pub async fn handle_request(mut body: impl warp::Buf) -> http::Response<hyper::B
_handle_request(bp, b).instrument(span).await
}
pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hyper::Body> {
pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> Response {
info!("Request received");
let sender_client = {
@ -100,7 +110,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -111,7 +121,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
}
} else if bp.sender_id.len() == 3 {
@ -123,7 +133,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Error decoding SenderID: {}", e);
let pl = bp.to_base_payload_result(backend::ResultCode::MalformedRequest, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -134,7 +144,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
let msg = format!("Unknown SenderID: {}", sender_id);
let pl = bp.to_base_payload_result(backend::ResultCode::UnknownSender, &msg);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
}
} else {
@ -145,7 +155,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
"Invalid SenderID length",
);
log_request_response(&bp, &b, &pl).await;
return warp::reply::json(&pl).into_response();
return Json(&pl).into_response();
}
};
@ -156,7 +166,7 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
error!(error = %e.full(), "Handle async answer error");
}
});
return warp::reply::with_status("", StatusCode::OK).into_response();
return (StatusCode::OK, "").into_response();
}
match bp.message_type {
@ -165,11 +175,11 @@ pub async fn _handle_request(bp: BasePayload, b: Vec<u8>) -> http::Response<hype
MessageType::XmitDataReq => handle_xmit_data_req(sender_client, bp, &b).await,
MessageType::HomeNSReq => handle_home_ns_req(sender_client, bp, &b).await,
// Unknown message
_ => warp::reply::with_status(
"Handler for {:?} is not implemented",
_ => (
StatusCode::INTERNAL_SERVER_ERROR,
format!("Handler for {:?} is not implemented", bp.message_type),
)
.into_response(),
.into_response(),
}
}
@ -201,7 +211,7 @@ async fn handle_pr_start_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
if sender_client.is_async() {
let b = b.to_vec();
task::spawn(async move {
@ -222,18 +232,17 @@ async fn handle_pr_start_req(
error!(error = %e.full(), transaction_id = bp.transaction_id, "Send async PRStartAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_pr_start_req(b).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -363,7 +372,7 @@ async fn handle_pr_stop_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
if sender_client.is_async() {
let b = b.to_vec();
task::spawn(async move {
@ -383,18 +392,17 @@ async fn handle_pr_stop_req(
error!(error = %e.full(), "Send async PRStopAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_pr_stop_req(b).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -430,13 +438,13 @@ async fn handle_xmit_data_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
let pl: backend::XmitDataReqPayload = match serde_json::from_slice(b) {
Ok(v) => v,
Err(e) => {
let ans = err_to_response(anyhow::Error::new(e), &bp);
log_request_response(&bp, b, &ans).await;
return warp::reply::json(&ans).into_response();
return Json(&ans).into_response();
}
};
@ -465,18 +473,17 @@ async fn handle_xmit_data_req(
error!(error = %e.full(), "Send async XmitDataAns error");
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_xmit_data_req(pl).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -529,13 +536,13 @@ async fn handle_home_ns_req(
sender_client: Arc<backend::Client>,
bp: backend::BasePayload,
b: &[u8],
) -> http::Response<hyper::Body> {
) -> Response {
let pl: backend::HomeNSReqPayload = match serde_json::from_slice(b) {
Ok(v) => v,
Err(e) => {
let ans = err_to_response(anyhow::Error::new(e), &bp);
log_request_response(&bp, b, &ans).await;
return warp::reply::json(&ans).into_response();
return Json(&ans).into_response();
}
};
@ -560,17 +567,17 @@ async fn handle_home_ns_req(
}
});
warp::reply::with_status("", StatusCode::OK).into_response()
(StatusCode::OK, "").into_response()
} else {
match _handle_home_ns_req(pl).await {
Ok(ans) => {
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
Err(e) => {
let ans = err_to_response(e, &bp);
log_request_response(&bp, b, &ans).await;
warp::reply::json(&ans).into_response()
Json(&ans).into_response()
}
}
}
@ -587,7 +594,7 @@ async fn _handle_home_ns_req(pl: backend::HomeNSReqPayload) -> Result<backend::H
})
}
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<hyper::Body>> {
async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<Response> {
let transaction_id = bp.transaction_id;
let key = redis_key(format!("backend:async:{}", transaction_id));
@ -609,7 +616,7 @@ async fn handle_async_ans(bp: &BasePayload, b: &[u8]) -> Result<http::Response<h
.query_async(&mut get_async_redis_conn().await?)
.await?;
Ok(warp::reply().into_response())
Ok((StatusCode::OK, "").into_response())
}
pub async fn get_async_receiver(
@ -651,7 +658,7 @@ pub async fn get_async_receiver(
for (k, v) in &stream_id.map {
match k.as_ref() {
"pl" => {
if let redis::Value::Data(b) = v {
if let redis::Value::BulkString(b) = v {
let _ = tx.send(b.to_vec());
return;
}

View File

@ -64,8 +64,8 @@ impl DeviceService for Device {
let d = device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -191,8 +191,8 @@ impl DeviceService for Device {
// update
let _ = device::update(device::Device {
dev_eui,
application_id: app_id,
device_profile_id: dp_id,
application_id: app_id.into(),
device_profile_id: dp_id.into(),
name: req_d.name.clone(),
description: req_d.description.clone(),
skip_fcnt_check: req_d.skip_fcnt_check,
@ -533,7 +533,7 @@ impl DeviceService for Device {
dp.reset_session_to_boot_params(&mut ds);
let mut device_changeset = device::DeviceChangeset {
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
dev_addr: Some(Some(dev_addr)),
secondary_dev_addr: Some(None),
..Default::default()
@ -695,20 +695,18 @@ impl DeviceService for Device {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -819,20 +817,18 @@ impl DeviceService for Device {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -1089,7 +1085,7 @@ impl DeviceService for Device {
}
let qi = device_queue::DeviceQueueItem {
id: Uuid::new_v4(),
id: Uuid::new_v4().into(),
dev_eui,
f_port: req_qi.f_port as i16,
confirmed: req_qi.confirmed,
@ -1099,6 +1095,14 @@ impl DeviceService for Device {
} else {
None
},
expires_at: if let Some(expires_at) = req_qi.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
data,
..Default::default()
};
@ -1173,6 +1177,10 @@ impl DeviceService for Device {
is_pending: qi.is_pending,
f_cnt_down: qi.f_cnt_down.unwrap_or(0) as u32,
is_encrypted: qi.is_encrypted,
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
})
.collect(),
});
@ -1256,7 +1264,7 @@ pub mod test {
// create application
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -1265,7 +1273,7 @@ pub mod test {
// create device-profile
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -1421,12 +1429,10 @@ pub mod test {
);
// flush dev nonces
let _ = device_keys::set_dev_nonces(
&EUI64::from_str("0102030405060708").unwrap(),
&vec![1, 2, 3],
)
.await
.unwrap();
let _ =
device_keys::set_dev_nonces(&EUI64::from_str("0102030405060708").unwrap(), &[1, 2, 3])
.await
.unwrap();
let flush_dev_nonces_req = get_request(
&u.id,
api::FlushDevNoncesRequest {
@ -1545,11 +1551,14 @@ pub mod test {
dev.dev_eui,
&device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_be_bytes([1, 2, 3, 4]))),
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
js_session_key_id: vec![8, 7, 6, 5, 4, 3, 2, 1],
..Default::default()
}
.into(),
)),
..Default::default()
},
)
@ -1574,14 +1583,17 @@ pub mod test {
device::partial_update(
dev.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
})),
device_session: Some(Some(
internal::DeviceSession {
dev_addr: vec![1, 2, 3, 4],
app_s_key: Some(common::KeyEnvelope {
kek_label: "test-key".into(),
aes_key: vec![8, 7, 6, 5, 4, 3, 2, 1, 8, 7, 6, 5, 4, 3, 2, 1],
}),
..Default::default()
}
.into(),
)),
..Default::default()
},
)
@ -1618,7 +1630,7 @@ pub mod test {
.await
.unwrap();
let dev_addr = DevAddr::from_str(&get_random_dev_addr_resp.get_ref().dev_addr).unwrap();
let mut dev_addr_copy = dev_addr.clone();
let mut dev_addr_copy = dev_addr;
dev_addr_copy.set_dev_addr_prefix(NetID::from_str("000000").unwrap().dev_addr_prefix());
assert_eq!(dev_addr, dev_addr_copy);
@ -1666,10 +1678,10 @@ pub mod test {
assert_eq!(2, get_queue_resp.total_count);
assert_eq!(2, get_queue_resp.result.len());
assert_eq!(vec![3, 2, 1], get_queue_resp.result[0].data);
assert_eq!(false, get_queue_resp.result[0].is_encrypted);
assert!(!get_queue_resp.result[0].is_encrypted);
assert_eq!(0, get_queue_resp.result[0].f_cnt_down);
assert_eq!(vec![1, 2, 3], get_queue_resp.result[1].data);
assert_eq!(true, get_queue_resp.result[1].is_encrypted);
assert!(get_queue_resp.result[1].is_encrypted);
assert_eq!(10, get_queue_resp.result[1].f_cnt_down);
// get next FCntDown (from queue)
@ -1726,7 +1738,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -45,7 +45,7 @@ impl DeviceProfileService for DeviceProfile {
.await?;
let mut dp = device_profile::DeviceProfile {
tenant_id,
tenant_id: tenant_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),
@ -247,7 +247,7 @@ impl DeviceProfileService for DeviceProfile {
// update
let _ = device_profile::update(device_profile::DeviceProfile {
id: dp_id,
id: dp_id.into(),
name: req_dp.name.clone(),
description: req_dp.description.clone(),
region: req_dp.region().from_proto(),
@ -600,7 +600,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -444,7 +444,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -58,7 +58,7 @@ impl GatewayService for Gateway {
let gw = gateway::Gateway {
gateway_id: EUI64::from_str(&req_gw.gateway_id).map_err(|e| e.status())?,
tenant_id,
tenant_id: tenant_id.into(),
name: req_gw.name.clone(),
description: req_gw.description.clone(),
latitude: lat,
@ -345,20 +345,18 @@ impl GatewayService for Gateway {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -634,20 +632,18 @@ impl GatewayService for Gateway {
.await?;
let start = SystemTime::try_from(
req.start
*req.start
.as_ref()
.ok_or_else(|| anyhow!("start is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
let end = SystemTime::try_from(
req.end
*req.end
.as_ref()
.ok_or_else(|| anyhow!("end is None"))
.map_err(|e| e.status())?
.clone(),
.map_err(|e| e.status())?,
)
.map_err(|e| e.status())?;
@ -855,8 +851,8 @@ impl GatewayService for Gateway {
.await?;
let _ = gateway::update_relay_gateway(gateway::RelayGateway {
tenant_id,
relay_id,
tenant_id: tenant_id.into(),
name: req_relay.name.clone(),
description: req_relay.description.clone(),
stats_interval_secs: req_relay.stats_interval as i32,
@ -1034,7 +1030,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.create(create_req).await.unwrap();
// get
@ -1042,7 +1038,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1076,7 +1074,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -1084,7 +1084,9 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Gateway {
@ -1111,7 +1113,9 @@ pub mod test {
..Default::default()
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -1121,14 +1125,18 @@ pub mod test {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteGatewayRequest {
gateway_id: "0102030405060708".into(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}
@ -1160,7 +1168,7 @@ pub mod test {
// create gateway
let _ = gateway::create(gateway::Gateway {
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-gw".into(),
..Default::default()
})
@ -1172,7 +1180,7 @@ pub mod test {
// insert stats
let mut m = metrics::Record {
kind: metrics::Kind::ABSOLUTE,
time: now.into(),
time: now,
metrics: HashMap::new(),
};
@ -1206,7 +1214,7 @@ pub mod test {
let mut stats_req = Request::new(stats_req);
stats_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let stats_resp = service.get_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1257,7 +1265,7 @@ pub mod test {
// create gateway
let _ = gateway::create(gateway::Gateway {
gateway_id: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
tenant_id: t.id.clone(),
tenant_id: t.id,
name: "test-gw".into(),
..Default::default()
})
@ -1269,7 +1277,7 @@ pub mod test {
// insert stats
let mut m = metrics::Record {
kind: metrics::Kind::COUNTER,
time: now.into(),
time: now,
metrics: HashMap::new(),
};
@ -1297,9 +1305,7 @@ pub mod test {
end: Some(now_st.into()),
};
let mut stats_req = Request::new(stats_req);
stats_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
stats_req.extensions_mut().insert(AuthID::User(u.id.into()));
let stats_resp = service.get_duty_cycle_metrics(stats_req).await.unwrap();
let stats_resp = stats_resp.get_ref();
assert_eq!(
@ -1373,7 +1379,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1399,7 +1407,9 @@ pub mod test {
}),
};
let mut up_relay_req = Request::new(up_relay_req);
up_relay_req.extensions_mut().insert(AuthID::User(u.id));
up_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let _ = service.update_relay_gateway(up_relay_req).await.unwrap();
// get relay gateway
@ -1408,7 +1418,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut get_relay_req = Request::new(get_relay_req);
get_relay_req.extensions_mut().insert(AuthID::User(u.id));
get_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let get_relay_resp = service.get_relay_gateway(get_relay_req).await.unwrap();
assert_eq!(
Some(api::RelayGateway {
@ -1429,7 +1441,9 @@ pub mod test {
offset: 0,
};
let mut list_relay_req = Request::new(list_relay_req);
list_relay_req.extensions_mut().insert(AuthID::User(u.id));
list_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let list_relay_resp = service.list_relay_gateways(list_relay_req).await.unwrap();
assert_eq!(1, list_relay_resp.get_ref().total_count);
assert_eq!(1, list_relay_resp.get_ref().result.len());
@ -1440,7 +1454,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_ok());
@ -1449,7 +1465,9 @@ pub mod test {
relay_id: "01020304".into(),
};
let mut del_relay_req = Request::new(del_relay_req);
del_relay_req.extensions_mut().insert(AuthID::User(u.id));
del_relay_req
.extensions_mut()
.insert(AuthID::User(u.id.into()));
let del_relay_resp = service.delete_relay_gateway(del_relay_req).await;
assert!(del_relay_resp.is_err());
}

View File

@ -0,0 +1,141 @@
use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};
use futures::ready;
use http::{header::CONTENT_TYPE, Request, Response};
use http_body::Body;
use pin_project::pin_project;
use tower::{Layer, Service};
type BoxError = Box<dyn std::error::Error + Send + Sync>;
#[pin_project(project = GrpcMultiplexFutureEnumProj)]
enum GrpcMultiplexFutureEnum<FS, FO> {
Grpc {
#[pin]
future: FS,
},
Other {
#[pin]
future: FO,
},
}
#[pin_project]
pub struct GrpcMultiplexFuture<FS, FO> {
#[pin]
future: GrpcMultiplexFutureEnum<FS, FO>,
}
impl<ResBody, FS, FO, ES, EO> Future for GrpcMultiplexFuture<FS, FO>
where
ResBody: Body,
FS: Future<Output = Result<Response<ResBody>, ES>>,
FO: Future<Output = Result<Response<ResBody>, EO>>,
ES: Into<BoxError> + Send,
EO: Into<BoxError> + Send,
{
type Output = Result<Response<ResBody>, Box<dyn std::error::Error + Send + Sync + 'static>>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.future.project() {
GrpcMultiplexFutureEnumProj::Grpc { future } => future.poll(cx).map_err(Into::into),
GrpcMultiplexFutureEnumProj::Other { future } => future.poll(cx).map_err(Into::into),
}
}
}
#[derive(Debug, Clone)]
pub struct GrpcMultiplexService<S, O> {
grpc: S,
other: O,
grpc_ready: bool,
other_ready: bool,
}
impl<ReqBody, ResBody, S, O> Service<Request<ReqBody>> for GrpcMultiplexService<S, O>
where
ResBody: Body,
S: Service<Request<ReqBody>, Response = Response<ResBody>>,
O: Service<Request<ReqBody>, Response = Response<ResBody>>,
S::Error: Into<BoxError> + Send,
O::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
type Future = GrpcMultiplexFuture<S::Future, O::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
loop {
match (self.grpc_ready, self.other_ready) {
(true, true) => {
return Ok(()).into();
}
(false, _) => {
ready!(self.grpc.poll_ready(cx)).map_err(Into::into)?;
self.grpc_ready = true;
}
(_, false) => {
ready!(self.other.poll_ready(cx)).map_err(Into::into)?;
self.other_ready = true;
}
}
}
}
fn call(&mut self, request: Request<ReqBody>) -> Self::Future {
assert!(self.grpc_ready);
assert!(self.other_ready);
if is_grpc_request(&request) {
GrpcMultiplexFuture {
future: GrpcMultiplexFutureEnum::Grpc {
future: self.grpc.call(request),
},
}
} else {
GrpcMultiplexFuture {
future: GrpcMultiplexFutureEnum::Other {
future: self.other.call(request),
},
}
}
}
}
#[derive(Debug, Clone)]
pub struct GrpcMultiplexLayer<O> {
other: O,
}
impl<O> GrpcMultiplexLayer<O> {
pub fn new(other: O) -> Self {
Self { other }
}
}
impl<S, O> Layer<S> for GrpcMultiplexLayer<O>
where
O: Clone,
{
type Service = GrpcMultiplexService<S, O>;
fn layer(&self, grpc: S) -> Self::Service {
GrpcMultiplexService {
grpc,
other: self.other.clone(),
grpc_ready: false,
other_ready: false,
}
}
}
fn is_grpc_request<B>(req: &Request<B>) -> bool {
req.headers()
.get(CONTENT_TYPE)
.map(|content_type| content_type.as_bytes())
.filter(|content_type| content_type.starts_with(b"application/grpc"))
.is_some()
}

View File

@ -287,7 +287,11 @@ impl InternalService for Internal {
let tenant_id = if req_key.tenant_id.is_empty() {
None
} else {
Some(Uuid::from_str(&req_key.tenant_id).map_err(|e| e.status())?)
Some(
Uuid::from_str(&req_key.tenant_id)
.map_err(|e| e.status())?
.into(),
)
};
if req_key.is_admin && tenant_id.is_some() {
@ -312,7 +316,7 @@ impl InternalService for Internal {
let ak = api_key::ApiKey {
name: req_key.name.clone(),
is_admin: req_key.is_admin,
tenant_id,
tenant_id: tenant_id.map(|u| u.into()),
..Default::default()
};

View File

@ -1,4 +1,3 @@
use std::convert::Infallible;
use std::time::{Duration, Instant};
use std::{
future::Future,
@ -7,23 +6,27 @@ use std::{
};
use anyhow::Result;
use futures::future::{self, Either, TryFutureExt};
use hyper::{service::make_service_fn, Server};
use axum::{response::IntoResponse, routing::get, Router};
use http::{
header::{self, HeaderMap, HeaderValue},
Request, StatusCode, Uri,
};
use pin_project::pin_project;
use prometheus_client::encoding::EncodeLabelSet;
use prometheus_client::metrics::counter::Counter;
use prometheus_client::metrics::family::Family;
use prometheus_client::metrics::histogram::Histogram;
use rust_embed::RustEmbed;
use tokio::{task, try_join};
use tokio::task;
use tokio::try_join;
use tonic::transport::Server as TonicServer;
use tonic::Code;
use tonic_reflection::server::Builder as TonicReflectionBuilder;
use tonic_web::GrpcWebLayer;
use tower::{Service, ServiceBuilder};
use tower::util::ServiceExt;
use tower::Service;
use tower_http::trace::TraceLayer;
use tracing::{error, info};
use warp::{http::header::HeaderValue, path::Tail, reply::Response, Filter, Rejection, Reply};
use chirpstack_api::api::application_service_server::ApplicationServiceServer;
use chirpstack_api::api::device_profile_service_server::DeviceProfileServiceServer;
@ -51,6 +54,7 @@ pub mod device_profile;
pub mod device_profile_template;
pub mod error;
pub mod gateway;
mod grpc_multiplex;
pub mod helpers;
pub mod internal;
pub mod monitoring;
@ -89,210 +93,125 @@ lazy_static! {
};
}
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
#[derive(RustEmbed)]
#[folder = "../ui/build"]
struct Asset;
type BoxError = Box<dyn std::error::Error + Send + Sync>;
pub async fn setup() -> Result<()> {
let conf = config::get();
let addr = conf.api.bind.parse()?;
let bind = conf.api.bind.parse()?;
info!(bind = %conf.api.bind, "Setting up API interface");
info!(bind = %bind, "Setting up API interface");
// Taken from the tonic hyper_warp_multiplex example:
// https://github.com/hyperium/tonic/blob/master/examples/src/hyper_warp_multiplex/server.rs#L101
let service = make_service_fn(move |_| {
// tonic gRPC service
let tonic_service = TonicServer::builder()
.accept_http1(true)
.layer(GrpcWebLayer::new())
.add_service(
TonicReflectionBuilder::configure()
.register_encoded_file_descriptor_set(chirpstack_api::api::DESCRIPTOR)
.build()
.unwrap(),
)
.add_service(InternalServiceServer::with_interceptor(
internal::Internal::new(
validator::RequestValidator::new(),
conf.api.secret.clone(),
),
auth::auth_interceptor,
))
.add_service(ApplicationServiceServer::with_interceptor(
application::Application::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileServiceServer::with_interceptor(
device_profile::DeviceProfile::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileTemplateServiceServer::with_interceptor(
device_profile_template::DeviceProfileTemplate::new(
validator::RequestValidator::new(),
),
auth::auth_interceptor,
))
.add_service(TenantServiceServer::with_interceptor(
tenant::Tenant::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceServiceServer::with_interceptor(
device::Device::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(UserServiceServer::with_interceptor(
user::User::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(GatewayServiceServer::with_interceptor(
gateway::Gateway::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(MulticastGroupServiceServer::with_interceptor(
multicast::MulticastGroup::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(RelayServiceServer::with_interceptor(
relay::Relay::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.into_service();
let mut tonic_service = ServiceBuilder::new()
.layer(
TraceLayer::new_for_grpc()
.make_span_with(|req: &http::Request<hyper::Body>| {
tracing::info_span!(
"gRPC",
uri = %req.uri().path(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.layer(ApiLogger {})
.service(tonic_service);
let web = Router::new()
.route("/auth/oidc/login", get(oidc::login_handler))
.route("/auth/oidc/callback", get(oidc::callback_handler))
.route("/auth/oauth2/login", get(oauth2::login_handler))
.route("/auth/oauth2/callback", get(oauth2::callback_handler))
.fallback(service_static_handler)
.into_service()
.map_response(|r| r.map(tonic::body::boxed));
// HTTP service
let warp_service = warp::service(
warp::path!("auth" / "oidc" / "login")
.and_then(oidc::login_handler)
.or(warp::path!("auth" / "oidc" / "callback")
.and(warp::query::<oidc::CallbackArgs>())
.and_then(oidc::callback_handler))
.or(warp::path!("auth" / "oauth2" / "login").and_then(oauth2::login_handler))
.or(warp::path!("auth" / "oauth2" / "callback")
.and(warp::query::<oauth2::CallbackArgs>())
.and_then(oauth2::callback_handler))
.or(warp::path::tail().and_then(http_serve)),
);
let mut warp_service = ServiceBuilder::new()
.layer(
TraceLayer::new_for_http()
.make_span_with(|req: &http::Request<hyper::Body>| {
tracing::info_span!(
"http",
method = req.method().as_str(),
uri = %req.uri().path(),
version = ?req.version(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.service(warp_service);
future::ok::<_, Infallible>(tower::service_fn(
move |req: hyper::Request<hyper::Body>| match req.method() {
&hyper::Method::GET => Either::Left(
warp_service
.call(req)
.map_ok(|res| res.map(EitherBody::Right))
.map_err(Error::from),
),
_ => Either::Right(
tonic_service
.call(req)
.map_ok(|res| res.map(EitherBody::Left))
.map_err(Error::from),
),
},
let grpc = TonicServer::builder()
.accept_http1(true)
.layer(
TraceLayer::new_for_grpc()
.make_span_with(|req: &Request<_>| {
tracing::info_span!(
"gRPC",
uri = %req.uri().path(),
)
})
.on_request(OnRequest {})
.on_response(OnResponse {}),
)
.layer(grpc_multiplex::GrpcMultiplexLayer::new(web))
.layer(ApiLoggerLayer {})
.layer(GrpcWebLayer::new())
.add_service(
TonicReflectionBuilder::configure()
.register_encoded_file_descriptor_set(chirpstack_api::api::DESCRIPTOR)
.build_v1()
.unwrap(),
)
.add_service(InternalServiceServer::with_interceptor(
internal::Internal::new(validator::RequestValidator::new(), conf.api.secret.clone()),
auth::auth_interceptor,
))
});
.add_service(ApplicationServiceServer::with_interceptor(
application::Application::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileServiceServer::with_interceptor(
device_profile::DeviceProfile::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceProfileTemplateServiceServer::with_interceptor(
device_profile_template::DeviceProfileTemplate::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(TenantServiceServer::with_interceptor(
tenant::Tenant::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(DeviceServiceServer::with_interceptor(
device::Device::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(UserServiceServer::with_interceptor(
user::User::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(GatewayServiceServer::with_interceptor(
gateway::Gateway::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(MulticastGroupServiceServer::with_interceptor(
multicast::MulticastGroup::new(validator::RequestValidator::new()),
auth::auth_interceptor,
))
.add_service(RelayServiceServer::with_interceptor(
relay::Relay::new(validator::RequestValidator::new()),
auth::auth_interceptor,
));
let backend_handle = tokio::spawn(backend::setup());
let monitoring_handle = tokio::spawn(monitoring::setup());
let api_handle = tokio::spawn(Server::bind(&addr).serve(service));
let grpc_handle = tokio::spawn(grpc.serve(bind));
let _ = try_join!(api_handle, backend_handle, monitoring_handle)?;
tokio::spawn(async move {
if let Err(e) = try_join!(grpc_handle, backend_handle, monitoring_handle) {
error!(error = %e, "Setup API error");
std::process::exit(-1);
}
});
Ok(())
}
enum EitherBody<A, B> {
Left(A),
Right(B),
}
impl<A, B> http_body::Body for EitherBody<A, B>
where
A: http_body::Body + Send + Unpin,
B: http_body::Body<Data = A::Data> + Send + Unpin,
A::Error: Into<Error>,
B::Error: Into<Error>,
{
type Data = A::Data;
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
fn is_end_stream(&self) -> bool {
match self {
EitherBody::Left(b) => b.is_end_stream(),
EitherBody::Right(b) => b.is_end_stream(),
}
}
fn poll_data(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_data(cx).map(map_option_err),
EitherBody::Right(b) => Pin::new(b).poll_data(cx).map(map_option_err),
}
}
fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
match self.get_mut() {
EitherBody::Left(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
EitherBody::Right(b) => Pin::new(b).poll_trailers(cx).map_err(Into::into),
}
}
}
fn map_option_err<T, U: Into<Error>>(err: Option<Result<T, U>>) -> Option<Result<T, Error>> {
err.map(|e| e.map_err(Into::into))
}
async fn http_serve(path: Tail) -> Result<impl Reply, Rejection> {
let mut path = path.as_str();
async fn service_static_handler(uri: Uri) -> impl IntoResponse {
let mut path = {
let mut chars = uri.path().chars();
chars.next();
chars.as_str()
};
if path.is_empty() {
path = "index.html";
}
let asset = Asset::get(path).ok_or_else(warp::reject::not_found)?;
let mime = mime_guess::from_path(path).first_or_octet_stream();
let mut res = Response::new(asset.data.into());
res.headers_mut().insert(
"content-type",
HeaderValue::from_str(mime.as_ref()).unwrap(),
);
Ok(res)
if let Some(asset) = Asset::get(path) {
let mime = mime_guess::from_path(path).first_or_octet_stream();
let mut headers = HeaderMap::new();
headers.insert(
header::CONTENT_TYPE,
HeaderValue::from_str(mime.as_ref()).unwrap(),
);
(StatusCode::OK, headers, asset.data.into())
} else {
(StatusCode::NOT_FOUND, HeaderMap::new(), vec![])
}
}
#[derive(Debug, Clone)]
@ -320,13 +239,14 @@ struct GrpcLabels {
status_code: String,
}
struct ApiLogger {}
#[derive(Debug, Clone)]
struct ApiLoggerLayer {}
impl<S> tower::Layer<S> for ApiLogger {
impl<S> tower::Layer<S> for ApiLoggerLayer {
type Service = ApiLoggerService<S>;
fn layer(&self, service: S) -> Self::Service {
ApiLoggerService { inner: service }
fn layer(&self, inner: S) -> Self::Service {
ApiLoggerService { inner }
}
}
@ -335,15 +255,15 @@ struct ApiLoggerService<S> {
inner: S,
}
impl<S, ReqBody, ResBody> Service<http::Request<ReqBody>> for ApiLoggerService<S>
impl<ReqBody, ResBody, S> Service<http::Request<ReqBody>> for ApiLoggerService<S>
where
S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
ReqBody: http_body::Body,
ResBody: http_body::Body,
S: Service<http::Request<ReqBody>, Response = http::Response<ResBody>>,
S::Error: Into<BoxError> + Send,
{
type Response = S::Response;
type Error = S::Error;
type Future = ApiLoggerResponseFuture<S::Future>;
type Future = ApiLoggerFuture<S::Future>;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
@ -352,10 +272,10 @@ where
fn call(&mut self, request: http::Request<ReqBody>) -> Self::Future {
let uri = request.uri().path().to_string();
let uri_parts: Vec<&str> = uri.split('/').collect();
let response_future = self.inner.call(request);
let future = self.inner.call(request);
let start = Instant::now();
ApiLoggerResponseFuture {
response_future,
ApiLoggerFuture {
future,
start,
service: uri_parts.get(1).map(|v| v.to_string()).unwrap_or_default(),
method: uri_parts.get(2).map(|v| v.to_string()).unwrap_or_default(),
@ -364,25 +284,26 @@ where
}
#[pin_project]
struct ApiLoggerResponseFuture<F> {
struct ApiLoggerFuture<F> {
#[pin]
response_future: F,
future: F,
start: Instant,
service: String,
method: String,
}
impl<F, ResBody, Error> Future for ApiLoggerResponseFuture<F>
impl<ResBody, F, E> Future for ApiLoggerFuture<F>
where
F: Future<Output = Result<http::Response<ResBody>, Error>>,
ResBody: http_body::Body,
F: Future<Output = Result<http::Response<ResBody>, E>>,
E: Into<BoxError> + Send,
{
type Output = Result<http::Response<ResBody>, Error>;
type Output = Result<http::Response<ResBody>, E>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
match this.response_future.poll(cx) {
match this.future.poll(cx) {
Poll::Ready(result) => {
if let Ok(response) = &result {
let status_code: i32 = match response.headers().get("grpc-status") {

View File

@ -1,51 +1,49 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use anyhow::{Context, Result};
use axum::{
response::{IntoResponse, Response},
routing::get,
Router,
};
use diesel_async::RunQueryDsl;
use http::StatusCode;
use tracing::info;
use warp::{http::Response, http::StatusCode, Filter};
use crate::config;
use crate::monitoring::prometheus;
use crate::storage::{get_async_db_conn, get_async_redis_conn};
pub async fn setup() {
pub async fn setup() -> Result<()> {
let conf = config::get();
if conf.monitoring.bind.is_empty() {
return;
return Ok(());
}
let addr: SocketAddr = conf.monitoring.bind.parse().unwrap();
info!(bind = %conf.monitoring.bind, "Setting up monitoring endpoint");
let prom_endpoint = warp::get()
.and(warp::path!("metrics"))
.and_then(prometheus_handler);
let app = Router::new()
.route("/metrics", get(prometheus_handler))
.route("/health", get(health_handler));
let health_endpoint = warp::get()
.and(warp::path!("health"))
.and_then(health_handler);
let routes = prom_endpoint.or(health_endpoint);
warp::serve(routes).run(addr).await;
axum_server::bind(addr)
.serve(app.into_make_service())
.await?;
Ok(())
}
async fn prometheus_handler() -> Result<impl warp::Reply, Infallible> {
async fn prometheus_handler() -> Response {
let body = prometheus::encode_to_string().unwrap_or_default();
Ok(Response::builder().body(body))
body.into_response()
}
async fn health_handler() -> Result<impl warp::Reply, Infallible> {
async fn health_handler() -> Response {
if let Err(e) = _health_handler().await {
return Ok(warp::reply::with_status(
e.to_string(),
StatusCode::SERVICE_UNAVAILABLE,
));
(StatusCode::SERVICE_UNAVAILABLE, e.to_string()).into_response()
} else {
(StatusCode::OK, "".to_string()).into_response()
}
Ok(warp::reply::with_status("OK".to_string(), StatusCode::OK))
}
async fn _health_handler() -> Result<()> {

View File

@ -47,7 +47,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let mg = multicast::MulticastGroup {
application_id: app_id,
application_id: app_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -154,7 +154,7 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let _ = multicast::update(multicast::MulticastGroup {
id: mg_id,
id: mg_id.into(),
name: req_mg.name.clone(),
region: req_mg.region().from_proto(),
mc_addr: DevAddr::from_str(&req_mg.mc_addr).map_err(|e| e.status())?,
@ -408,9 +408,17 @@ impl MulticastGroupService for MulticastGroup {
.await?;
let f_cnt = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem {
multicast_group_id: mg_id,
multicast_group_id: mg_id.into(),
f_port: req_enq.f_port as i16,
data: req_enq.data.clone(),
expires_at: if let Some(expires_at) = req_enq.expires_at {
let expires_at: std::time::SystemTime = expires_at
.try_into()
.map_err(|e: prost_types::TimestampError| e.status())?;
Some(expires_at.into())
} else {
None
},
..Default::default()
})
.await
@ -478,6 +486,10 @@ impl MulticastGroupService for MulticastGroup {
f_cnt: qi.f_cnt as u32,
f_port: qi.f_port as u32,
data: qi.data.clone(),
expires_at: qi.expires_at.map(|v| {
let v: std::time::SystemTime = v.into();
v.into()
}),
});
}
}
@ -550,7 +562,7 @@ pub mod test {
// create application
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -559,7 +571,7 @@ pub mod test {
// create device-profile
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id.clone(),
tenant_id: t.id,
..Default::default()
})
.await
@ -778,6 +790,7 @@ pub mod test {
f_cnt: 31,
f_port: 10,
data: vec![1, 2, 3],
expires_at: None,
},
list_queue_resp.items[0]
);
@ -871,7 +884,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -1,7 +1,10 @@
use std::str::FromStr;
use anyhow::{Context, Result};
use axum::{
extract::Query,
response::{IntoResponse, Redirect, Response},
};
use chrono::Duration;
use http::StatusCode;
use oauth2::basic::BasicClient;
use oauth2::reqwest;
use oauth2::{
@ -11,7 +14,6 @@ use oauth2::{
use reqwest::header::AUTHORIZATION;
use serde::{Deserialize, Serialize};
use tracing::{error, trace};
use warp::{Rejection, Reply};
use crate::config;
use crate::helpers::errors::PrintFullError;
@ -26,7 +28,7 @@ struct ClerkUserinfo {
pub user_id: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Deserialize)]
pub struct CallbackArgs {
pub code: String,
pub state: String,
@ -39,16 +41,12 @@ pub struct User {
pub external_id: String,
}
pub async fn login_handler() -> Result<impl Reply, Rejection> {
pub async fn login_handler() -> Response {
let client = match get_client() {
Ok(v) => v,
Err(e) => {
error!(error = %e.full(), "Get OAuth2 client error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
};
@ -64,26 +62,16 @@ pub async fn login_handler() -> Result<impl Reply, Rejection> {
if let Err(e) = store_verifier(&csrf_token, &pkce_verifier).await {
error!(error = %e.full(), "Store verifier error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
Ok(
warp::redirect::found(warp::http::Uri::from_str(auth_url.as_str()).unwrap())
.into_response(),
)
Redirect::temporary(auth_url.as_str()).into_response()
}
pub async fn callback_handler(args: CallbackArgs) -> Result<impl Reply, Rejection> {
// warp::redirect does not work with '#'.
Ok(warp::reply::with_header(
warp::http::StatusCode::PERMANENT_REDIRECT,
warp::http::header::LOCATION,
format!("/#/login?code={}&state={}", args.code, args.state),
))
pub async fn callback_handler(args: Query<CallbackArgs>) -> Response {
let args: CallbackArgs = args.0;
Redirect::permanent(&format!("/#/login?code={}&state={}", args.code, args.state))
.into_response()
}
fn get_client() -> Result<Client> {

View File

@ -1,8 +1,12 @@
use std::collections::HashMap;
use std::str::FromStr;
use anyhow::{Context, Result};
use axum::{
extract::Query,
response::{IntoResponse, Redirect, Response},
};
use chrono::Duration;
use http::StatusCode;
use openidconnect::core::{
CoreClient, CoreGenderClaim, CoreIdTokenClaims, CoreIdTokenVerifier, CoreProviderMetadata,
CoreResponseType,
@ -15,7 +19,6 @@ use openidconnect::{
use serde::{Deserialize, Serialize};
use serde_json::Value;
use tracing::{error, trace};
use warp::{Rejection, Reply};
use crate::config;
use crate::helpers::errors::PrintFullError;
@ -40,22 +43,18 @@ pub struct CustomClaims {
impl AdditionalClaims for CustomClaims {}
#[derive(Serialize, Deserialize)]
#[derive(Deserialize)]
pub struct CallbackArgs {
pub code: String,
pub state: String,
}
pub async fn login_handler() -> Result<impl Reply, Rejection> {
pub async fn login_handler() -> Response {
let client = match get_client().await {
Ok(v) => v,
Err(e) => {
error!(error = %e.full(), "Get OIDC client error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
};
@ -72,26 +71,16 @@ pub async fn login_handler() -> Result<impl Reply, Rejection> {
if let Err(e) = store_nonce(&csrf_state, &nonce).await {
error!(error = %e.full(), "Store nonce error");
return Ok(warp::reply::with_status(
"Internal error",
warp::http::StatusCode::INTERNAL_SERVER_ERROR,
)
.into_response());
return (StatusCode::INTERNAL_SERVER_ERROR, "Internal error").into_response();
}
Ok(
warp::redirect::found(warp::http::Uri::from_str(auth_url.as_str()).unwrap())
.into_response(),
)
Redirect::temporary(auth_url.as_str()).into_response()
}
pub async fn callback_handler(args: CallbackArgs) -> Result<impl Reply, Rejection> {
// warp::redirect does not work with '#'.
Ok(warp::reply::with_header(
warp::http::StatusCode::PERMANENT_REDIRECT,
warp::http::header::LOCATION,
format!("/#/login?code={}&state={}", args.code, args.state),
))
pub async fn callback_handler(args: Query<CallbackArgs>) -> Response {
let args: CallbackArgs = args.0;
Redirect::permanent(&format!("/#/login?code={}&state={}", args.code, args.state))
.into_response()
}
pub async fn get_user(code: &str, state: &str) -> Result<User> {

View File

@ -315,7 +315,7 @@ pub mod test {
fn get_request<T>(user_id: &Uuid, req: T) -> Request<T> {
let mut req = Request::new(req);
req.extensions_mut().insert(AuthID::User(user_id.clone()));
req.extensions_mut().insert(AuthID::User(*user_id));
req
}
}

View File

@ -122,7 +122,7 @@ impl TenantService for Tenant {
// update
let _ = tenant::update(tenant::Tenant {
id: tenant_id,
id: tenant_id.into(),
name: req_tenant.name.clone(),
description: req_tenant.description.clone(),
can_have_gateways: req_tenant.can_have_gateways,
@ -190,7 +190,7 @@ impl TenantService for Tenant {
let u = user::get(id).await.map_err(|e| e.status())?;
if !u.is_admin {
filters.user_id = Some(u.id);
filters.user_id = Some(u.id.into());
}
}
AuthID::Key(_) => {
@ -258,8 +258,8 @@ impl TenantService for Tenant {
.await?;
let _ = tenant::add_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -342,8 +342,8 @@ impl TenantService for Tenant {
.await?;
tenant::update_user(tenant::TenantUser {
tenant_id,
user_id,
tenant_id: tenant_id.into(),
user_id: user_id.into(),
is_admin: req_user.is_admin,
is_device_admin: req_user.is_device_admin,
is_gateway_admin: req_user.is_gateway_admin,
@ -484,7 +484,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -492,7 +492,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -520,7 +522,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -528,7 +532,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::Tenant {
@ -551,7 +557,9 @@ pub mod test {
user_id: "".into(),
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
assert_eq!(1, list_resp.get_ref().total_count);
assert_eq!(1, list_resp.get_ref().result.len());
@ -561,14 +569,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteTenantRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -64,8 +64,8 @@ impl UserService for User {
let tenant_id = Uuid::from_str(&tu.tenant_id).map_err(|e| e.status())?;
tenant::add_user(tenant::TenantUser {
tenant_id,
user_id: u.id,
tenant_id: tenant_id.into(),
user_id: u.id.into(),
is_admin: tu.is_admin,
is_device_admin: tu.is_device_admin,
is_gateway_admin: tu.is_gateway_admin,
@ -138,7 +138,7 @@ impl UserService for User {
// update
let _ = user::update(user::User {
id: user_id,
id: user_id.into(),
is_admin: req_user.is_admin,
is_active: req_user.is_active,
email: req_user.email.clone(),
@ -294,7 +294,7 @@ pub mod test {
let mut create_req = Request::new(create_req);
create_req
.extensions_mut()
.insert(AuthID::User(u.id.clone()));
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let create_resp = service.create(create_req).await.unwrap();
// get
@ -302,7 +302,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -328,7 +330,9 @@ pub mod test {
}),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update(up_req).await.unwrap();
// get
@ -336,7 +340,9 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut get_req = Request::new(get_req);
get_req.extensions_mut().insert(AuthID::User(u.id.clone()));
get_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let get_resp = service.get(get_req).await.unwrap();
assert_eq!(
Some(api::User {
@ -356,7 +362,9 @@ pub mod test {
password: "newpassword".into(),
};
let mut up_req = Request::new(up_req);
up_req.extensions_mut().insert(AuthID::User(u.id.clone()));
up_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.update_password(up_req).await.unwrap();
// list
@ -365,7 +373,9 @@ pub mod test {
limit: 10,
};
let mut list_req = Request::new(list_req);
list_req.extensions_mut().insert(AuthID::User(u.id.clone()));
list_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let list_resp = service.list(list_req).await.unwrap();
// * Admin from migrations
// * User that we created for auth
@ -378,14 +388,18 @@ pub mod test {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let _ = service.delete(del_req).await.unwrap();
let del_req = api::DeleteUserRequest {
id: create_resp.get_ref().id.clone(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
@ -393,7 +407,9 @@ pub mod test {
id: u.id.to_string(),
};
let mut del_req = Request::new(del_req);
del_req.extensions_mut().insert(AuthID::User(u.id.clone()));
del_req
.extensions_mut()
.insert(AuthID::User(Into::<uuid::Uuid>::into(u.id).clone()));
let del_resp = service.delete(del_req).await;
assert!(del_resp.is_err());
}

View File

@ -1,9 +1,10 @@
use handlebars::{no_escape, Handlebars};
use handlebars::Handlebars;
use super::super::config;
pub fn run() {
let template = r#"
let template = vec![
r#"
# Logging configuration
[logging]
@ -15,12 +16,13 @@ pub fn run() {
# * INFO
# * WARN
# * ERROR
# * OFF
level="{{ logging.level }}"
# Log as JSON.
json={{ logging.json }}
"#,
#[cfg(feature = "postgres")]
r#"
# PostgreSQL configuration.
[postgresql]
@ -46,8 +48,36 @@ pub fn run() {
# the server-certificate is not signed by a CA in the platform certificate
# store.
ca_cert="{{ postgresql.ca_cert }}"
"#,
#[cfg(feature = "sqlite")]
r#"
# SQLite configuration.
[sqlite]
# Sqlite DB path.
#
# Format example: sqlite:///<DATABASE>.
#
path="{{ sqlite.path }}"
# Max open connections.
#
# This sets the max. number of open connections that are allowed in the
# SQLite connection pool.
max_open_connections={{ sqlite.max_open_connections }}
# PRAGMAs.
#
# This configures the list of PRAGMAs that are executed to prepare the
# SQLite library. For a full list of available PRAGMAs see:
# https://www.sqlite.org/pragma.html
pragmas=[
{{#each sqlite.pragmas}}
"{{this}}",
{{/each}}
]
"#,
r#"
# Redis configuration.
[redis]
@ -944,6 +974,7 @@ pub fn run() {
kek="{{ this.kek }}"
{{/each}}
# UI configuration.
[ui]
# Tileserver URL.
@ -958,14 +989,14 @@ pub fn run() {
# default tileserver_url (OSM). If you configure a different tile-server, you
# might need to update the map_attribution.
map_attribution="{{ui.map_attribution}}"
"#;
"#].join("\n");
let mut reg = Handlebars::new();
reg.register_escape_fn(no_escape);
reg.register_escape_fn(|s| s.to_string().replace('"', r#"\""#));
let conf = config::get();
println!(
"{}",
reg.render_template(template, &conf)
reg.render_template(&template, &conf)
.expect("render configfile error")
);
}

View File

@ -0,0 +1,259 @@
use std::fs;
use std::path::Path;
use anyhow::Result;
use serde::Deserialize;
use tracing::{info, span, Instrument, Level};
use crate::codec::Codec;
use crate::storage::{self, device_profile_template};
use lrwn::region;
#[derive(Deserialize)]
struct VendorConfig {
pub vendor: Vendor,
}
#[derive(Default, Deserialize)]
#[serde(default)]
pub struct Vendor {
pub slug: String,
pub name: String,
pub id: usize,
pub ouis: Vec<String>,
pub devices: Vec<String>,
}
#[derive(Deserialize)]
pub struct DeviceConfig {
pub device: Device,
}
#[derive(Default, Deserialize)]
#[serde(default)]
pub struct Device {
pub slug: String,
pub name: String,
pub description: String,
pub firmware: Vec<DeviceFirmware>,
}
#[derive(Deserialize)]
pub struct DeviceFirmware {
pub version: String,
pub profiles: Vec<String>,
pub codec: Option<String>,
}
#[derive(Deserialize)]
pub struct ProfileConfig {
pub profile: Profile,
}
#[derive(Deserialize)]
#[serde(default)]
pub struct Profile {
pub region: region::CommonName,
pub mac_version: region::MacVersion,
pub reg_params_revision: region::Revision,
pub supports_otaa: bool,
pub supports_class_b: bool,
pub supports_class_c: bool,
pub max_eirp: usize,
pub abp: ProfileAbp,
pub class_b: ProfileClassB,
pub class_c: ProfileClassC,
}
impl Default for Profile {
fn default() -> Self {
Self {
region: region::CommonName::EU868,
mac_version: region::MacVersion::LORAWAN_1_0_4,
reg_params_revision: region::Revision::RP002_1_0_4,
supports_otaa: false,
supports_class_b: false,
supports_class_c: false,
max_eirp: 0,
abp: ProfileAbp::default(),
class_b: ProfileClassB::default(),
class_c: ProfileClassC::default(),
}
}
}
#[derive(Default, Deserialize)]
pub struct ProfileAbp {
pub rx1_delay: usize,
pub rx1_dr_offset: usize,
pub rx2_dr: usize,
pub rx2_freq: usize,
}
#[derive(Default, Deserialize)]
pub struct ProfileClassB {
pub timeout_secs: usize,
pub ping_slot_nb_k: usize,
pub ping_slot_dr: usize,
pub ping_slot_freq: usize,
}
#[derive(Default, Deserialize)]
pub struct ProfileClassC {
pub timeout_secs: usize,
}
pub async fn run(dir: &Path) -> Result<()> {
storage::setup().await?;
info!(path = ?dir, "Import LoRaWAN device profiles");
let vendors_dir = dir.join("vendors");
let vendors = fs::read_dir(vendors_dir)?;
for vendor in vendors {
if let Ok(vendor) = vendor {
if vendor.file_name() == "example-vendor" {
continue;
}
let span = span!(Level::INFO, "", vendor = ?vendor.file_name());
let vendor_dir = vendor.path();
if vendor_dir.is_dir() {
handle_vendor(&vendor_dir).instrument(span).await?;
}
}
}
Ok(())
}
async fn handle_vendor(dir: &Path) -> Result<()> {
let vendor_conf = dir.join("vendor.toml");
info!(path = ?vendor_conf, "Reading vendor configuration");
let mut vendor_conf: VendorConfig = toml::from_str(&fs::read_to_string(vendor_conf)?)?;
vendor_conf.vendor.slug = dir.file_name().unwrap().to_str().unwrap().to_string();
info!(vendor_name = %vendor_conf.vendor.name, "Vendor loaded");
for device in &vendor_conf.vendor.devices {
let span = span!(Level::INFO, "", device = %device);
handle_device(dir, &vendor_conf.vendor, device)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_device(dir: &Path, vendor: &Vendor, device: &str) -> Result<()> {
let device_conf = dir.join("devices").join(device);
info!(path = ?device_conf, "Reading device configuration");
let mut device_conf: DeviceConfig = toml::from_str(&fs::read_to_string(device_conf)?)?;
device_conf.device.slug = dir
.join("devices")
.join(device)
.file_stem()
.unwrap()
.to_str()
.unwrap()
.to_string();
info!(device_name = %device_conf.device.name, "Device loaded");
for firmware in &device_conf.device.firmware {
let span = span!(Level::INFO, "", firmware = %firmware.version);
handle_firmware(dir, vendor, &device_conf.device, firmware)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_firmware(
dir: &Path,
vendor: &Vendor,
device: &Device,
firmware: &DeviceFirmware,
) -> Result<()> {
let codec = if let Some(codec) = &firmware.codec {
let codec_path = dir.join("codecs").join(codec);
info!(path = ?codec_path, "Reading codec file");
Some(fs::read_to_string(codec_path)?)
} else {
None
};
for profile in &firmware.profiles {
let span = span!(Level::INFO, "", profile = %profile);
handle_profile(dir, vendor, device, firmware, &codec, profile)
.instrument(span)
.await?;
}
Ok(())
}
async fn handle_profile(
dir: &Path,
vendor: &Vendor,
device: &Device,
firmware: &DeviceFirmware,
codec: &Option<String>,
profile: &str,
) -> Result<()> {
let profile_path = dir.join("profiles").join(profile);
info!(path = ?profile_path, "Reading profile configuration");
let profile_conf: ProfileConfig = toml::from_str(&fs::read_to_string(profile_path)?)?;
let id_regex = regex::Regex::new(r"[^a-zA-Z0-9\-]+").unwrap();
let id = format!(
"{}-{}-{}-{}",
vendor.slug, device.slug, firmware.version, profile_conf.profile.region
);
let id = id_regex.replace_all(&id, "-").to_string();
let dpt = device_profile_template::DeviceProfileTemplate {
id,
name: device.name.clone(),
description: device.description.clone(),
vendor: vendor.name.clone(),
firmware: firmware.version.clone(),
region: profile_conf.profile.region,
mac_version: profile_conf.profile.mac_version,
reg_params_revision: profile_conf.profile.reg_params_revision,
adr_algorithm_id: "default".into(),
payload_codec_runtime: match codec {
Some(_) => Codec::JS,
None => Codec::NONE,
},
payload_codec_script: match codec {
Some(v) => v.into(),
None => "".into(),
},
uplink_interval: 60 * 60,
device_status_req_interval: 1,
flush_queue_on_activate: true,
supports_otaa: profile_conf.profile.supports_otaa,
supports_class_b: profile_conf.profile.supports_class_b,
supports_class_c: profile_conf.profile.supports_class_c,
class_b_timeout: profile_conf.profile.class_b.timeout_secs as i32,
class_b_ping_slot_nb_k: profile_conf.profile.class_b.ping_slot_nb_k as i32,
class_b_ping_slot_dr: profile_conf.profile.class_b.ping_slot_dr as i16,
class_b_ping_slot_freq: profile_conf.profile.class_b.ping_slot_freq as i64,
class_c_timeout: profile_conf.profile.class_c.timeout_secs as i32,
abp_rx1_delay: profile_conf.profile.abp.rx1_delay as i16,
abp_rx1_dr_offset: profile_conf.profile.abp.rx1_dr_offset as i16,
abp_rx2_dr: profile_conf.profile.abp.rx2_dr as i16,
abp_rx2_freq: profile_conf.profile.abp.rx2_freq as i64,
..Default::default()
};
info!(id = %dpt.id, "Creating or updating device-profile template");
device_profile_template::upsert(dpt).await?;
Ok(())
}

View File

@ -43,7 +43,7 @@ pub async fn run() -> Result<()> {
*dev_eui,
&storage::device::DeviceChangeset {
dev_addr: Some(Some(DevAddr::from_slice(&ds.dev_addr)?)),
device_session: Some(Some(ds)),
device_session: Some(Some(ds.into())),
..Default::default()
},
)

View File

@ -1,6 +1,7 @@
pub mod configfile;
pub mod create_api_key;
pub mod import_legacy_lorawan_devices_repository;
pub mod import_lorawan_device_profiles;
pub mod migrate_ds_to_pg;
pub mod print_ds;
pub mod root;

View File

@ -1,5 +1,8 @@
use anyhow::Result;
use tracing::info;
use futures::stream::StreamExt;
use signal_hook::consts::signal::{SIGINT, SIGTERM};
use signal_hook_tokio::Signals;
use tracing::{info, warn};
use crate::gateway;
use crate::{adr, api, backend, downlink, integration, region, storage};
@ -20,5 +23,10 @@ pub async fn run() -> Result<()> {
downlink::setup().await;
api::setup().await?;
let mut signals = Signals::new([SIGINT, SIGTERM]).unwrap();
if let Some(signal) = signals.next().await {
warn!(signal = ?signal, "Signal received, terminating process");
}
Ok(())
}

View File

@ -5,8 +5,11 @@ use std::str::FromStr;
use anyhow::{Context, Result};
use chrono::{DateTime, Utc};
use diesel::backend::Backend;
#[cfg(feature = "postgres")]
use diesel::pg::Pg;
use diesel::sql_types::Text;
#[cfg(feature = "sqlite")]
use diesel::sqlite::Sqlite;
use diesel::{deserialize, serialize};
use serde::{Deserialize, Serialize};
@ -40,6 +43,7 @@ where
}
}
#[cfg(feature = "postgres")]
impl serialize::ToSql<Text, Pg> for Codec
where
str: serialize::ToSql<Text, Pg>,
@ -49,6 +53,14 @@ where
}
}
#[cfg(feature = "sqlite")]
impl serialize::ToSql<Text, Sqlite> for Codec {
fn to_sql(&self, out: &mut serialize::Output<'_, '_, Sqlite>) -> serialize::Result {
out.set_value(self.to_string());
Ok(serialize::IsNull::No)
}
}
impl FromStr for Codec {
type Err = anyhow::Error;

View File

@ -19,6 +19,7 @@ pub struct Configuration {
pub logging: Logging,
pub postgresql: Postgresql,
pub redis: Redis,
pub sqlite: Sqlite,
pub api: Api,
pub gateway: Gateway,
pub network: Network,
@ -90,6 +91,29 @@ impl Default for Redis {
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Sqlite {
pub path: String,
pub pragmas: Vec<String>,
pub max_open_connections: u32,
}
impl Default for Sqlite {
fn default() -> Self {
Sqlite {
path: "sqlite://chirpstack.sqlite".into(),
pragmas: vec![
// Set busy_timeout to avoid manually managing transaction business/contention
"busy_timeout = 1000".to_string(),
// Enable foreign-keys since it is off by default
"foreign_keys = ON".to_string(),
],
max_open_connections: 4,
}
}
}
#[derive(Serialize, Deserialize, Clone)]
#[serde(default)]
pub struct Api {

View File

@ -127,8 +127,8 @@ pub mod test {
for _ in 0..100000 {
let offset = get_ping_offset(beacon_ts, &dev_addr, ping_nb).unwrap();
assert!(offset <= ping_period - 1);
beacon_ts = beacon_ts + *BEACON_PERIOD;
assert!(offset < ping_period);
beacon_ts += *BEACON_PERIOD;
}
}
}

View File

@ -363,7 +363,7 @@ impl Data {
trace!("Selecting downlink gateway");
let gw_down = helpers::select_downlink_gateway(
Some(self.tenant.id),
Some(self.tenant.id.into()),
&self.device.get_device_session()?.region_config_id,
self.network_conf.gateway_prefer_min_margin,
self.device_gateway_rx_info.as_mut().unwrap(),
@ -464,9 +464,11 @@ impl Data {
// The queue item:
// * should fit within the max payload size
// * should not be pending
// * should not be expired
// * in case encrypted, should have a valid FCntDown
if qi.data.len() <= max_payload_size
&& !qi.is_pending
&& !(qi.expires_at.is_some() && qi.expires_at.unwrap() < Utc::now())
&& !(qi.is_encrypted
&& (qi.f_cnt_down.unwrap_or_default() as u32) < ds.get_a_f_cnt_down())
{
@ -519,12 +521,41 @@ impl Data {
},
};
integration::ack_event(self.application.id, &self.device.variables, &pl).await;
integration::ack_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of timeout");
continue;
}
// Handle expired payload.
if let Some(expires_at) = qi.expires_at {
if expires_at < Utc::now() {
device_queue::delete_item(&qi.id)
.await
.context("Delete device queue-item")?;
let pl = integration_pb::LogEvent {
time: Some(Utc::now().into()),
device_info: Some(device_info.clone()),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired"
.to_string(),
context: [("queue_item_id".to_string(), qi.id.to_string())]
.iter()
.cloned()
.collect(),
};
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because it has expired");
continue;
}
}
// Handle payload size.
if qi.data.len() > max_payload_size {
device_queue::delete_item(&qi.id)
@ -549,7 +580,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of max. payload size");
continue;
@ -585,7 +617,8 @@ impl Data {
.collect(),
};
integration::log_event(self.application.id, &self.device.variables, &pl).await;
integration::log_event(self.application.id.into(), &self.device.variables, &pl)
.await;
warn!(dev_eui = %self.device.dev_eui, device_queue_item_id = %qi.id, "Device queue-item discarded because of invalid frame-counter");
continue;
@ -1349,7 +1382,7 @@ impl Data {
ds.last_device_status_request = Some(Utc::now().into());
}
Some(ts) => {
let ts: DateTime<Utc> = ts.clone().try_into().map_err(anyhow::Error::msg)?;
let ts: DateTime<Utc> = (*ts).try_into().map_err(anyhow::Error::msg)?;
let req_interval = Duration::from_secs(60 * 60 * 24)
/ self.device_profile.device_status_req_interval as u32;
@ -1711,8 +1744,7 @@ impl Data {
match &rd.w_f_cnt_last_request {
Some(v) => {
let last_req: DateTime<Utc> =
v.clone().try_into().map_err(anyhow::Error::msg)?;
let last_req: DateTime<Utc> = (*v).try_into().map_err(anyhow::Error::msg)?;
if last_req
< Utc::now()
.checked_sub_signed(chrono::Duration::try_hours(24).unwrap())
@ -2729,7 +2761,7 @@ mod test {
name: "max payload size error".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11],
@ -2765,11 +2797,46 @@ mod test {
..Default::default()
}),
},
Test {
name: "item has expired".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
expires_at: Some(Utc::now() - chrono::Duration::seconds(10)),
..Default::default()
}],
expected_queue_item: None,
expected_ack_event: None,
expected_log_event: Some(integration_pb::LogEvent {
device_info: Some(integration_pb::DeviceInfo {
tenant_id: t.id.to_string(),
tenant_name: t.name.clone(),
application_id: app.id.to_string(),
application_name: app.name.clone(),
device_profile_id: dp.id.to_string(),
device_profile_name: dp.name.clone(),
device_name: d.name.clone(),
dev_eui: d.dev_eui.to_string(),
..Default::default()
}),
level: integration_pb::LogLevel::Error.into(),
code: integration_pb::LogCode::Expired.into(),
description: "Device queue-item discarded because it has expired".into(),
context: [("queue_item_id".to_string(), qi_id.to_string())]
.iter()
.cloned()
.collect(),
..Default::default()
}),
},
Test {
name: "is pending".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
f_cnt_down: Some(10),
@ -2801,7 +2868,7 @@ mod test {
name: "invalid frame-counter".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2842,14 +2909,14 @@ mod test {
name: "valid payload".into(),
max_payload_size: 10,
queue_items: vec![device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
..Default::default()
}],
expected_queue_item: Some(device_queue::DeviceQueueItem {
id: qi_id,
id: qi_id.into(),
dev_eui: d.dev_eui,
f_port: 1,
data: vec![1, 2, 3],
@ -2875,7 +2942,7 @@ mod test {
let d = device::partial_update(
d.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(ds.clone())),
device_session: Some(Some(ds.clone().into())),
..Default::default()
},
)
@ -3419,11 +3486,14 @@ mod test {
dev_addr: Some(*dev_addr),
application_id: app.id,
device_profile_id: dp_ed.id,
device_session: Some(internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}),
device_session: Some(
internal::DeviceSession {
dev_addr: dev_addr.to_vec(),
nwk_s_enc_key: vec![0; 16],
..Default::default()
}
.into(),
),
..Default::default()
})
.await
@ -3436,7 +3506,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -3885,7 +3955,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)
@ -4016,7 +4086,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4127,7 +4197,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4248,7 +4318,7 @@ mod test {
application: application::Application::default(),
device_profile: test.device_profile.clone(),
device: device::Device {
device_session: Some(test.device_session.clone()),
device_session: Some(test.device_session.clone().into()),
..Default::default()
},
network_conf: config::get_region_network("eu868").unwrap(),
@ -4505,7 +4575,7 @@ mod test {
let d_relay = device::partial_update(
d_relay.dev_eui,
&device::DeviceChangeset {
device_session: Some(Some(test.device_session.clone())),
device_session: Some(Some(test.device_session.clone().into())),
..Default::default()
},
)

View File

@ -239,7 +239,7 @@ mod tests {
},
// is_private_down is set, first gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![
@ -262,7 +262,7 @@ mod tests {
},
// is_private_down is set, second gateway matches tenant.
Test {
tenant_id: Some(t.id),
tenant_id: Some(t.id.into()),
min_snr_margin: 0.0,
rx_info: internal::DeviceGatewayRxInfo {
items: vec![
@ -319,7 +319,7 @@ mod tests {
for _ in 0..100 {
let out = select_downlink_gateway(
test.tenant_id,
&"eu868",
"eu868",
test.min_snr_margin,
&mut rx_info,
)
@ -328,8 +328,7 @@ mod tests {
}
assert_eq!(test.expected_gws.len(), gw_map.len());
assert_eq!(
true,
assert!(
expected_gws.keys().all(|k| gw_map.contains_key(k)),
"Expected: {:?}, got: {:?}",
expected_gws,

Some files were not shown because too many files have changed in this diff Show More