mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-07-11 23:48:23 +00:00
Compare commits
1 Commits
api/go/v4.
...
refactor_m
Author | SHA1 | Date | |
---|---|---|---|
bb53821aef |
95
api/proto/gw/gw.proto
vendored
95
api/proto/gw/gw.proto
vendored
@ -104,6 +104,61 @@ enum TxAckStatus {
|
||||
DUTY_CYCLE_OVERFLOW = 11;
|
||||
}
|
||||
|
||||
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
|
||||
message Event {
|
||||
oneof event {
|
||||
// Uplink frame.
|
||||
UplinkFrame uplink_frame = 1;
|
||||
|
||||
// Gateway stats.
|
||||
GatewayStats gateway_stats = 2;
|
||||
|
||||
// Gateway Mesh Event.
|
||||
Mesh mesh = 3;
|
||||
}
|
||||
}
|
||||
|
||||
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
|
||||
message Command {
|
||||
oneof command {
|
||||
// Downlink frame.
|
||||
DownlinkFrame send_downlink_frame = 1;
|
||||
|
||||
// Gateway configuration.
|
||||
GatewayConfiguration set_gateway_configuration = 2;
|
||||
|
||||
// Get Gateway ID.
|
||||
GetGatewayIdRequest get_gateway_id = 3;
|
||||
|
||||
// Get location.
|
||||
GetLocationRequest get_location = 4;
|
||||
}
|
||||
}
|
||||
|
||||
message Mesh {
|
||||
// Gateway ID (of the Border Gateway).
|
||||
string gateway_id = 1;
|
||||
|
||||
// Relay ID.
|
||||
string relay_id = 2;
|
||||
|
||||
// Timestamp (second precision).
|
||||
google.protobuf.Timestamp time = 3;
|
||||
|
||||
// Mesh events.
|
||||
repeated MeshEvent events = 4;
|
||||
}
|
||||
|
||||
message MeshEvent {
|
||||
oneof event {
|
||||
// Proprietary Mesh event.
|
||||
MeshEventProprietary proprietary = 1;
|
||||
|
||||
// Mesh heartbeat.
|
||||
MeshEventHeartbeat heartbeat = 2;
|
||||
}
|
||||
}
|
||||
|
||||
message Modulation {
|
||||
oneof parameters {
|
||||
// LoRa modulation information.
|
||||
@ -611,6 +666,23 @@ message GatewayConfiguration {
|
||||
google.protobuf.Duration stats_interval = 4;
|
||||
}
|
||||
|
||||
message GetGatewayIdRequest {}
|
||||
|
||||
message GetGatewayIdResponse {
|
||||
// Gateway ID.
|
||||
string gateway_id = 1;
|
||||
}
|
||||
|
||||
message GetLocationRequest {}
|
||||
|
||||
message GetLocationResponse {
|
||||
// Location.
|
||||
common.Location location = 1;
|
||||
|
||||
// Last updated at.
|
||||
google.protobuf.Timestamp updated_at = 2;
|
||||
}
|
||||
|
||||
message ChannelConfiguration {
|
||||
// Frequency (Hz).
|
||||
uint32 frequency = 1;
|
||||
@ -751,21 +823,13 @@ message ConnState {
|
||||
}
|
||||
|
||||
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
||||
message MeshHeartbeat {
|
||||
// Gateway ID (of the Border Gateway).
|
||||
string gateway_id = 1;
|
||||
|
||||
// Relay ID.
|
||||
string relay_id = 2;
|
||||
|
||||
// Timestamp (second precision).
|
||||
google.protobuf.Timestamp time = 3;
|
||||
message MeshEventHeartbeat {
|
||||
|
||||
// Relay path.
|
||||
repeated MeshHeartbeatRelayPath relay_path = 4;
|
||||
repeated MeshEventHeartbeatRelayPath relay_path = 4;
|
||||
}
|
||||
|
||||
message MeshHeartbeatRelayPath {
|
||||
message MeshEventHeartbeatRelayPath {
|
||||
// Relay ID.
|
||||
string relay_id = 1;
|
||||
|
||||
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
|
||||
// SNR.
|
||||
int32 snr = 3;
|
||||
}
|
||||
|
||||
// Proprietary mesh event.
|
||||
message MeshEventProprietary {
|
||||
// Event type.
|
||||
uint32 event_type = 1;
|
||||
|
||||
// Payload.
|
||||
bytes payload = 2;
|
||||
}
|
||||
|
95
api/rust/proto/chirpstack/gw/gw.proto
vendored
95
api/rust/proto/chirpstack/gw/gw.proto
vendored
@ -104,6 +104,61 @@ enum TxAckStatus {
|
||||
DUTY_CYCLE_OVERFLOW = 11;
|
||||
}
|
||||
|
||||
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
|
||||
message Event {
|
||||
oneof event {
|
||||
// Uplink frame.
|
||||
UplinkFrame uplink_frame = 1;
|
||||
|
||||
// Gateway stats.
|
||||
GatewayStats gateway_stats = 2;
|
||||
|
||||
// Gateway Mesh Event.
|
||||
Mesh mesh = 3;
|
||||
}
|
||||
}
|
||||
|
||||
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
|
||||
message Command {
|
||||
oneof command {
|
||||
// Downlink frame.
|
||||
DownlinkFrame send_downlink_frame = 1;
|
||||
|
||||
// Gateway configuration.
|
||||
GatewayConfiguration set_gateway_configuration = 2;
|
||||
|
||||
// Get Gateway ID.
|
||||
GetGatewayIdRequest get_gateway_id = 3;
|
||||
|
||||
// Get location.
|
||||
GetLocationRequest get_location = 4;
|
||||
}
|
||||
}
|
||||
|
||||
message Mesh {
|
||||
// Gateway ID (of the Border Gateway).
|
||||
string gateway_id = 1;
|
||||
|
||||
// Relay ID.
|
||||
string relay_id = 2;
|
||||
|
||||
// Timestamp (second precision).
|
||||
google.protobuf.Timestamp time = 3;
|
||||
|
||||
// Mesh events.
|
||||
repeated MeshEvent events = 4;
|
||||
}
|
||||
|
||||
message MeshEvent {
|
||||
oneof event {
|
||||
// Proprietary Mesh event.
|
||||
MeshEventProprietary proprietary = 1;
|
||||
|
||||
// Mesh heartbeat.
|
||||
MeshEventHeartbeat heartbeat = 2;
|
||||
}
|
||||
}
|
||||
|
||||
message Modulation {
|
||||
oneof parameters {
|
||||
// LoRa modulation information.
|
||||
@ -611,6 +666,23 @@ message GatewayConfiguration {
|
||||
google.protobuf.Duration stats_interval = 4;
|
||||
}
|
||||
|
||||
message GetGatewayIdRequest {}
|
||||
|
||||
message GetGatewayIdResponse {
|
||||
// Gateway ID.
|
||||
string gateway_id = 1;
|
||||
}
|
||||
|
||||
message GetLocationRequest {}
|
||||
|
||||
message GetLocationResponse {
|
||||
// Location.
|
||||
common.Location location = 1;
|
||||
|
||||
// Last updated at.
|
||||
google.protobuf.Timestamp updated_at = 2;
|
||||
}
|
||||
|
||||
message ChannelConfiguration {
|
||||
// Frequency (Hz).
|
||||
uint32 frequency = 1;
|
||||
@ -751,21 +823,13 @@ message ConnState {
|
||||
}
|
||||
|
||||
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
||||
message MeshHeartbeat {
|
||||
// Gateway ID (of the Border Gateway).
|
||||
string gateway_id = 1;
|
||||
|
||||
// Relay ID.
|
||||
string relay_id = 2;
|
||||
|
||||
// Timestamp (second precision).
|
||||
google.protobuf.Timestamp time = 3;
|
||||
message MeshEventHeartbeat {
|
||||
|
||||
// Relay path.
|
||||
repeated MeshHeartbeatRelayPath relay_path = 4;
|
||||
repeated MeshEventHeartbeatRelayPath relay_path = 4;
|
||||
}
|
||||
|
||||
message MeshHeartbeatRelayPath {
|
||||
message MeshEventHeartbeatRelayPath {
|
||||
// Relay ID.
|
||||
string relay_id = 1;
|
||||
|
||||
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
|
||||
// SNR.
|
||||
int32 snr = 3;
|
||||
}
|
||||
|
||||
// Proprietary mesh event.
|
||||
message MeshEventProprietary {
|
||||
// Event type.
|
||||
uint32 event_type = 1;
|
||||
|
||||
// Payload.
|
||||
bytes payload = 2;
|
||||
}
|
||||
|
9
api/rust/src/lib.rs
vendored
9
api/rust/src/lib.rs
vendored
@ -1,14 +1,17 @@
|
||||
pub use prost;
|
||||
pub use prost_types;
|
||||
|
||||
#[cfg(feature = "json")]
|
||||
pub use pbjson_types;
|
||||
pub use prost;
|
||||
#[cfg(feature = "api")]
|
||||
pub use tonic;
|
||||
|
||||
#[cfg(feature = "api")]
|
||||
pub mod api;
|
||||
#[cfg(feature = "internal")]
|
||||
pub mod internal;
|
||||
|
||||
pub mod common;
|
||||
pub mod gw;
|
||||
pub mod integration;
|
||||
#[cfg(feature = "internal")]
|
||||
pub mod internal;
|
||||
pub mod stream;
|
||||
|
@ -1,5 +1,4 @@
|
||||
use std::collections::HashMap;
|
||||
use std::io::Cursor;
|
||||
use std::sync::{LazyLock, RwLock};
|
||||
use std::time::Duration;
|
||||
|
||||
@ -352,7 +351,7 @@ async fn message_callback(
|
||||
.inc();
|
||||
let mut event = match json {
|
||||
true => serde_json::from_slice(&p.payload)?,
|
||||
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(&p.payload))?,
|
||||
false => chirpstack_api::gw::UplinkFrame::decode(p.payload.as_ref())?,
|
||||
};
|
||||
|
||||
if v4_migrate {
|
||||
@ -377,7 +376,7 @@ async fn message_callback(
|
||||
.inc();
|
||||
let mut event = match json {
|
||||
true => serde_json::from_slice(&p.payload)?,
|
||||
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(&p.payload))?,
|
||||
false => chirpstack_api::gw::GatewayStats::decode(p.payload.as_ref())?,
|
||||
};
|
||||
|
||||
if v4_migrate {
|
||||
@ -401,7 +400,7 @@ async fn message_callback(
|
||||
.inc();
|
||||
let mut event = match json {
|
||||
true => serde_json::from_slice(&p.payload)?,
|
||||
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(&p.payload))?,
|
||||
false => chirpstack_api::gw::DownlinkTxAck::decode(p.payload.as_ref())?,
|
||||
};
|
||||
|
||||
if v4_migrate {
|
||||
@ -410,18 +409,18 @@ async fn message_callback(
|
||||
|
||||
set_gateway_json(&event.gateway_id, json);
|
||||
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
|
||||
} else if topic.ends_with("/mesh-heartbeat") {
|
||||
} else if topic.ends_with("/mesh") {
|
||||
EVENT_COUNTER
|
||||
.get_or_create(&EventLabels {
|
||||
event: "mesh-heartbeat".to_string(),
|
||||
event: "mesh".to_string(),
|
||||
})
|
||||
.inc();
|
||||
let event = match json {
|
||||
true => serde_json::from_slice(&p.payload)?,
|
||||
false => chirpstack_api::gw::MeshHeartbeat::decode(&mut Cursor::new(&p.payload))?,
|
||||
false => chirpstack_api::gw::Mesh::decode(p.payload.as_ref())?,
|
||||
};
|
||||
|
||||
tokio::spawn(uplink::mesh::MeshHeartbeat::handle(event));
|
||||
tokio::spawn(uplink::mesh::Mesh::handle(event));
|
||||
} else {
|
||||
return Err(anyhow!("Unknown event type"));
|
||||
}
|
||||
|
@ -14,14 +14,15 @@ use crate::storage::{
|
||||
};
|
||||
use lrwn::EUI64;
|
||||
|
||||
pub struct MeshHeartbeat {
|
||||
pub struct Mesh {
|
||||
gateway_id: EUI64,
|
||||
relay_id: RelayId,
|
||||
mesh_stats: gw::MeshHeartbeat,
|
||||
time: DateTime<Utc>,
|
||||
mesh_event: gw::Mesh,
|
||||
}
|
||||
|
||||
impl MeshHeartbeat {
|
||||
pub async fn handle(s: gw::MeshHeartbeat) {
|
||||
impl Mesh {
|
||||
pub async fn handle(s: gw::Mesh) {
|
||||
let gateway_id = match EUI64::from_str(&s.gateway_id) {
|
||||
Ok(v) => v,
|
||||
Err(e) => {
|
||||
@ -38,9 +39,9 @@ impl MeshHeartbeat {
|
||||
}
|
||||
};
|
||||
|
||||
let span = span!(Level::INFO, "mesh_stats", gateway_id = %gateway_id, relay_id = %relay_id);
|
||||
let span = span!(Level::INFO, "mesh", gateway_id = %gateway_id, relay_id = %relay_id);
|
||||
|
||||
if let Err(e) = MeshHeartbeat::_handle(gateway_id, relay_id, s)
|
||||
if let Err(e) = Mesh::_handle(gateway_id, relay_id, s)
|
||||
.instrument(span)
|
||||
.await
|
||||
{
|
||||
@ -48,52 +49,61 @@ impl MeshHeartbeat {
|
||||
Some(Error::NotFound(_)) => {
|
||||
let conf = config::get();
|
||||
if !conf.gateway.allow_unknown_gateways {
|
||||
error!(error = %e.full(), "Handle mesh-stats error");
|
||||
error!(error = %e.full(), "Handle mesh error");
|
||||
}
|
||||
}
|
||||
Some(_) | None => {
|
||||
error!(error = %e.full(), "Handle mesh-stats error");
|
||||
error!(error = %e.full(), "Handle mesh error");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::MeshHeartbeat) -> Result<()> {
|
||||
let mut ctx = MeshHeartbeat {
|
||||
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::Mesh) -> Result<()> {
|
||||
let ctx = Mesh {
|
||||
gateway_id,
|
||||
relay_id,
|
||||
mesh_stats: s,
|
||||
time: s
|
||||
.time
|
||||
.ok_or_else(|| anyhow!("Time field is empty"))?
|
||||
.try_into()
|
||||
.map_err(|e| anyhow!("Covert time error: {}", e))?,
|
||||
mesh_event: s,
|
||||
};
|
||||
|
||||
ctx.update_or_create_relay_gateway().await?;
|
||||
ctx.handle_events().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn update_or_create_relay_gateway(&mut self) -> Result<()> {
|
||||
trace!("Getting Border Gateway");
|
||||
let border_gw = gateway::get(&self.gateway_id).await?;
|
||||
async fn handle_events(&self) -> Result<()> {
|
||||
trace!("Handling mesh events");
|
||||
|
||||
let ts: DateTime<Utc> = match &self.mesh_stats.time {
|
||||
Some(v) => (*v)
|
||||
.try_into()
|
||||
.map_err(|e| anyhow!("Convert time error: {}", e))?,
|
||||
None => {
|
||||
warn!("Stats message does not have time field set");
|
||||
return Ok(());
|
||||
for event in &self.mesh_event.events {
|
||||
match &event.event {
|
||||
Some(gw::mesh_event::Event::Proprietary(_)) | None => continue,
|
||||
Some(gw::mesh_event::Event::Heartbeat(v)) => self._handle_heartbeat(v).await?,
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn _handle_heartbeat(&self, _pl: &gw::MeshEventHeartbeat) -> Result<()> {
|
||||
trace!("Handling heartbeat event");
|
||||
|
||||
let border_gw = gateway::get(&self.gateway_id).await?;
|
||||
|
||||
match gateway::get_relay_gateway(border_gw.tenant_id.into(), self.relay_id).await {
|
||||
Ok(mut v) => {
|
||||
if let Some(last_seen_at) = v.last_seen_at {
|
||||
if last_seen_at > ts {
|
||||
warn!("Time is less than last seen timestamp, ignoring stats");
|
||||
if last_seen_at > self.time {
|
||||
warn!("Time is less than last seen timestamp, ignoring heartbeat");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
v.last_seen_at = Some(ts);
|
||||
v.last_seen_at = Some(self.time);
|
||||
v.region_config_id = border_gw
|
||||
.properties
|
||||
.get("region_config_id")
|
||||
@ -106,7 +116,7 @@ impl MeshHeartbeat {
|
||||
tenant_id: border_gw.tenant_id,
|
||||
relay_id: self.relay_id,
|
||||
name: self.relay_id.to_string(),
|
||||
last_seen_at: Some(ts),
|
||||
last_seen_at: Some(self.time),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
Reference in New Issue
Block a user