Compare commits

..

41 Commits

Author SHA1 Message Date
5bbd71ab3a Add warnings to fuota deployment job + UI.
In case some devices do not complete a job, this makes it possible
to show a warning in the UI with the number of devices that did not
complete the job.
2025-03-19 14:47:47 +00:00
f02256245c lrwn: Align v2 fragmentation fec with LBM stack.
This aligns the forward-error-correction code with the LoRa Basics
Modem stack, which seems to be different from the TS004 MATLAB example
code in that it only calls the matrix_line function for the redundancy
frames, and thus the n argument ranges from 1 up to and including the
number of redundancy frames.

The TS004 MATLAB example calls the matrix_line function for every
fragment, so the n argument ranges from 1 up to and including m + the
number of redundancy frames. For n <= m, it returns early.
2025-03-18 13:32:14 +00:00
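For illustration only (this is not the lrwn implementation, and matrix_line below is a stub standing in for the parity-matrix line generator), the two loop shapes described above differ roughly as follows:

// Schematic comparison of the two loop shapes; the real lrwn / LBM code differs.
fn matrix_line(n: usize, m: usize) -> Vec<usize> {
    // Stub parity line: which of the m fragments are combined into parity frame n.
    (0..m).filter(|i| (i + n) % 3 == 0).collect()
}

// LoRa Basics Modem style: matrix_line is called only for the redundancy
// frames, so n runs from 1 up to and including the redundancy count.
fn parity_lines_lbm(m: usize, redundancy: usize) -> Vec<Vec<usize>> {
    (1..=redundancy).map(|n| matrix_line(n, m)).collect()
}

// TS004 MATLAB-example style: matrix_line is called for every fragment with
// n from 1 up to and including m + redundancy; for n <= m it returns early,
// so the parity frames effectively use n = m+1 ..= m+redundancy.
fn parity_lines_ts004(m: usize, redundancy: usize) -> Vec<Vec<usize>> {
    (1..=(m + redundancy))
        .filter(|&n| n > m)
        .map(|n| matrix_line(n, m))
        .collect()
}

Both variants produce the same number of parity lines, but with different n arguments, which yields different parity matrices.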
a0f07b5303 Initial FUOTA v2 implementation.
This implements selecting the v2.0.0 app-layer package in the
device-profile and handling these payloads in the FUOTA flow.
2025-03-18 12:44:15 +00:00
60547ff973 Fix fuota device timeout filter.
We should filter on error_msg = "" instead of != NULL, as we only want
to update the error_msg for devices that do not yet have an error set.
Otherwise we would overwrite an earlier error message.
2025-03-13 15:09:03 +00:00
351406c363 lrwn: Implement v2 app layer fragmentation structs + encoding. 2025-03-13 15:09:03 +00:00
8b59136942 lrwn: Implement v2 app layer multicast setup structs. 2025-03-13 15:09:03 +00:00
b5e562aa64 lrwn: Implement v2 app layer clock sync structs.
These are the same as the v1 structs, but re-exporting would make the
documentation confusing and inconsistent with other app-layer packages
that do provide different struct implementations.
2025-03-13 15:09:03 +00:00
5a7694a3a4 Bump version to 4.12.0-test.1 2025-03-13 15:09:03 +00:00
98ba2f3198 Set device tags after FUOTA complete. 2025-03-13 15:09:03 +00:00
bbdf2dd781 Error if there are no fuota devices + cleanup mc group.
In case there are no fuota devices (e.g. all devices failed the previous
step), this will log a warning and the flow will continue with multicast
cleanup and completion steps.
2025-03-13 15:09:03 +00:00
71cc1aca74 Set FUOTA deployment completed_at. 2025-03-13 15:09:03 +00:00
27f6d2cf03 Implement full FUOTA flow + UI components. 2025-03-13 15:09:03 +00:00
b8ab0182de ui: Make app-layer params configurable. 2025-03-13 15:09:03 +00:00
b1e6c97942 Add get_ and update_device to fuota storage + add return_msg.
The return_msg (text) field can be used to capture errors, e.g. when the
end-device failed to set up the multicast-group.
2025-03-13 15:09:03 +00:00
e75b62f335 lrwn: Add function for encrypting McKey. 2025-03-13 15:09:03 +00:00
cac682c245 Implement handling AppTimeReq / AppTimeAns. 2025-03-13 15:09:03 +00:00
b61a684739 Update fuota + device-keys structs / storage.
This adds the gen_app_key to the device keys, which is needed for FUOTA,
and adds a random multicast address + key to the fuota deployment. It
also adds a return msg to the FUOTA job structure, such that errors can
be captured in the database.
2025-03-13 15:09:03 +00:00
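As a rough sketch of the key chain this enables (the module path and the lrwn::AES128Key type are assumed here; the helper names match those used in the FUOTA flow diff further down):

// Sketch only: derive the encrypted McKey that is sent in McGroupSetupReq.
// GenAppKey -> McRootKey -> McKEKey; McKey is then encrypted with McKEKey.
use anyhow::Result;
use lrwn::applayer::multicastsetup::v1;

fn derive_encrypted_mc_key(
    gen_app_key: lrwn::AES128Key,
    mc_key: lrwn::AES128Key,
) -> Result<lrwn::AES128Key> {
    let mc_root_key = v1::get_mc_root_key_for_gen_app_key(gen_app_key)?;
    let mc_ke_key = v1::get_mc_ke_key(mc_root_key)?;
    Ok(v1::encrypt_mc_key(mc_ke_key, mc_key))
}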
439a6b0542 lrwn: Fix clocksync time_correction type.
The correct type is i32 instead of u32, as the value can be negative.
2025-03-13 15:09:03 +00:00
f9efed2317 Rename ts00x_port to _f_port.
This is consistent with the naming in the lrwn package.
2025-03-13 15:09:03 +00:00
4984e8556d ui: First part of FUOTA UI implementation.
Currently this allows creating FUOTA deployments and adding / removing
devices and gateways. In its current state, it does not show the status
of the FUOTA deployment.
2025-03-13 15:09:03 +00:00
43753958ef api: List devices by device-profile + expose tags. 2025-03-13 15:09:03 +00:00
1d76fabdb0 Add APIs + functions to get app. device-profiles and tags.
These API methods can be used to retrieve, given an application ID, the
list of used device-profiles and device tags.
2025-03-13 15:09:03 +00:00
de7e0c619d Update fuota API. Add options for auto-calculation of params.
This adds options to auto-calculate the fragment size (based on max.
payload size available for the given data-rate) and multicast
timeout (based on server settings).
2025-03-13 15:09:03 +00:00
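For reference, the fragment-count and padding arithmetic that appears later in the FUOTA flow works out as follows (a standalone sketch, not the project's API):

// Minimal sketch of the fragment / padding arithmetic used by the FUOTA flow.
fn fragmentation_params(payload_len: usize, fragment_size: usize) -> (usize, usize) {
    // Fragments needed to cover the payload, rounded up.
    let fragments = payload_len.div_ceil(fragment_size);
    // Padding so that the padded payload is a whole number of fragments.
    let padding = (fragment_size - (payload_len % fragment_size)) % fragment_size;
    (fragments, padding)
}

fn main() {
    // Example: a 1000-byte payload with 48-byte fragments -> 21 fragments, 8 padding bytes.
    assert_eq!(fragmentation_params(1000, 48), (21, 8));
}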
38386b23f2 Add start job + get schedulable jobs functions + API. 2025-03-13 15:09:03 +00:00
a3e27d8b65 Add tests + add fuota jobs functions. 2025-03-13 15:09:03 +00:00
9b735d6521 Add first fuota storage functions / API. 2025-03-13 15:09:03 +00:00
d000cd3385 Add option to filter devices by tags. 2025-03-13 15:09:03 +00:00
ac52cce7ee api: Extend 'limit' field documentation. 2025-03-13 15:09:03 +00:00
bbce25efbf Add app-layer params field to device-profile API. 2025-03-13 15:09:03 +00:00
4e7ab31714 Add app-layer params field to device-profile schema. 2025-03-13 15:09:03 +00:00
3c3c1f125d Refactor device-profile relay fields. 2025-03-13 15:09:03 +00:00
909eaed1ba Refactor device-profile class-c fields. 2025-03-13 15:09:03 +00:00
b8c02b943c Refactor device-profile class-b fields. 2025-03-13 15:09:03 +00:00
82ed66cf09 Refactor device-profile abp fields.
This puts the ABP parameters into a single JSON(B) field, to reduce
the number of device-profile fields that currently exist. The same work
will be done for Class-B/C and Relay parameters. Once completed, this
means we can drop the diesel '64-column-tables' feature, which will
reduce compile time.
2025-03-13 15:09:03 +00:00
f3d3262006 lrwn: Implement v1 applayer multicastsetup key functions. 2025-03-13 15:09:03 +00:00
ffe01d387c lrwn: Implement applayer v1 fragmentation encoding func. 2025-03-13 15:09:03 +00:00
d1f4f42a79 lrwn: Implement v1 applayer fragmentation structs. 2025-03-13 15:09:03 +00:00
bf21297a42 lrwn: Replace Duration with u32 in applayer timesync. 2025-03-13 15:09:03 +00:00
bcb8aaad4f lrwn: Implement v1 applayer multicast setup structs. 2025-03-13 15:09:03 +00:00
f43c9154bc lrwn: Implement v1 applayer clock sync structs. 2025-03-13 15:09:03 +00:00
3e7f09db62 Add Yandex ID OAuth provider support. (#622) 2025-03-12 13:03:35 +00:00
26 changed files with 4230 additions and 209 deletions

View File

@ -104,6 +104,9 @@ enum Ts003Version {
// v1.0.0.
TS003_V100 = 1;
// v2.0.0
TS003_V200 = 2;
}
enum Ts004Version {
@ -112,6 +115,9 @@ enum Ts004Version {
// v1.0.0.
TS004_V100 = 1;
// v2.0.0
TS004_V200 = 2;
}
enum Ts005Version {
@ -120,6 +126,9 @@ enum Ts005Version {
// v1.0.0.
TS005_V100 = 1;
// v2.0.0
TS005_V200 = 2;
}
// DeviceProfileService is the service providing API methods for managing

View File

@ -393,6 +393,9 @@ message FuotaDeploymentJob {
// Scheduler run after.
google.protobuf.Timestamp scheduler_run_after = 6;
// Warning message.
string warning_msg = 7;
// Error message.
string error_msg = 7;
string error_msg = 8;
}

View File

@ -104,6 +104,9 @@ enum Ts003Version {
// v1.0.0.
TS003_V100 = 1;
// v2.0.0
TS003_V200 = 2;
}
enum Ts004Version {
@ -112,6 +115,9 @@ enum Ts004Version {
// v1.0.0.
TS004_V100 = 1;
// v2.0.0
TS004_V200 = 2;
}
enum Ts005Version {
@ -120,6 +126,9 @@ enum Ts005Version {
// v1.0.0.
TS005_V100 = 1;
// v2.0.0
TS005_V200 = 2;
}
// DeviceProfileService is the service providing API methods for managing

View File

@ -393,6 +393,9 @@ message FuotaDeploymentJob {
// Scheduler run after.
google.protobuf.Timestamp scheduler_run_after = 6;
// Warning message.
string warning_msg = 7;
// Error message.
string error_msg = 7;
string error_msg = 8;
}

View File

@ -15,6 +15,8 @@ create table fuota_deployment (
multicast_class_b_ping_slot_nb_k smallint not null,
multicast_frequency bigint not null,
multicast_timeout smallint not null,
multicast_session_start timestamp with time zone null,
multicast_session_end timestamp with time zone null,
unicast_max_retry_count smallint not null,
fragmentation_fragment_size smallint not null,
fragmentation_redundancy_percentage smallint not null,
@ -57,6 +59,7 @@ create table fuota_deployment_job (
max_retry_count smallint not null,
attempt_count smallint not null,
scheduler_run_after timestamp with time zone not null,
warning_msg text not null,
error_msg text not null,
primary key (fuota_deployment_id, job)

View File

@ -15,6 +15,8 @@ create table fuota_deployment (
multicast_class_b_ping_slot_nb_k smallint not null,
multicast_frequency bigint not null,
multicast_timeout smallint not null,
multicast_session_start datetime null,
multicast_session_end datetime null,
unicast_max_retry_count smallint not null,
fragmentation_fragment_size smallint not null,
fragmentation_redundancy_percentage smallint not null,
@ -57,6 +59,7 @@ create table fuota_deployment_job (
max_retry_count smallint not null,
attempt_count smallint not null,
scheduler_run_after datetime not null,
warning_msg text not null,
error_msg text not null,
primary key (fuota_deployment_id, job)

View File

@ -630,6 +630,7 @@ impl FuotaService for Fuota {
scheduler_run_after: Some(helpers::datetime_to_prost_timestamp(
&j.scheduler_run_after,
)),
warning_msg: j.warning_msg.clone(),
error_msg: j.error_msg.clone(),
})
.collect(),

View File

@ -292,6 +292,7 @@ impl ToProto<api::Ts003Version> for Option<fields::device_profile::Ts003Version>
match self {
None => api::Ts003Version::Ts003NotImplemented,
Some(fields::device_profile::Ts003Version::V100) => api::Ts003Version::Ts003V100,
Some(fields::device_profile::Ts003Version::V200) => api::Ts003Version::Ts003V200,
}
}
}
@ -301,6 +302,7 @@ impl FromProto<Option<fields::device_profile::Ts003Version>> for api::Ts003Versi
match self {
api::Ts003Version::Ts003NotImplemented => None,
api::Ts003Version::Ts003V100 => Some(fields::device_profile::Ts003Version::V100),
api::Ts003Version::Ts003V200 => Some(fields::device_profile::Ts003Version::V200),
}
}
}
@ -310,6 +312,7 @@ impl ToProto<api::Ts004Version> for Option<fields::device_profile::Ts004Version>
match self {
None => api::Ts004Version::Ts004NotImplemented,
Some(fields::device_profile::Ts004Version::V100) => api::Ts004Version::Ts004V100,
Some(fields::device_profile::Ts004Version::V200) => api::Ts004Version::Ts004V200,
}
}
}
@ -319,6 +322,7 @@ impl FromProto<Option<fields::device_profile::Ts004Version>> for api::Ts004Versi
match self {
api::Ts004Version::Ts004NotImplemented => None,
api::Ts004Version::Ts004V100 => Some(fields::device_profile::Ts004Version::V100),
api::Ts004Version::Ts004V200 => Some(fields::device_profile::Ts004Version::V200),
}
}
}
@ -328,6 +332,7 @@ impl ToProto<api::Ts005Version> for Option<fields::device_profile::Ts005Version>
match self {
None => api::Ts005Version::Ts005NotImplemented,
Some(fields::device_profile::Ts005Version::V100) => api::Ts005Version::Ts005V100,
Some(fields::device_profile::Ts005Version::V200) => api::Ts005Version::Ts005V200,
}
}
}
@ -337,6 +342,7 @@ impl FromProto<Option<fields::device_profile::Ts005Version>> for api::Ts005Versi
match self {
api::Ts005Version::Ts005NotImplemented => None,
api::Ts005Version::Ts005V100 => Some(fields::device_profile::Ts005Version::V100),
api::Ts005Version::Ts005V200 => Some(fields::device_profile::Ts005Version::V200),
}
}
}

View File

@ -28,6 +28,12 @@ struct ClerkUserinfo {
pub user_id: String,
}
#[derive(Deserialize)]
struct YandexUserinfo {
pub default_email: String,
pub id: String,
}
#[derive(Deserialize)]
pub struct CallbackArgs {
pub code: String,
@ -129,9 +135,11 @@ pub async fn get_user(code: &str, state: &str) -> Result<User> {
let conf = config::get();
let provider = conf.user_authentication.oauth2.provider.clone();
let userinfo_url = conf.user_authentication.oauth2.userinfo_url.clone();
let assume_email_verified = conf.user_authentication.oauth2.assume_email_verified;
match provider.as_ref() {
"clerk" => get_clerk_user(access_token, &userinfo_url).await,
"yandex" => get_yandex_user(access_token, &userinfo_url, assume_email_verified).await,
_ => Err(anyhow!("Unsupported OAuth2 provider: {}", provider)),
}
}
@ -155,6 +163,25 @@ async fn get_clerk_user(token: &str, url: &str) -> Result<User> {
})
}
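// Fetches the user info from the configured Yandex userinfo endpoint and maps
// it to a User; email_verified is taken from the assume_email_verified setting.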
async fn get_yandex_user(token: &str, url: &str, assume_email_verified: bool) -> Result<User> {
let client = reqwest::Client::new();
let auth_header = format!("Bearer {}", token);
let resp: YandexUserinfo = client
.get(url)
.header(AUTHORIZATION, auth_header)
.send()
.await?
.json()
.await?;
Ok(User {
email: resp.default_email,
email_verified: assume_email_verified,
external_id: resp.id,
})
}
async fn store_verifier(
token: &oauth2::CsrfToken,
verifier: &oauth2::PkceCodeVerifier,

View File

@ -21,6 +21,7 @@ pub async fn handle_uplink(
match version {
Ts003Version::V100 => handle_uplink_v100(dev, dp, rx_info, data).await,
Ts003Version::V200 => handle_uplink_v200(dev, dp, rx_info, data).await,
}
}
@ -42,6 +43,24 @@ async fn handle_uplink_v100(
Ok(())
}
async fn handle_uplink_v200(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
data: &[u8],
) -> Result<()> {
let pl = clocksync::v2::Payload::from_slice(true, data)?;
match pl {
clocksync::v2::Payload::AppTimeReq(pl) => {
handle_v2_app_time_req(dev, dp, rx_info, pl).await?
}
_ => {}
}
Ok(())
}
async fn handle_v1_app_time_req(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
@ -91,6 +110,55 @@ async fn handle_v1_app_time_req(
Ok(())
}
async fn handle_v2_app_time_req(
dev: &device::Device,
dp: &device_profile::DeviceProfile,
rx_info: &[gw::UplinkRxInfo],
pl: clocksync::v2::AppTimeReqPayload,
) -> Result<()> {
info!("Handling AppTimeReq");
let now_time_since_gps = if let Some(t) = helpers::get_time_since_gps_epoch(rx_info) {
chrono::Duration::from_std(t)?
} else {
helpers::get_rx_timestamp_chrono(rx_info).to_gps_time()
};
let dev_time_since_gps = chrono::Duration::seconds(pl.device_time.into());
let time_diff = (now_time_since_gps - dev_time_since_gps).num_seconds();
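// Saturate the correction to the i32 range of the AppTimeAns time_correction field.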
let time_correction: i32 = if time_diff < 0 {
time_diff.try_into().unwrap_or(i32::MIN)
} else {
time_diff.try_into().unwrap_or(i32::MAX)
};
if time_diff == 0 && !pl.param.ans_required {
return Ok(());
}
info!(
time_correction = time_correction,
"Responding with AppTimeAns"
);
let ans = clocksync::v2::Payload::AppTimeAns(clocksync::v2::AppTimeAnsPayload {
time_correction,
param: clocksync::v2::AppTimeAnsPayloadParam {
token_ans: pl.param.token_req,
},
});
device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: dev.dev_eui,
f_port: dp.app_layer_params.ts003_f_port.into(),
data: ans.to_vec()?,
..Default::default()
})
.await?;
Ok(())
}
#[cfg(test)]
mod test {
use super::*;
@ -248,4 +316,153 @@ mod test {
}
}
}
#[tokio::test]
async fn test_handle_v2_app_time_req() {
struct Test {
name: String,
rx_info: gw::UplinkRxInfo,
req: clocksync::v2::AppTimeReqPayload,
expected: Option<clocksync::v2::AppTimeAnsPayload>,
}
let tests = vec![
Test {
name: "device synced".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v2::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v2::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: None,
},
Test {
name: "device synced - ans required".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v2::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v2::AppTimeReqPayloadParam {
token_req: 8,
ans_required: true,
},
},
expected: Some(clocksync::v2::AppTimeAnsPayload {
time_correction: 0,
param: clocksync::v2::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
Test {
name: "device not synced (positive correction)".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1234).try_into().unwrap()),
..Default::default()
},
req: clocksync::v2::AppTimeReqPayload {
device_time: 1200,
param: clocksync::v2::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: Some(clocksync::v2::AppTimeAnsPayload {
time_correction: 34,
param: clocksync::v2::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
Test {
name: "device not synced (negative correction)".into(),
rx_info: gw::UplinkRxInfo {
time_since_gps_epoch: Some(Duration::from_secs(1200).try_into().unwrap()),
..Default::default()
},
req: clocksync::v2::AppTimeReqPayload {
device_time: 1234,
param: clocksync::v2::AppTimeReqPayloadParam {
token_req: 8,
ans_required: false,
},
},
expected: Some(clocksync::v2::AppTimeAnsPayload {
time_correction: -34,
param: clocksync::v2::AppTimeAnsPayloadParam { token_ans: 8 },
}),
},
];
let _guard = test::prepare().await;
let t = tenant::create(tenant::Tenant {
name: "test-tenant".into(),
..Default::default()
})
.await
.unwrap();
let app = application::create(application::Application {
name: "test-app".into(),
tenant_id: t.id,
..Default::default()
})
.await
.unwrap();
let dp = device_profile::create(device_profile::DeviceProfile {
name: "test-dp".into(),
tenant_id: t.id,
app_layer_params: fields::AppLayerParams {
ts003_version: Some(Ts003Version::V200),
..Default::default()
},
..Default::default()
})
.await
.unwrap();
let d = device::create(device::Device {
name: "test-dev".into(),
dev_eui: EUI64::from_be_bytes([1, 2, 3, 4, 5, 6, 7, 8]),
application_id: app.id,
device_profile_id: dp.id,
..Default::default()
})
.await
.unwrap();
for tst in &tests {
println!("> {}", tst.name);
device_queue::flush_for_dev_eui(&d.dev_eui).await.unwrap();
let pl = clocksync::v2::Payload::AppTimeReq(tst.req.clone());
handle_uplink(
&d,
&dp,
&[tst.rx_info.clone()],
dp.app_layer_params.ts003_f_port,
&pl.to_vec().unwrap(),
)
.await;
let queue_items = device_queue::get_for_dev_eui(&d.dev_eui).await.unwrap();
if let Some(expected_pl) = &tst.expected {
assert_eq!(1, queue_items.len());
let qi = queue_items.first().unwrap();
assert_eq!(dp.app_layer_params.ts003_f_port as i16, qi.f_port);
let qi_pl = clocksync::v2::Payload::from_slice(false, &qi.data).unwrap();
let expected_pl = clocksync::v2::Payload::AppTimeAns(expected_pl.clone());
assert_eq!(expected_pl, qi_pl);
} else {
assert!(queue_items.is_empty());
}
}
}
}

View File

@ -18,6 +18,7 @@ pub async fn handle_uplink(
match version {
Ts004Version::V100 => handle_uplink_v100(dev, data).await,
Ts004Version::V200 => handle_uplink_v200(dev, data).await,
}
}
@ -37,6 +38,22 @@ async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> {
Ok(())
}
async fn handle_uplink_v200(dev: &device::Device, data: &[u8]) -> Result<()> {
let pl = fragmentation::v2::Payload::from_slice(true, data)?;
match pl {
fragmentation::v2::Payload::FragSessionSetupAns(pl) => {
handle_v2_frag_session_setup_ans(dev, pl).await?
}
fragmentation::v2::Payload::FragSessionStatusAns(pl) => {
handle_v2_frag_session_status_ans(dev, pl).await?
}
_ => {}
}
Ok(())
}
async fn handle_v1_frag_session_setup_ans(
dev: &device::Device,
pl: fragmentation::v1::FragSessionSetupAnsPayload,
@ -68,6 +85,39 @@ async fn handle_v1_frag_session_setup_ans(
Ok(())
}
async fn handle_v2_frag_session_setup_ans(
dev: &device::Device,
pl: fragmentation::v2::FragSessionSetupAnsPayload,
) -> Result<()> {
info!("Handling FragSessionSetupAns");
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
if pl.frag_algo_unsupported
| pl.not_enough_memory
| pl.frag_index_unsupported
| pl.wrong_descriptor
| pl.session_cnt_replay
{
warn!(
frag_index = pl.frag_index,
frag_algo_unsupported = pl.frag_algo_unsupported,
not_enough_memory = pl.not_enough_memory,
frag_index_unsupported = pl.frag_index_unsupported,
wrong_descriptor = pl.wrong_descriptor,
session_cnt_replay = pl.session_cnt_replay,
"FragSessionAns contains errors"
);
fuota_dev.error_msg = format!("Error: FragSessionAns response frag_algo_unsupported={}, not_enough_memory={}, frag_index_unsupported={}, wrong_descriptor={}, session_cnt_replay={}", pl.frag_algo_unsupported, pl.not_enough_memory, pl.frag_index_unsupported, pl.wrong_descriptor, pl.session_cnt_replay);
} else {
fuota_dev.frag_session_setup_completed_at = Some(Utc::now());
}
let _ = fuota::update_device(fuota_dev).await?;
Ok(())
}
async fn handle_v1_frag_session_status_ans(
dev: &device::Device,
pl: fragmentation::v1::FragSessionStatusAnsPayload,
@ -94,3 +144,36 @@ async fn handle_v1_frag_session_status_ans(
Ok(())
}
async fn handle_v2_frag_session_status_ans(
dev: &device::Device,
pl: fragmentation::v2::FragSessionStatusAnsPayload,
) -> Result<()> {
info!("Handling FragSessionStatusAnsPayload");
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
if pl.missing_frag != 0
|| pl.status.memory_error
|| pl.status.mic_error
|| pl.status.session_does_not_exist
{
warn!(
frag_index = pl.received_and_index.frag_index,
nb_frag_received = pl.received_and_index.nb_frag_received,
missing_frag = pl.missing_frag,
memory_error = pl.status.memory_error,
mic_error = pl.status.mic_error,
session_does_not_exist = pl.status.session_does_not_exist,
"FragSessionStatusAns contains errors"
);
fuota_dev.error_msg = format!("Error: FragSessionStatusAns response nb_frag_received={}, missing_frag={}, memory_error={}, mic_error={}, session_does_not_exist={}", pl.received_and_index.nb_frag_received, pl.missing_frag, pl.status.memory_error, pl.status.mic_error, pl.status.session_does_not_exist);
} else {
fuota_dev.frag_status_completed_at = Some(Utc::now());
}
let _ = fuota::update_device(fuota_dev).await?;
Ok(())
}

View File

@ -11,7 +11,10 @@ use lrwn::region::MacVersion;
use crate::config;
use crate::downlink;
use crate::gpstime::ToGpsTime;
use crate::storage::fields::{FuotaJob, RequestFragmentationSessionStatus};
use crate::storage::fields::{
device_profile::Ts004Version, device_profile::Ts005Version, FuotaJob,
RequestFragmentationSessionStatus,
};
use crate::storage::{device, device_keys, device_profile, device_queue, fuota, multicast};
pub struct Flow {
@ -109,14 +112,30 @@ impl Flow {
self.job.attempt_count += 1;
// Get McAppSKey + McNwkSKey.
let mc_app_s_key = multicastsetup::v1::get_mc_app_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?;
let mc_nwk_s_key = multicastsetup::v1::get_mc_net_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?;
let (mc_app_s_key, mc_nwk_s_key) = match self.device_profile.app_layer_params.ts005_version
{
Some(Ts005Version::V100) => (
multicastsetup::v1::get_mc_app_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?,
multicastsetup::v1::get_mc_net_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?,
),
Some(Ts005Version::V200) => (
multicastsetup::v2::get_mc_app_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?,
multicastsetup::v2::get_mc_net_s_key(
self.fuota_deployment.multicast_key,
self.fuota_deployment.multicast_addr,
)?,
),
None => return Err(anyhow!("Device-profile does not support TS005")),
};
let _ = multicast::create(multicast::MulticastGroup {
id: self.fuota_deployment.id,
@ -178,61 +197,119 @@ impl Flow {
}
async fn multicast_group_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::FragSessionSetup, Utc::now())));
}
info!("Sending McGroupSetupReq commands to devices");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
// Filter on devices that have not completed the McGroupSetup.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.mc_group_setup_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(None);
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to McGroupSetupReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
true,
false,
false,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the multicast group setup",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::FragSessionSetup, Utc::now())));
}
info!("Sending McGroupSetupReq commands to devices");
self.job.attempt_count += 1;
for fuota_dev in &fuota_devices {
let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?;
let mc_root_key = match self.device_profile.mac_version {
MacVersion::LORAWAN_1_0_0
| MacVersion::LORAWAN_1_0_1
| MacVersion::LORAWAN_1_0_2
| MacVersion::LORAWAN_1_0_3
| MacVersion::LORAWAN_1_0_4 => {
multicastsetup::v1::get_mc_root_key_for_gen_app_key(dev_keys.gen_app_key)?
let pl = match self.device_profile.app_layer_params.ts005_version {
Some(Ts005Version::V100) => {
let mc_root_key = match self.device_profile.mac_version {
MacVersion::LORAWAN_1_0_0
| MacVersion::LORAWAN_1_0_1
| MacVersion::LORAWAN_1_0_2
| MacVersion::LORAWAN_1_0_3
| MacVersion::LORAWAN_1_0_4 => {
multicastsetup::v1::get_mc_root_key_for_gen_app_key(
dev_keys.gen_app_key,
)?
}
MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => {
multicastsetup::v1::get_mc_root_key_for_app_key(dev_keys.app_key)?
}
};
let mc_ke_key = multicastsetup::v1::get_mc_ke_key(mc_root_key)?;
let mc_key_encrypted = multicastsetup::v1::encrypt_mc_key(
mc_ke_key,
self.fuota_deployment.multicast_key,
);
multicastsetup::v1::Payload::McGroupSetupReq(
multicastsetup::v1::McGroupSetupReqPayload {
mc_group_id_header:
multicastsetup::v1::McGroupSetupReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
mc_addr: self.fuota_deployment.multicast_addr,
mc_key_encrypted,
min_mc_f_count: 0,
max_mc_f_count: u32::MAX,
},
)
.to_vec()?
}
MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => {
multicastsetup::v1::get_mc_root_key_for_app_key(dev_keys.app_key)?
Some(Ts005Version::V200) => {
let mc_root_key = match self.device_profile.mac_version {
MacVersion::LORAWAN_1_0_0
| MacVersion::LORAWAN_1_0_1
| MacVersion::LORAWAN_1_0_2
| MacVersion::LORAWAN_1_0_3
| MacVersion::LORAWAN_1_0_4 => {
multicastsetup::v2::get_mc_root_key_for_gen_app_key(
dev_keys.gen_app_key,
)?
}
MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => {
multicastsetup::v2::get_mc_root_key_for_app_key(dev_keys.app_key)?
}
};
let mc_ke_key = multicastsetup::v2::get_mc_ke_key(mc_root_key)?;
let mc_key_encrypted = multicastsetup::v2::encrypt_mc_key(
mc_ke_key,
self.fuota_deployment.multicast_key,
);
multicastsetup::v2::Payload::McGroupSetupReq(
multicastsetup::v2::McGroupSetupReqPayload {
mc_group_id_header:
multicastsetup::v2::McGroupSetupReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
mc_addr: self.fuota_deployment.multicast_addr,
mc_key_encrypted,
min_mc_f_count: 0,
max_mc_f_count: u32::MAX,
},
)
.to_vec()?
}
None => return Err(anyhow!("Device-profile does not support TS005")),
};
let mc_ke_key = multicastsetup::v1::get_mc_ke_key(mc_root_key)?;
let mc_key_encrypted =
multicastsetup::v1::encrypt_mc_key(mc_ke_key, self.fuota_deployment.multicast_key);
let pl = multicastsetup::v1::Payload::McGroupSetupReq(
multicastsetup::v1::McGroupSetupReqPayload {
mc_group_id_header: multicastsetup::v1::McGroupSetupReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
mc_addr: self.fuota_deployment.multicast_addr,
mc_key_encrypted,
min_mc_f_count: 0,
max_mc_f_count: u32::MAX,
},
);
device_queue::enqueue_item(device_queue::DeviceQueueItem {
let _ = device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: fuota_dev.dev_eui,
f_port: self.device_profile.app_layer_params.ts005_f_port.into(),
data: pl.to_vec()?,
data: pl,
..Default::default()
})
.await?;
@ -250,25 +327,11 @@ impl Flow {
}
async fn fragmentation_session_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::McSession, Utc::now())));
}
info!("Set timeout error to devices that did not respond to McGroupSetupReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), true, false, false, false)
.await?;
info!("Sending FragSessionSetupReq commands to devices");
self.job.attempt_count += 1;
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
let fragments =
(self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize;
let padding =
(fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_mc_group_setup_count = fuota_devices
.iter()
.filter(|d| d.mc_group_setup_completed_at.is_some())
.count();
// Filter on devices that have completed the previous step, but not yet the FragSessionSetup.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
@ -279,33 +342,110 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to FragSessionSetupReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
true,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the fragmentation session setup",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::McSession, Utc::now())));
}
info!("Sending FragSessionSetupReq commands to devices");
self.job.attempt_count += 1;
if fuota_devices_completed_mc_group_setup_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
let pl = fragmentation::v1::Payload::FragSessionSetupReq(
fragmentation::v1::FragSessionSetupReqPayload {
frag_session: fragmentation::v1::FragSessionSetuReqPayloadFragSession {
mc_group_bit_mask: [true, false, false, false],
frag_index: 0,
},
nb_frag: fragments as u16,
frag_size: fragment_size as u8,
padding: padding as u8,
control: fragmentation::v1::FragSessionSetuReqPayloadControl {
block_ack_delay: 0,
fragmentation_matrix: 0,
},
descriptor: [0, 0, 0, 0],
},
);
let fragment_size = self.fuota_deployment.fragmentation_fragment_size as usize;
let fragments =
(self.fuota_deployment.payload.len() as f32 / fragment_size as f32).ceil() as usize;
let padding =
(fragment_size - (self.fuota_deployment.payload.len() % fragment_size)) % fragment_size;
device_queue::enqueue_item(device_queue::DeviceQueueItem {
for fuota_dev in &fuota_devices {
let pl = match self.device_profile.app_layer_params.ts004_version {
Some(Ts004Version::V100) => fragmentation::v1::Payload::FragSessionSetupReq(
fragmentation::v1::FragSessionSetupReqPayload {
frag_session: fragmentation::v1::FragSessionSetuReqPayloadFragSession {
mc_group_bit_mask: [true, false, false, false],
frag_index: 0,
},
nb_frag: fragments as u16,
frag_size: fragment_size as u8,
padding: padding as u8,
control: fragmentation::v1::FragSessionSetuReqPayloadControl {
block_ack_delay: 0,
fragmentation_matrix: 0,
},
descriptor: [0, 0, 0, 0],
},
)
.to_vec()?,
Some(Ts004Version::V200) => {
let dev_keys = device_keys::get(&fuota_dev.dev_eui).await?;
let data_block_int_key = match self.device_profile.mac_version {
MacVersion::LORAWAN_1_0_0
| MacVersion::LORAWAN_1_0_1
| MacVersion::LORAWAN_1_0_2
| MacVersion::LORAWAN_1_0_3
| MacVersion::LORAWAN_1_0_4 => {
fragmentation::v2::get_data_block_int_key(dev_keys.gen_app_key)?
}
MacVersion::LORAWAN_1_1_0 | MacVersion::Latest => {
fragmentation::v2::get_data_block_int_key(dev_keys.app_key)?
}
};
let mic = fragmentation::v2::calculate_mic(
data_block_int_key,
0,
0,
[0, 0, 0, 0],
&self.fuota_deployment.payload,
)?;
fragmentation::v2::Payload::FragSessionSetupReq(
fragmentation::v2::FragSessionSetupReqPayload {
frag_session: fragmentation::v2::FragSessionSetuReqPayloadFragSession {
mc_group_bit_mask: [true, false, false, false],
frag_index: 0,
},
nb_frag: fragments as u16,
frag_size: fragment_size as u8,
padding: padding as u8,
control: fragmentation::v2::FragSessionSetuReqPayloadControl {
block_ack_delay: 0,
frag_algo: 0,
ack_reception: false,
},
descriptor: [0, 0, 0, 0],
mic,
session_cnt: 0,
},
)
.to_vec()?
}
None => return Err(anyhow!("Device-profile does not support TS004")),
};
let _ = device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: fuota_dev.dev_eui,
f_port: self.device_profile.app_layer_params.ts004_f_port.into(),
data: pl.to_vec()?,
data: pl,
..Default::default()
})
.await?;
@ -323,19 +463,11 @@ impl Flow {
}
async fn multicast_session_setup(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::Enqueue, Utc::now())));
}
info!("Set timeout error to devices that did not respond to FragSessionSetupReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, false, true, false)
.await?;
info!("Sending McClassB/McClassCSessionReq commands to devices");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_frag_session_setup_count = fuota_devices
.iter()
.filter(|d| d.frag_session_setup_completed_at.is_some())
.count();
// Filter on devices that have completed the previous step, but not yet the McSession.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
@ -345,79 +477,189 @@ impl Flow {
})
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to McSessionReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
true,
false,
false,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the multicast session setup",
fuota_devices.len()
);
}
return Ok(Some((
FuotaJob::Enqueue,
self.fuota_deployment
.multicast_session_start
.unwrap_or_else(|| Utc::now()),
)));
}
info!("Sending McClassB/McClassCSessionReq commands to devices");
self.job.attempt_count += 1;
if fuota_devices_completed_frag_session_setup_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
// Calculate the session start and end dates the first time this job is executed.
if self.fuota_deployment.multicast_session_start.is_none()
&& self.fuota_deployment.multicast_session_end.is_none()
{
// We want to start the session (retry_count + 1) x the uplink_interval.
// Note that retry_count=0 means only one attempt.
let session_start = (Utc::now()
let session_start = Utc::now()
+ TimeDelta::seconds(
(self.job.max_retry_count as i64 + 1)
* self.device_profile.uplink_interval as i64,
))
);
let session_end = {
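// Per TS005, a Class B session lasts 128 * 2^timeout seconds (the timeout is
// in units of 128 s beacon periods); a Class C session lasts 2^timeout seconds.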
let timeout = match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => Duration::from_secs(
128 * (1 << self.fuota_deployment.multicast_timeout as u64),
),
"C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64),
_ => return Err(anyhow!("Invalid multicast-group type")),
};
session_start + timeout
};
self.fuota_deployment.multicast_session_start = Some(session_start);
self.fuota_deployment.multicast_session_end = Some(session_end);
self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?;
}
let session_start = self
.fuota_deployment
.multicast_session_start
.ok_or_else(|| anyhow!("multicast_session_start is None"))?
.to_gps_time()
.num_seconds()
% (1 << 32);
% (1 << 32);
let pl = match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => multicastsetup::v1::Payload::McClassBSessionReq(
multicastsetup::v1::McClassBSessionReqPayload {
mc_group_id_header:
multicastsetup::v1::McClassBSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
for fuota_dev in &fuota_devices {
let pl = match self.device_profile.app_layer_params.ts005_version {
Some(Ts005Version::V100) => {
match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => multicastsetup::v1::Payload::McClassBSessionReq(
multicastsetup::v1::McClassBSessionReqPayload {
mc_group_id_header:
multicastsetup::v1::McClassBSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
session_time: (session_start - (session_start % 128)) as u32,
time_out_periodicity:
multicastsetup::v1::McClassBSessionReqPayloadTimeOutPeriodicity {
time_out: self.fuota_deployment.multicast_timeout as u8,
periodicity: self.fuota_deployment.multicast_class_b_ping_slot_nb_k
as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
session_time: (session_start - (session_start % 128)) as u32,
time_out_periodicity:
multicastsetup::v1::McClassBSessionReqPayloadTimeOutPeriodicity {
time_out: self.fuota_deployment.multicast_timeout as u8,
periodicity: self.fuota_deployment.multicast_class_b_ping_slot_nb_k
as u8,
).to_vec()?,
"C" => multicastsetup::v1::Payload::McClassCSessionReq(
multicastsetup::v1::McClassCSessionReqPayload {
mc_group_id_header:
multicastsetup::v1::McClassCSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
session_time: session_start as u32,
session_time_out:
multicastsetup::v1::McClassCSessionReqPayloadSessionTimeOut {
time_out: self.fuota_deployment.multicast_timeout as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
),
"C" => multicastsetup::v1::Payload::McClassCSessionReq(
multicastsetup::v1::McClassCSessionReqPayload {
mc_group_id_header:
multicastsetup::v1::McClassCSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
session_time: session_start as u32,
session_time_out:
multicastsetup::v1::McClassCSessionReqPayloadSessionTimeOut {
time_out: self.fuota_deployment.multicast_timeout as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
),
_ => {
return Err(anyhow!(
"Unsupported group-type: {}",
self.fuota_deployment.multicast_group_type
))
).to_vec()?,
_ => {
return Err(anyhow!(
"Unsupported group-type: {}",
self.fuota_deployment.multicast_group_type
))
}
}
}
Some(Ts005Version::V200) => {
match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => multicastsetup::v2::Payload::McClassBSessionReq(
multicastsetup::v2::McClassBSessionReqPayload {
mc_group_id_header:
multicastsetup::v2::McClassBSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
session_time: (session_start - (session_start % 128)) as u32,
time_out_periodicity:
multicastsetup::v2::McClassBSessionReqPayloadTimeOutPeriodicity {
time_out: self.fuota_deployment.multicast_timeout as u8,
periodicity: self.fuota_deployment.multicast_class_b_ping_slot_nb_k
as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
).to_vec()?,
"C" => multicastsetup::v2::Payload::McClassCSessionReq(
multicastsetup::v2::McClassCSessionReqPayload {
mc_group_id_header:
multicastsetup::v2::McClassCSessionReqPayloadMcGroupIdHeader {
mc_group_id: 0,
},
session_time: session_start as u32,
session_time_out:
multicastsetup::v2::McClassCSessionReqPayloadSessionTimeOut {
time_out: self.fuota_deployment.multicast_timeout as u8,
},
dl_frequ: self.fuota_deployment.multicast_frequency as u32,
dr: self.fuota_deployment.multicast_dr as u8,
},
).to_vec()?,
_ => {
return Err(anyhow!(
"Unsupported group-type: {}",
self.fuota_deployment.multicast_group_type
))
}
}
}
None => return Err(anyhow!("Device-profile does not support TS005")),
};
device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: fuota_dev.dev_eui,
f_port: self.device_profile.app_layer_params.ts005_f_port.into(),
data: pl.to_vec()?,
data: pl,
..Default::default()
})
.await?;
}
// In this case we need to exactly try the max. attempts, because this is what the
// session-start time calculation is based on. If we continue with enqueueing too
// early, the multicast-session hasn't started yet.
let scheduler_run_after =
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::McSession, scheduler_run_after)))
if !fuota_devices.is_empty() {
// There are devices pending setup, we need to re-run this job.
let scheduler_run_after =
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::McSession, scheduler_run_after)))
} else {
Ok(Some((
FuotaJob::Enqueue,
self.fuota_deployment
.multicast_session_start
.unwrap_or_else(|| Utc::now()),
)))
}
}
async fn enqueue(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
@ -426,16 +668,12 @@ impl Flow {
return Ok(Some((FuotaJob::FragStatus, Utc::now())));
}
info!("Set timeout error to devices that did not respond to McSessionReq");
fuota::set_device_timeout_error(self.fuota_deployment.id.into(), false, true, false, false)
.await?;
info!("Enqueueing fragmented payload to multicast group");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
// Filter on devices that have completed the previous step, but not yet the McSession.
// Filter on devices that have completed the previous step.
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.mc_session_completed_at.is_some())
@ -459,57 +697,85 @@ impl Flow {
let mut payload = self.fuota_deployment.payload.clone();
payload.extend_from_slice(&vec![0; padding]);
let encoded_fragments = fragmentation::v1::encode(&payload, fragment_size, redundancy)?;
for (i, frag) in encoded_fragments.iter().enumerate() {
let pl =
fragmentation::v1::Payload::DataFragment(fragmentation::v1::DataFragmentPayload {
index_and_n: fragmentation::v1::DataFragmentPayloadIndexAndN {
frag_index: 0,
n: (i + 1) as u16,
},
data: frag.clone(),
});
let payloads = match self.device_profile.app_layer_params.ts004_version {
Some(Ts004Version::V100) => {
let mut payloads = Vec::new();
let encoded_fragments =
fragmentation::v1::encode(&payload, fragment_size, redundancy)?;
for (i, frag) in encoded_fragments.iter().enumerate() {
payloads.push(
fragmentation::v1::Payload::DataFragment(
fragmentation::v1::DataFragmentPayload {
index_and_n: fragmentation::v1::DataFragmentPayloadIndexAndN {
frag_index: 0,
n: (i + 1) as u16,
},
data: frag.clone(),
},
)
.to_vec()?,
);
}
payloads
}
Some(Ts004Version::V200) => {
let mut payloads = Vec::new();
let encoded_fragments =
fragmentation::v2::encode(&payload, fragment_size, redundancy)?;
for (i, frag) in encoded_fragments.iter().enumerate() {
payloads.push(
fragmentation::v2::Payload::DataFragment(
fragmentation::v2::DataFragmentPayload {
index_and_n: fragmentation::v2::DataFragmentPayloadIndexAndN {
frag_index: 0,
n: (i + 1) as u16,
},
data: frag.clone(),
},
)
.to_vec()?,
);
}
payloads
}
None => return Err(anyhow!("Device-profile does not support TS004")),
};
for pl in payloads {
let _ = downlink::multicast::enqueue(multicast::MulticastGroupQueueItem {
multicast_group_id: self.fuota_deployment.id,
f_port: self.device_profile.app_layer_params.ts004_f_port as i16,
data: pl.to_vec()?,
data: pl,
..Default::default()
})
.await?;
}
match self.fuota_deployment.request_fragmentation_session_status {
RequestFragmentationSessionStatus::NoRequest => {
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
}
RequestFragmentationSessionStatus::NoRequest => Ok(Some((
FuotaJob::DeleteMcGroup,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
))),
RequestFragmentationSessionStatus::AfterFragEnqueue => {
Ok(Some((FuotaJob::FragStatus, Utc::now())))
}
RequestFragmentationSessionStatus::AfterSessTimeout => {
let timeout = match self.fuota_deployment.multicast_group_type.as_ref() {
"B" => Duration::from_secs(
128 * (1 << self.fuota_deployment.multicast_timeout as u64),
),
"C" => Duration::from_secs(1 << self.fuota_deployment.multicast_timeout as u64),
_ => return Err(anyhow!("Invalid multicast-group type")),
};
Ok(Some((FuotaJob::FragStatus, Utc::now() + timeout)))
}
RequestFragmentationSessionStatus::AfterSessTimeout => Ok(Some((
FuotaJob::FragStatus,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
))),
}
}
async fn fragmentation_status(&mut self) -> Result<Option<(FuotaJob, DateTime<Utc>)>> {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
info!("Enqueue FragSessionStatusReq");
self.job.attempt_count += 1;
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_completed_mc_session_count = fuota_devices
.iter()
.filter(|d| d.mc_session_completed_at.is_some())
.count();
// Filter on devices that have completed the multicast-session setup but
// not yet responded to the FragSessionStatusReq.
@ -518,23 +784,59 @@ impl Flow {
.filter(|d| d.mc_session_completed_at.is_some() && d.frag_status_completed_at.is_none())
.collect();
if fuota_devices.is_empty() {
// Proceed with next step after reaching the max attempts.
if self.job.attempt_count > self.job.max_retry_count {
info!("Set timeout error to devices that did not respond to FragSessionStatusReq");
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
false,
true,
)
.await?;
if !fuota_devices.is_empty() {
self.job.warning_msg = format!(
"{} devices did not complete the fragmentation status",
fuota_devices.len()
);
}
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
info!("Enqueue FragSessionStatusReq");
self.job.attempt_count += 1;
if fuota_devices_completed_mc_session_count == 0 {
self.job.error_msg = "There are no devices available to complete this step".into();
return Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())));
}
for fuota_dev in &fuota_devices {
let pl = fragmentation::v1::Payload::FragSessionStatusReq(
fragmentation::v1::FragSessionStatusReqPayload {
participants: true,
frag_index: 0,
},
);
let pl = match self.device_profile.app_layer_params.ts004_version {
Some(Ts004Version::V100) => fragmentation::v1::Payload::FragSessionStatusReq(
fragmentation::v1::FragSessionStatusReqPayload {
participants: true,
frag_index: 0,
},
)
.to_vec()?,
Some(Ts004Version::V200) => fragmentation::v2::Payload::FragSessionStatusReq(
fragmentation::v2::FragSessionStatusReqPayload {
participants: true,
frag_index: 0,
},
)
.to_vec()?,
None => return Err(anyhow!("Device-profile does not support TS004")),
};
device_queue::enqueue_item(device_queue::DeviceQueueItem {
dev_eui: fuota_dev.dev_eui,
f_port: self.device_profile.app_layer_params.ts004_f_port.into(),
data: pl.to_vec()?,
data: pl,
..Default::default()
})
.await?;
@ -546,7 +848,12 @@ impl Flow {
Utc::now() + TimeDelta::seconds(self.device_profile.uplink_interval as i64);
Ok(Some((FuotaJob::FragStatus, scheduler_run_after)))
} else {
Ok(Some((FuotaJob::DeleteMcGroup, Utc::now())))
Ok(Some((
FuotaJob::DeleteMcGroup,
self.fuota_deployment
.multicast_session_end
.unwrap_or_else(|| Utc::now()),
)))
}
}
@ -581,21 +888,15 @@ impl Flow {
} else {
fuota::set_device_completed(self.fuota_deployment.id.into(), true, true, true, true)
.await?;
fuota::set_device_timeout_error(
self.fuota_deployment.id.into(),
false,
false,
false,
true,
)
.await?;
}
let fuota_devices = fuota::get_devices(self.job.fuota_deployment_id.into(), -1, 0).await?;
let fuota_devices_count = fuota_devices.len();
let fuota_devices: Vec<fuota::FuotaDeploymentDevice> = fuota_devices
.into_iter()
.filter(|d| d.completed_at.is_some() && d.error_msg.is_empty())
.collect();
let fuota_devices_completed_count = fuota_devices.len();
for fuota_device in &fuota_devices {
let mut d = device::get(&fuota_device.dev_eui).await?;
@ -605,9 +906,15 @@ impl Flow {
let _ = device::update(d).await?;
}
let mut d = self.fuota_deployment.clone();
d.completed_at = Some(Utc::now());
let _ = fuota::update_deployment(d).await?;
if fuota_devices_count != fuota_devices_completed_count {
self.job.warning_msg = format!(
"{} devices did not complete the FUOTA deployment",
fuota_devices_count - fuota_devices_completed_count
);
}
self.fuota_deployment.completed_at = Some(Utc::now());
self.fuota_deployment = fuota::update_deployment(self.fuota_deployment.clone()).await?;
Ok(None)
}

View File

@ -18,6 +18,7 @@ pub async fn handle_uplink(
match version {
Ts005Version::V100 => handle_uplink_v100(dev, data).await,
Ts005Version::V200 => handle_uplink_v200(dev, data).await,
}
}
@ -40,6 +41,25 @@ async fn handle_uplink_v100(dev: &device::Device, data: &[u8]) -> Result<()> {
Ok(())
}
async fn handle_uplink_v200(dev: &device::Device, data: &[u8]) -> Result<()> {
let pl = multicastsetup::v2::Payload::from_slice(true, data)?;
match pl {
multicastsetup::v2::Payload::McGroupSetupAns(pl) => {
handle_v2_mc_group_setup_ans(dev, pl).await?
}
multicastsetup::v2::Payload::McClassBSessionAns(pl) => {
handle_v2_mc_class_b_session_ans(dev, pl).await?
}
multicastsetup::v2::Payload::McClassCSessionAns(pl) => {
handle_v2_mc_class_c_session_ans(dev, pl).await?
}
_ => {}
}
Ok(())
}
async fn handle_v1_mc_group_setup_ans(
dev: &device::Device,
pl: multicastsetup::v1::McGroupSetupAnsPayload,
@ -64,6 +84,30 @@ async fn handle_v1_mc_group_setup_ans(
Ok(())
}
async fn handle_v2_mc_group_setup_ans(
dev: &device::Device,
pl: multicastsetup::v2::McGroupSetupAnsPayload,
) -> Result<()> {
info!("Handling McGroupSetupAns");
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
if pl.mc_group_id_header.id_error {
warn!(
mc_group_id = pl.mc_group_id_header.mc_group_id,
id_error = true,
"McGroupSetupAns contains errors"
);
fuota_dev.error_msg = "Error: McGroupSetupAns response id_error=true".into();
} else {
fuota_dev.mc_group_setup_completed_at = Some(Utc::now());
}
let _ = fuota::update_device(fuota_dev).await?;
Ok(())
}
async fn handle_v1_mc_class_b_session_ans(
dev: &device::Device,
pl: multicastsetup::v1::McClassBSessionAnsPayload,
@ -101,6 +145,46 @@ async fn handle_v1_mc_class_b_session_ans(
Ok(())
}
async fn handle_v2_mc_class_b_session_ans(
dev: &device::Device,
pl: multicastsetup::v2::McClassBSessionAnsPayload,
) -> Result<()> {
info!("Handling McClassBSessionAns");
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
if pl.status_and_mc_group_id.dr_error
| pl.status_and_mc_group_id.freq_error
| pl.status_and_mc_group_id.mc_group_undefined
| pl.status_and_mc_group_id.start_missed
{
warn!(
dr_error = pl.status_and_mc_group_id.dr_error,
freq_error = pl.status_and_mc_group_id.freq_error,
mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined,
start_missed = pl.status_and_mc_group_id.start_missed,
"McClassBSessionAns contains errors"
);
fuota_dev.error_msg = format!("Error: McClassBSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}, start_missed: {}",
pl.status_and_mc_group_id.dr_error,
pl.status_and_mc_group_id.freq_error,
pl.status_and_mc_group_id.mc_group_undefined,
pl.status_and_mc_group_id.start_missed,
);
} else {
info!(
time_to_start = pl.time_to_start.unwrap_or_default(),
"McClassBSessionAns OK"
);
fuota_dev.mc_session_completed_at = Some(Utc::now());
}
let _ = fuota::update_device(fuota_dev).await?;
Ok(())
}
async fn handle_v1_mc_class_c_session_ans(
dev: &device::Device,
pl: multicastsetup::v1::McClassCSessionAnsPayload,
@ -137,3 +221,43 @@ async fn handle_v1_mc_class_c_session_ans(
Ok(())
}
async fn handle_v2_mc_class_c_session_ans(
dev: &device::Device,
pl: multicastsetup::v2::McClassCSessionAnsPayload,
) -> Result<()> {
info!("Handling McClassCSessionAns");
let mut fuota_dev = fuota::get_latest_device_by_dev_eui(dev.dev_eui).await?;
if pl.status_and_mc_group_id.dr_error
| pl.status_and_mc_group_id.freq_error
| pl.status_and_mc_group_id.mc_group_undefined
| pl.status_and_mc_group_id.start_missed
{
warn!(
dr_error = pl.status_and_mc_group_id.dr_error,
freq_error = pl.status_and_mc_group_id.freq_error,
mc_group_undefined = pl.status_and_mc_group_id.mc_group_undefined,
start_missed = pl.status_and_mc_group_id.start_missed,
"McClassCSessionAns contains errors"
);
fuota_dev.error_msg = format!("Error: McClassCSessionAns response dr_error: {}, freq_error: {}, mc_group_undefined: {}, start_missed: {}",
pl.status_and_mc_group_id.dr_error,
pl.status_and_mc_group_id.freq_error,
pl.status_and_mc_group_id.mc_group_undefined,
pl.status_and_mc_group_id.start_missed,
);
} else {
info!(
time_to_start = pl.time_to_start.unwrap_or_default(),
"McClassCSessionAns OK"
);
fuota_dev.mc_session_completed_at = Some(Utc::now());
}
let _ = fuota::update_device(fuota_dev).await?;
Ok(())
}

View File

@ -314,16 +314,19 @@ impl serialize::ToSql<Text, Sqlite> for AppLayerParams {
pub enum Ts003Version {
#[default]
V100,
V200,
}
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum Ts004Version {
#[default]
V100,
V200,
}
#[derive(Default, Debug, PartialEq, Eq, Clone, Copy, Serialize, Deserialize)]
pub enum Ts005Version {
#[default]
V100,
V200,
}

View File

@ -36,6 +36,8 @@ pub struct FuotaDeployment {
pub multicast_class_b_ping_slot_nb_k: i16,
pub multicast_frequency: i64,
pub multicast_timeout: i16,
pub multicast_session_start: Option<DateTime<Utc>>,
pub multicast_session_end: Option<DateTime<Utc>>,
pub unicast_max_retry_count: i16,
pub fragmentation_fragment_size: i16,
pub fragmentation_redundancy_percentage: i16,
@ -69,6 +71,8 @@ impl Default for FuotaDeployment {
multicast_class_b_ping_slot_nb_k: 0,
multicast_frequency: 0,
multicast_timeout: 0,
multicast_session_start: None,
multicast_session_end: None,
unicast_max_retry_count: 0,
fragmentation_fragment_size: 0,
fragmentation_redundancy_percentage: 0,
@ -154,6 +158,7 @@ pub struct FuotaDeploymentJob {
pub max_retry_count: i16,
pub attempt_count: i16,
pub scheduler_run_after: DateTime<Utc>,
pub warning_msg: String,
pub error_msg: String,
}
@ -169,6 +174,7 @@ impl Default for FuotaDeploymentJob {
max_retry_count: 0,
attempt_count: 0,
scheduler_run_after: now,
warning_msg: "".into(),
error_msg: "".into(),
}
}
@ -220,6 +226,8 @@ pub async fn update_deployment(d: FuotaDeployment) -> Result<FuotaDeployment, Er
.eq(&d.multicast_class_b_ping_slot_nb_k),
fuota_deployment::multicast_frequency.eq(&d.multicast_frequency),
fuota_deployment::multicast_timeout.eq(&d.multicast_timeout),
fuota_deployment::multicast_session_start.eq(&d.multicast_session_start),
fuota_deployment::multicast_session_end.eq(&d.multicast_session_end),
fuota_deployment::unicast_max_retry_count.eq(&d.unicast_max_retry_count),
fuota_deployment::fragmentation_fragment_size.eq(&d.fragmentation_fragment_size),
fuota_deployment::fragmentation_redundancy_percentage
@ -441,7 +449,7 @@ pub async fn set_device_timeout_error(
let mut q = diesel::update(fuota_deployment_device::table)
.set(fuota_deployment_device::dsl::error_msg.eq(&error_msg))
.filter(fuota_deployment_device::dsl::fuota_deployment_id.eq(&fuota_deployment_id))
.filter(fuota_deployment_device::dsl::error_msg.is_not_null())
.filter(fuota_deployment_device::dsl::error_msg.eq(""))
.into_boxed();
if mc_group_setup_timeout {
@ -642,6 +650,7 @@ pub async fn update_job(j: FuotaDeploymentJob) -> Result<FuotaDeploymentJob, Err
fuota_deployment_job::completed_at.eq(&j.completed_at),
fuota_deployment_job::attempt_count.eq(&j.attempt_count),
fuota_deployment_job::scheduler_run_after.eq(&j.scheduler_run_after),
fuota_deployment_job::warning_msg.eq(&j.warning_msg),
fuota_deployment_job::error_msg.eq(&j.error_msg),
))
.get_result(&mut get_async_db_conn().await?)

View File

@ -203,6 +203,8 @@ diesel::table! {
multicast_class_b_ping_slot_nb_k -> Int2,
multicast_frequency -> Int8,
multicast_timeout -> Int2,
multicast_session_start -> Nullable<Timestamptz>,
multicast_session_end -> Nullable<Timestamptz>,
unicast_max_retry_count -> Int2,
fragmentation_fragment_size -> Int2,
fragmentation_redundancy_percentage -> Int2,
@ -249,6 +251,7 @@ diesel::table! {
max_retry_count -> Int2,
attempt_count -> Int2,
scheduler_run_after -> Timestamptz,
warning_msg -> Text,
error_msg -> Text,
}
}

View File

@ -180,6 +180,8 @@ diesel::table! {
multicast_class_b_ping_slot_nb_k -> SmallInt,
multicast_frequency -> BigInt,
multicast_timeout -> SmallInt,
multicast_session_start -> Nullable<TimestamptzSqlite>,
multicast_session_end -> Nullable<TimestamptzSqlite>,
unicast_max_retry_count -> SmallInt,
fragmentation_fragment_size -> SmallInt,
fragmentation_redundancy_percentage -> SmallInt,
@ -224,6 +226,7 @@ diesel::table! {
max_retry_count -> SmallInt,
attempt_count -> SmallInt,
scheduler_run_after -> TimestamptzSqlite,
warning_msg -> Text,
error_msg -> Text,
}
}

View File

@ -1 +1,2 @@
pub mod v1;
pub mod v2;

View File

@ -0,0 +1,557 @@
use anyhow::Result;
use crate::applayer::PayloadCodec;
pub enum Cid {
PackageVersionReq,
PackageVersionAns,
AppTimeReq,
AppTimeAns,
DeviceAppTimePeropdicityReq,
DeviceAppTimePeropdicityAns,
ForceDeviceResyncCmd,
}
impl Cid {
pub fn from_u8(uplink: bool, value: u8) -> Result<Cid> {
Ok(match uplink {
true => match value {
0x00 => Cid::PackageVersionAns,
0x01 => Cid::AppTimeReq,
0x02 => Cid::DeviceAppTimePeropdicityAns,
_ => return Err(anyhow!("Invalid CID: {}", value)),
},
false => match value {
0x00 => Cid::PackageVersionReq,
0x01 => Cid::AppTimeAns,
0x02 => Cid::DeviceAppTimePeropdicityReq,
0x03 => Cid::ForceDeviceResyncCmd,
_ => return Err(anyhow!("Invalid CID: {}", value)),
},
})
}
pub fn to_u8(&self) -> u8 {
match self {
Cid::PackageVersionReq | Cid::PackageVersionAns => 0x00,
Cid::AppTimeReq | Cid::AppTimeAns => 0x01,
Cid::DeviceAppTimePeropdicityReq | Cid::DeviceAppTimePeropdicityAns => 0x02,
Cid::ForceDeviceResyncCmd => 0x03,
}
}
}
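/// A decoded application-layer command together with its payload (if any).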
#[derive(Debug, PartialEq)]
pub enum Payload {
PackageVersionReq,
PackageVersionAns(PackageVersionAnsPayload),
AppTimeReq(AppTimeReqPayload),
AppTimeAns(AppTimeAnsPayload),
DeviceAppTimePeropdicityReq(DeviceAppTimePeriodicityReqPayload),
DeviceAppTimePeropdicityAns(DeviceAppTimePeriodicityAnsPayload),
ForceDeviceResyncCmd(ForceDeviceResyncCmdPayload),
}
impl Payload {
pub fn cid(&self) -> Cid {
match self {
Self::PackageVersionReq => Cid::PackageVersionReq,
Self::PackageVersionAns(_) => Cid::PackageVersionAns,
Self::AppTimeReq(_) => Cid::AppTimeReq,
Self::AppTimeAns(_) => Cid::AppTimeAns,
Self::DeviceAppTimePeropdicityReq(_) => Cid::DeviceAppTimePeropdicityReq,
Self::DeviceAppTimePeropdicityAns(_) => Cid::DeviceAppTimePeropdicityAns,
Self::ForceDeviceResyncCmd(_) => Cid::ForceDeviceResyncCmd,
}
}
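    /// Decode a payload from its binary representation: the first byte is the
    /// CID, the remaining bytes are the command-specific payload.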
pub fn from_slice(uplink: bool, b: &[u8]) -> Result<Self> {
if b.is_empty() {
return Err(anyhow!("At least one byte is expected"));
}
let cid = Cid::from_u8(uplink, b[0])?;
Ok(match cid {
Cid::PackageVersionReq => Payload::PackageVersionReq,
Cid::PackageVersionAns => {
Payload::PackageVersionAns(PackageVersionAnsPayload::decode(&b[1..])?)
}
Cid::AppTimeReq => Payload::AppTimeReq(AppTimeReqPayload::decode(&b[1..])?),
Cid::AppTimeAns => Payload::AppTimeAns(AppTimeAnsPayload::decode(&b[1..])?),
Cid::DeviceAppTimePeropdicityReq => Payload::DeviceAppTimePeropdicityReq(
DeviceAppTimePeriodicityReqPayload::decode(&b[1..])?,
),
Cid::DeviceAppTimePeropdicityAns => Payload::DeviceAppTimePeropdicityAns(
DeviceAppTimePeriodicityAnsPayload::decode(&b[1..])?,
),
Cid::ForceDeviceResyncCmd => {
Payload::ForceDeviceResyncCmd(ForceDeviceResyncCmdPayload::decode(&b[1..])?)
}
})
}
pub fn to_vec(&self) -> Result<Vec<u8>> {
let mut out = vec![self.cid().to_u8()];
match self {
Self::PackageVersionReq => {}
Self::PackageVersionAns(pl) => out.extend_from_slice(&pl.encode()?),
Self::AppTimeReq(pl) => out.extend_from_slice(&pl.encode()?),
Self::AppTimeAns(pl) => out.extend_from_slice(&pl.encode()?),
Self::DeviceAppTimePeropdicityReq(pl) => out.extend_from_slice(&pl.encode()?),
Self::DeviceAppTimePeropdicityAns(pl) => out.extend_from_slice(&pl.encode()?),
Self::ForceDeviceResyncCmd(pl) => out.extend_from_slice(&pl.encode()?),
};
Ok(out)
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct PackageVersionAnsPayload {
pub package_identifier: u8,
pub package_version: u8,
}
impl PayloadCodec for PackageVersionAnsPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 2 {
return Err(anyhow!("Expected 2 bytes"));
}
Ok(PackageVersionAnsPayload {
package_identifier: b[0],
package_version: b[1],
})
}
fn encode(&self) -> Result<Vec<u8>> {
Ok(vec![self.package_identifier, self.package_version])
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeReqPayload {
pub device_time: u32,
pub param: AppTimeReqPayloadParam,
}
impl PayloadCodec for AppTimeReqPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 5 {
return Err(anyhow!("Expected 5 bytes"));
}
Ok(AppTimeReqPayload {
device_time: {
let mut bytes = [0; 4];
bytes.copy_from_slice(&b[0..4]);
u32::from_le_bytes(bytes)
},
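            // Param byte: the lower 4 bits carry TokenReq, bit 4 is the AnsRequired flag.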
param: AppTimeReqPayloadParam {
token_req: b[4] & 0x0f,
ans_required: b[4] & 0x10 != 0,
},
})
}
fn encode(&self) -> Result<Vec<u8>> {
if self.param.token_req > 15 {
return Err(anyhow!("Max token_req value is 15"));
}
let mut b = vec![0; 5];
b[0..4].copy_from_slice(&self.device_time.to_le_bytes());
b[4] = self.param.token_req;
if self.param.ans_required {
b[4] |= 0x10;
}
Ok(b)
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeReqPayloadParam {
pub token_req: u8,
pub ans_required: bool,
}
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeAnsPayload {
pub time_correction: i32,
pub param: AppTimeAnsPayloadParam,
}
impl PayloadCodec for AppTimeAnsPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 5 {
return Err(anyhow!("Expected 5 bytes"));
}
Ok(AppTimeAnsPayload {
time_correction: {
let mut bytes = [0; 4];
bytes.copy_from_slice(&b[0..4]);
i32::from_le_bytes(bytes)
},
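            // Param byte: the lower 4 bits carry TokenAns; the remaining bits are ignored.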
param: AppTimeAnsPayloadParam {
token_ans: b[4] & 0x0f,
},
})
}
fn encode(&self) -> Result<Vec<u8>> {
if self.param.token_ans > 15 {
return Err(anyhow!("Max token_ans value is 15"));
}
let mut b = vec![0; 5];
b[0..4].copy_from_slice(&self.time_correction.to_le_bytes());
b[4] = self.param.token_ans;
Ok(b)
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct AppTimeAnsPayloadParam {
pub token_ans: u8,
}
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityReqPayload {
pub period: u8,
}
impl PayloadCodec for DeviceAppTimePeriodicityReqPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 1 {
return Err(anyhow!("Expected 1 byte"));
}
Ok(DeviceAppTimePeriodicityReqPayload {
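            // Period is carried in the lower 4 bits; the upper bits are ignored.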
period: b[0] & 0x0f,
})
}
fn encode(&self) -> Result<Vec<u8>> {
if self.period > 15 {
return Err(anyhow!("Max period value is 15"));
}
Ok(vec![self.period])
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityAnsPayload {
pub status: DeviceAppTimePeriodicityAnsPayloadStatus,
pub time: u32,
}
impl PayloadCodec for DeviceAppTimePeriodicityAnsPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 5 {
return Err(anyhow!("Expected 5 bytes"));
}
Ok(DeviceAppTimePeriodicityAnsPayload {
status: DeviceAppTimePeriodicityAnsPayloadStatus {
not_supported: b[0] & 0x01 != 0,
},
time: {
let mut bytes = [0; 4];
bytes.copy_from_slice(&b[1..5]);
u32::from_le_bytes(bytes)
},
})
}
fn encode(&self) -> Result<Vec<u8>> {
let mut b = vec![0; 5];
if self.status.not_supported {
b[0] |= 0x01;
}
b[1..5].copy_from_slice(&self.time.to_le_bytes());
Ok(b)
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct DeviceAppTimePeriodicityAnsPayloadStatus {
pub not_supported: bool,
}
#[derive(Debug, PartialEq, Clone)]
pub struct ForceDeviceResyncCmdPayload {
pub force_conf: ForceDeviceResyncCmdPayloadForceConf,
}
impl PayloadCodec for ForceDeviceResyncCmdPayload {
fn decode(b: &[u8]) -> Result<Self> {
if b.len() != 1 {
return Err(anyhow!("Expected 1 byte"));
}
Ok(ForceDeviceResyncCmdPayload {
force_conf: ForceDeviceResyncCmdPayloadForceConf {
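                // NbTransmissions is carried in the lower 3 bits of the ForceConf byte.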
nb_transmissions: b[0] & 0x07,
},
})
}
fn encode(&self) -> Result<Vec<u8>> {
if self.force_conf.nb_transmissions > 7 {
return Err(anyhow!("Max nb_transmissions is 7"));
}
Ok(vec![self.force_conf.nb_transmissions])
}
}
#[derive(Debug, PartialEq, Clone)]
pub struct ForceDeviceResyncCmdPayloadForceConf {
pub nb_transmissions: u8,
}
#[cfg(test)]
mod test {
use super::*;
struct CommandTest {
name: String,
uplink: bool,
command: Payload,
bytes: Vec<u8>,
expected_error: Option<String>,
}
#[test]
fn test_package_version_req() {
let encode_tests = [CommandTest {
name: "encode PackageVersionReq".into(),
uplink: false,
command: Payload::PackageVersionReq,
bytes: vec![0x00],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode PackageVersionReq".into(),
uplink: false,
command: Payload::PackageVersionReq,
bytes: vec![0x00],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_package_version_ans() {
let encode_tests = [CommandTest {
name: "encode PackageVersionAns".into(),
uplink: true,
command: Payload::PackageVersionAns(PackageVersionAnsPayload {
package_identifier: 1,
package_version: 1,
}),
bytes: vec![0x00, 0x01, 0x01],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode PackageVersionAns".into(),
uplink: true,
command: Payload::PackageVersionAns(PackageVersionAnsPayload {
package_identifier: 1,
package_version: 1,
}),
bytes: vec![0x00, 0x01, 0x01],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_app_time_req() {
let encode_tests = [CommandTest {
name: "encode AppTimeReq".into(),
uplink: true,
command: Payload::AppTimeReq(AppTimeReqPayload {
device_time: 1024,
param: AppTimeReqPayloadParam {
token_req: 15,
ans_required: true,
},
}),
bytes: vec![0x01, 0x00, 0x04, 0x00, 0x00, 0x1f],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode AppTimeReq".into(),
uplink: true,
command: Payload::AppTimeReq(AppTimeReqPayload {
device_time: 1024,
param: AppTimeReqPayloadParam {
token_req: 15,
ans_required: true,
},
}),
bytes: vec![0x01, 0x00, 0x04, 0x00, 0x00, 0x1f],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_app_time_ans() {
let encode_tests = [CommandTest {
name: "encode AppTimeAns".into(),
uplink: false,
command: Payload::AppTimeAns(AppTimeAnsPayload {
time_correction: 1024,
param: AppTimeAnsPayloadParam { token_ans: 15 },
}),
bytes: vec![0x01, 0x00, 0x04, 0x00, 0x00, 0x0f],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode AppTimeAns".into(),
uplink: false,
command: Payload::AppTimeAns(AppTimeAnsPayload {
time_correction: 1024,
param: AppTimeAnsPayloadParam { token_ans: 15 },
}),
bytes: vec![0x01, 0x00, 0x04, 0x00, 0x00, 0x0f],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_device_app_time_periodicity_req() {
let encode_tests = [CommandTest {
name: "encode DeviceAppTimePeropdicityReq".into(),
uplink: false,
command: Payload::DeviceAppTimePeropdicityReq(DeviceAppTimePeriodicityReqPayload {
period: 15,
}),
bytes: vec![0x02, 0x0f],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode DeviceAppTimePeropdicityReq".into(),
uplink: false,
command: Payload::DeviceAppTimePeropdicityReq(DeviceAppTimePeriodicityReqPayload {
period: 15,
}),
bytes: vec![0x02, 0x0f],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_device_app_time_periodicity_ans() {
let encode_tests = [CommandTest {
name: "encode DeviceAppTimePeropdicityAns".into(),
uplink: true,
command: Payload::DeviceAppTimePeropdicityAns(DeviceAppTimePeriodicityAnsPayload {
status: DeviceAppTimePeriodicityAnsPayloadStatus {
not_supported: true,
},
time: 1024,
}),
bytes: vec![0x02, 0x01, 0x00, 0x04, 0x00, 0x00],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode DeviceAppTimePeropdicityAns".into(),
uplink: true,
command: Payload::DeviceAppTimePeropdicityAns(DeviceAppTimePeriodicityAnsPayload {
status: DeviceAppTimePeriodicityAnsPayloadStatus {
not_supported: true,
},
time: 1024,
}),
bytes: vec![0x02, 0x01, 0x00, 0x04, 0x00, 0x00],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
#[test]
fn test_force_device_resync_req() {
let encode_tests = [CommandTest {
name: "encode ForceDeviceResyncCmd".into(),
uplink: false,
command: Payload::ForceDeviceResyncCmd(ForceDeviceResyncCmdPayload {
force_conf: ForceDeviceResyncCmdPayloadForceConf {
nb_transmissions: 7,
},
}),
bytes: vec![0x03, 0x07],
expected_error: None,
}];
let decode_tests = [CommandTest {
name: "decode ForceDeviceResyncCmd".into(),
uplink: false,
command: Payload::ForceDeviceResyncCmd(ForceDeviceResyncCmdPayload {
force_conf: ForceDeviceResyncCmdPayloadForceConf {
nb_transmissions: 7,
},
}),
bytes: vec![0x03, 0x07],
expected_error: None,
}];
run_tests_encode(&encode_tests);
run_tests_decode(&decode_tests);
}
fn run_tests_encode(tests: &[CommandTest]) {
for tst in tests {
println!("> {}", tst.name);
let resp = tst.command.to_vec();
if let Some(e) = &tst.expected_error {
assert!(resp.is_err());
assert_eq!(e, &resp.err().unwrap().to_string());
} else {
assert_eq!(tst.bytes, resp.unwrap());
}
}
}
fn run_tests_decode(tests: &[CommandTest]) {
for tst in tests {
println!("> {}", tst.name);
let resp = Payload::from_slice(tst.uplink, &tst.bytes);
if let Some(e) = &tst.expected_error {
assert!(resp.is_err());
assert_eq!(e, &resp.err().unwrap().to_string());
} else {
assert_eq!(tst.command, resp.unwrap());
}
}
}
}
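
// A minimal usage sketch (illustration only, not part of the file above): decode an
// uplink AppTimeReq using the types defined in this module and build the matching
// AppTimeAns downlink. The helper name and its arguments are hypothetical, and how
// the correction is derived from the server clock is an assumption; a real handler
// would also take the AnsRequired flag into account.
fn example_build_app_time_ans(uplink_bytes: &[u8], server_gps_time: u32) -> Result<Option<Vec<u8>>> {
    match Payload::from_slice(true, uplink_bytes)? {
        Payload::AppTimeReq(req) => {
            let ans = Payload::AppTimeAns(AppTimeAnsPayload {
                // Signed correction (in seconds) the device should apply to its clock.
                time_correction: server_gps_time.wrapping_sub(req.device_time) as i32,
                // TokenAns echoes the TokenReq from the request.
                param: AppTimeAnsPayloadParam {
                    token_ans: req.param.token_req,
                },
            });
            Ok(Some(ans.to_vec()?))
        }
        _ => Ok(None),
    }
}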

View File

@ -1 +1,2 @@
pub mod v1;
pub mod v2;

File diff suppressed because it is too large

View File

@ -1 +1,2 @@
pub mod v1;
pub mod v2;

View File

@ -1,6 +1,9 @@
use aes::cipher::BlockDecrypt;
use aes::cipher::{generic_array::GenericArray, BlockEncrypt, KeyInit};
use aes::{Aes128, Block};
#[cfg(feature = "crypto")]
use aes::{
cipher::BlockDecrypt,
cipher::{generic_array::GenericArray, BlockEncrypt, KeyInit},
Aes128, Block,
};
use anyhow::Result;
use crate::applayer::PayloadCodec;

File diff suppressed because it is too large

View File

@ -1077,6 +1077,7 @@ function DeviceProfileForm(props: IProps) {
<Select disabled={props.disabled}>
<Select.Option value={Ts003Version.TS003_NOT_IMPLEMENTED}>Not implemented</Select.Option>
<Select.Option value={Ts003Version.TS003_V100}>v1.0.0</Select.Option>
<Select.Option value={Ts003Version.TS003_V200}>v2.0.0</Select.Option>
</Select>
</Form.Item>
</Col>
@ -1096,6 +1097,7 @@ function DeviceProfileForm(props: IProps) {
<Select disabled={props.disabled}>
<Select.Option value={Ts004Version.TS004_NOT_IMPLEMENTED}>Not implemented</Select.Option>
<Select.Option value={Ts004Version.TS004_V100}>v1.0.0</Select.Option>
<Select.Option value={Ts004Version.TS004_V200}>v2.0.0</Select.Option>
</Select>
</Form.Item>
</Col>
@ -1115,6 +1117,7 @@ function DeviceProfileForm(props: IProps) {
<Select disabled={props.disabled}>
<Select.Option value={Ts005Version.TS005_NOT_IMPLEMENTED}>Not implemented</Select.Option>
<Select.Option value={Ts005Version.TS005_V100}>v1.0.0</Select.Option>
<Select.Option value={Ts005Version.TS005_V200}>v2.0.0</Select.Option>
</Select>
</Form.Item>
</Col>

View File

@ -63,6 +63,12 @@ function FuotaDeploymentDashboard(props: IProps) {
);
} else if (!record.completedAt) {
return <Spin indicator={<LoadingOutlined spin />} size="small" />;
} else if (record.warningMsg !== "") {
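      // Completed, but the job has a warning message set; surface it in a popover.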
return (
<Popover content={record.warningMsg} placement="right">
<Tag color="orange">warning</Tag>
</Popover>
);
} else {
return <Tag color="green">ok</Tag>;
}