mirror of
https://github.com/chirpstack/chirpstack.git
synced 2025-07-13 21:39:43 +00:00
Compare commits
1 Commits
api/go/v4.
...
refactor_m
Author | SHA1 | Date | |
---|---|---|---|
bb53821aef |
95
api/proto/gw/gw.proto
vendored
95
api/proto/gw/gw.proto
vendored
@ -104,6 +104,61 @@ enum TxAckStatus {
|
|||||||
DUTY_CYCLE_OVERFLOW = 11;
|
DUTY_CYCLE_OVERFLOW = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
|
||||||
|
message Event {
|
||||||
|
oneof event {
|
||||||
|
// Uplink frame.
|
||||||
|
UplinkFrame uplink_frame = 1;
|
||||||
|
|
||||||
|
// Gateway stats.
|
||||||
|
GatewayStats gateway_stats = 2;
|
||||||
|
|
||||||
|
// Gateway Mesh Event.
|
||||||
|
Mesh mesh = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
|
||||||
|
message Command {
|
||||||
|
oneof command {
|
||||||
|
// Downlink frame.
|
||||||
|
DownlinkFrame send_downlink_frame = 1;
|
||||||
|
|
||||||
|
// Gateway configuration.
|
||||||
|
GatewayConfiguration set_gateway_configuration = 2;
|
||||||
|
|
||||||
|
// Get Gateway ID.
|
||||||
|
GetGatewayIdRequest get_gateway_id = 3;
|
||||||
|
|
||||||
|
// Get location.
|
||||||
|
GetLocationRequest get_location = 4;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message Mesh {
|
||||||
|
// Gateway ID (of the Border Gateway).
|
||||||
|
string gateway_id = 1;
|
||||||
|
|
||||||
|
// Relay ID.
|
||||||
|
string relay_id = 2;
|
||||||
|
|
||||||
|
// Timestamp (second precision).
|
||||||
|
google.protobuf.Timestamp time = 3;
|
||||||
|
|
||||||
|
// Mesh events.
|
||||||
|
repeated MeshEvent events = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message MeshEvent {
|
||||||
|
oneof event {
|
||||||
|
// Proprietary Mesh event.
|
||||||
|
MeshEventProprietary proprietary = 1;
|
||||||
|
|
||||||
|
// Mesh heartbeat.
|
||||||
|
MeshEventHeartbeat heartbeat = 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
message Modulation {
|
message Modulation {
|
||||||
oneof parameters {
|
oneof parameters {
|
||||||
// LoRa modulation information.
|
// LoRa modulation information.
|
||||||
@ -611,6 +666,23 @@ message GatewayConfiguration {
|
|||||||
google.protobuf.Duration stats_interval = 4;
|
google.protobuf.Duration stats_interval = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetGatewayIdRequest {}
|
||||||
|
|
||||||
|
message GetGatewayIdResponse {
|
||||||
|
// Gateway ID.
|
||||||
|
string gateway_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetLocationRequest {}
|
||||||
|
|
||||||
|
message GetLocationResponse {
|
||||||
|
// Location.
|
||||||
|
common.Location location = 1;
|
||||||
|
|
||||||
|
// Last updated at.
|
||||||
|
google.protobuf.Timestamp updated_at = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message ChannelConfiguration {
|
message ChannelConfiguration {
|
||||||
// Frequency (Hz).
|
// Frequency (Hz).
|
||||||
uint32 frequency = 1;
|
uint32 frequency = 1;
|
||||||
@ -751,21 +823,13 @@ message ConnState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
||||||
message MeshHeartbeat {
|
message MeshEventHeartbeat {
|
||||||
// Gateway ID (of the Border Gateway).
|
|
||||||
string gateway_id = 1;
|
|
||||||
|
|
||||||
// Relay ID.
|
|
||||||
string relay_id = 2;
|
|
||||||
|
|
||||||
// Timestamp (second precision).
|
|
||||||
google.protobuf.Timestamp time = 3;
|
|
||||||
|
|
||||||
// Relay path.
|
// Relay path.
|
||||||
repeated MeshHeartbeatRelayPath relay_path = 4;
|
repeated MeshEventHeartbeatRelayPath relay_path = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message MeshHeartbeatRelayPath {
|
message MeshEventHeartbeatRelayPath {
|
||||||
// Relay ID.
|
// Relay ID.
|
||||||
string relay_id = 1;
|
string relay_id = 1;
|
||||||
|
|
||||||
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
|
|||||||
// SNR.
|
// SNR.
|
||||||
int32 snr = 3;
|
int32 snr = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Proprietary mesh event.
|
||||||
|
message MeshEventProprietary {
|
||||||
|
// Event type.
|
||||||
|
uint32 event_type = 1;
|
||||||
|
|
||||||
|
// Payload.
|
||||||
|
bytes payload = 2;
|
||||||
|
}
|
||||||
|
95
api/rust/proto/chirpstack/gw/gw.proto
vendored
95
api/rust/proto/chirpstack/gw/gw.proto
vendored
@ -104,6 +104,61 @@ enum TxAckStatus {
|
|||||||
DUTY_CYCLE_OVERFLOW = 11;
|
DUTY_CYCLE_OVERFLOW = 11;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Gateway events as reported by the ChirpStack Concentratord ZMQ interface.
|
||||||
|
message Event {
|
||||||
|
oneof event {
|
||||||
|
// Uplink frame.
|
||||||
|
UplinkFrame uplink_frame = 1;
|
||||||
|
|
||||||
|
// Gateway stats.
|
||||||
|
GatewayStats gateway_stats = 2;
|
||||||
|
|
||||||
|
// Gateway Mesh Event.
|
||||||
|
Mesh mesh = 3;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commands that can be sent to the ChirpStack Concentratord ZMQ interface.
|
||||||
|
message Command {
|
||||||
|
oneof command {
|
||||||
|
// Downlink frame.
|
||||||
|
DownlinkFrame send_downlink_frame = 1;
|
||||||
|
|
||||||
|
// Gateway configuration.
|
||||||
|
GatewayConfiguration set_gateway_configuration = 2;
|
||||||
|
|
||||||
|
// Get Gateway ID.
|
||||||
|
GetGatewayIdRequest get_gateway_id = 3;
|
||||||
|
|
||||||
|
// Get location.
|
||||||
|
GetLocationRequest get_location = 4;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message Mesh {
|
||||||
|
// Gateway ID (of the Border Gateway).
|
||||||
|
string gateway_id = 1;
|
||||||
|
|
||||||
|
// Relay ID.
|
||||||
|
string relay_id = 2;
|
||||||
|
|
||||||
|
// Timestamp (second precision).
|
||||||
|
google.protobuf.Timestamp time = 3;
|
||||||
|
|
||||||
|
// Mesh events.
|
||||||
|
repeated MeshEvent events = 4;
|
||||||
|
}
|
||||||
|
|
||||||
|
message MeshEvent {
|
||||||
|
oneof event {
|
||||||
|
// Proprietary Mesh event.
|
||||||
|
MeshEventProprietary proprietary = 1;
|
||||||
|
|
||||||
|
// Mesh heartbeat.
|
||||||
|
MeshEventHeartbeat heartbeat = 2;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
message Modulation {
|
message Modulation {
|
||||||
oneof parameters {
|
oneof parameters {
|
||||||
// LoRa modulation information.
|
// LoRa modulation information.
|
||||||
@ -611,6 +666,23 @@ message GatewayConfiguration {
|
|||||||
google.protobuf.Duration stats_interval = 4;
|
google.protobuf.Duration stats_interval = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
message GetGatewayIdRequest {}
|
||||||
|
|
||||||
|
message GetGatewayIdResponse {
|
||||||
|
// Gateway ID.
|
||||||
|
string gateway_id = 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
message GetLocationRequest {}
|
||||||
|
|
||||||
|
message GetLocationResponse {
|
||||||
|
// Location.
|
||||||
|
common.Location location = 1;
|
||||||
|
|
||||||
|
// Last updated at.
|
||||||
|
google.protobuf.Timestamp updated_at = 2;
|
||||||
|
}
|
||||||
|
|
||||||
message ChannelConfiguration {
|
message ChannelConfiguration {
|
||||||
// Frequency (Hz).
|
// Frequency (Hz).
|
||||||
uint32 frequency = 1;
|
uint32 frequency = 1;
|
||||||
@ -751,21 +823,13 @@ message ConnState {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
// Gateway Mesh heartbeat (sent periodically by the Relay Gateways).
|
||||||
message MeshHeartbeat {
|
message MeshEventHeartbeat {
|
||||||
// Gateway ID (of the Border Gateway).
|
|
||||||
string gateway_id = 1;
|
|
||||||
|
|
||||||
// Relay ID.
|
|
||||||
string relay_id = 2;
|
|
||||||
|
|
||||||
// Timestamp (second precision).
|
|
||||||
google.protobuf.Timestamp time = 3;
|
|
||||||
|
|
||||||
// Relay path.
|
// Relay path.
|
||||||
repeated MeshHeartbeatRelayPath relay_path = 4;
|
repeated MeshEventHeartbeatRelayPath relay_path = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message MeshHeartbeatRelayPath {
|
message MeshEventHeartbeatRelayPath {
|
||||||
// Relay ID.
|
// Relay ID.
|
||||||
string relay_id = 1;
|
string relay_id = 1;
|
||||||
|
|
||||||
@ -775,3 +839,12 @@ message MeshHeartbeatRelayPath {
|
|||||||
// SNR.
|
// SNR.
|
||||||
int32 snr = 3;
|
int32 snr = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Proprietary mesh event.
|
||||||
|
message MeshEventProprietary {
|
||||||
|
// Event type.
|
||||||
|
uint32 event_type = 1;
|
||||||
|
|
||||||
|
// Payload.
|
||||||
|
bytes payload = 2;
|
||||||
|
}
|
||||||
|
9
api/rust/src/lib.rs
vendored
9
api/rust/src/lib.rs
vendored
@ -1,14 +1,17 @@
|
|||||||
|
pub use prost;
|
||||||
|
pub use prost_types;
|
||||||
|
|
||||||
#[cfg(feature = "json")]
|
#[cfg(feature = "json")]
|
||||||
pub use pbjson_types;
|
pub use pbjson_types;
|
||||||
pub use prost;
|
|
||||||
#[cfg(feature = "api")]
|
#[cfg(feature = "api")]
|
||||||
pub use tonic;
|
pub use tonic;
|
||||||
|
|
||||||
#[cfg(feature = "api")]
|
#[cfg(feature = "api")]
|
||||||
pub mod api;
|
pub mod api;
|
||||||
|
#[cfg(feature = "internal")]
|
||||||
|
pub mod internal;
|
||||||
|
|
||||||
pub mod common;
|
pub mod common;
|
||||||
pub mod gw;
|
pub mod gw;
|
||||||
pub mod integration;
|
pub mod integration;
|
||||||
#[cfg(feature = "internal")]
|
|
||||||
pub mod internal;
|
|
||||||
pub mod stream;
|
pub mod stream;
|
||||||
|
@ -1,5 +1,4 @@
|
|||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::Cursor;
|
|
||||||
use std::sync::{LazyLock, RwLock};
|
use std::sync::{LazyLock, RwLock};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
@ -352,7 +351,7 @@ async fn message_callback(
|
|||||||
.inc();
|
.inc();
|
||||||
let mut event = match json {
|
let mut event = match json {
|
||||||
true => serde_json::from_slice(&p.payload)?,
|
true => serde_json::from_slice(&p.payload)?,
|
||||||
false => chirpstack_api::gw::UplinkFrame::decode(&mut Cursor::new(&p.payload))?,
|
false => chirpstack_api::gw::UplinkFrame::decode(p.payload.as_ref())?,
|
||||||
};
|
};
|
||||||
|
|
||||||
if v4_migrate {
|
if v4_migrate {
|
||||||
@ -377,7 +376,7 @@ async fn message_callback(
|
|||||||
.inc();
|
.inc();
|
||||||
let mut event = match json {
|
let mut event = match json {
|
||||||
true => serde_json::from_slice(&p.payload)?,
|
true => serde_json::from_slice(&p.payload)?,
|
||||||
false => chirpstack_api::gw::GatewayStats::decode(&mut Cursor::new(&p.payload))?,
|
false => chirpstack_api::gw::GatewayStats::decode(p.payload.as_ref())?,
|
||||||
};
|
};
|
||||||
|
|
||||||
if v4_migrate {
|
if v4_migrate {
|
||||||
@ -401,7 +400,7 @@ async fn message_callback(
|
|||||||
.inc();
|
.inc();
|
||||||
let mut event = match json {
|
let mut event = match json {
|
||||||
true => serde_json::from_slice(&p.payload)?,
|
true => serde_json::from_slice(&p.payload)?,
|
||||||
false => chirpstack_api::gw::DownlinkTxAck::decode(&mut Cursor::new(&p.payload))?,
|
false => chirpstack_api::gw::DownlinkTxAck::decode(p.payload.as_ref())?,
|
||||||
};
|
};
|
||||||
|
|
||||||
if v4_migrate {
|
if v4_migrate {
|
||||||
@ -410,18 +409,18 @@ async fn message_callback(
|
|||||||
|
|
||||||
set_gateway_json(&event.gateway_id, json);
|
set_gateway_json(&event.gateway_id, json);
|
||||||
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
|
tokio::spawn(downlink::tx_ack::TxAck::handle(event));
|
||||||
} else if topic.ends_with("/mesh-heartbeat") {
|
} else if topic.ends_with("/mesh") {
|
||||||
EVENT_COUNTER
|
EVENT_COUNTER
|
||||||
.get_or_create(&EventLabels {
|
.get_or_create(&EventLabels {
|
||||||
event: "mesh-heartbeat".to_string(),
|
event: "mesh".to_string(),
|
||||||
})
|
})
|
||||||
.inc();
|
.inc();
|
||||||
let event = match json {
|
let event = match json {
|
||||||
true => serde_json::from_slice(&p.payload)?,
|
true => serde_json::from_slice(&p.payload)?,
|
||||||
false => chirpstack_api::gw::MeshHeartbeat::decode(&mut Cursor::new(&p.payload))?,
|
false => chirpstack_api::gw::Mesh::decode(p.payload.as_ref())?,
|
||||||
};
|
};
|
||||||
|
|
||||||
tokio::spawn(uplink::mesh::MeshHeartbeat::handle(event));
|
tokio::spawn(uplink::mesh::Mesh::handle(event));
|
||||||
} else {
|
} else {
|
||||||
return Err(anyhow!("Unknown event type"));
|
return Err(anyhow!("Unknown event type"));
|
||||||
}
|
}
|
||||||
|
@ -14,14 +14,15 @@ use crate::storage::{
|
|||||||
};
|
};
|
||||||
use lrwn::EUI64;
|
use lrwn::EUI64;
|
||||||
|
|
||||||
pub struct MeshHeartbeat {
|
pub struct Mesh {
|
||||||
gateway_id: EUI64,
|
gateway_id: EUI64,
|
||||||
relay_id: RelayId,
|
relay_id: RelayId,
|
||||||
mesh_stats: gw::MeshHeartbeat,
|
time: DateTime<Utc>,
|
||||||
|
mesh_event: gw::Mesh,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MeshHeartbeat {
|
impl Mesh {
|
||||||
pub async fn handle(s: gw::MeshHeartbeat) {
|
pub async fn handle(s: gw::Mesh) {
|
||||||
let gateway_id = match EUI64::from_str(&s.gateway_id) {
|
let gateway_id = match EUI64::from_str(&s.gateway_id) {
|
||||||
Ok(v) => v,
|
Ok(v) => v,
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
@ -38,9 +39,9 @@ impl MeshHeartbeat {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let span = span!(Level::INFO, "mesh_stats", gateway_id = %gateway_id, relay_id = %relay_id);
|
let span = span!(Level::INFO, "mesh", gateway_id = %gateway_id, relay_id = %relay_id);
|
||||||
|
|
||||||
if let Err(e) = MeshHeartbeat::_handle(gateway_id, relay_id, s)
|
if let Err(e) = Mesh::_handle(gateway_id, relay_id, s)
|
||||||
.instrument(span)
|
.instrument(span)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
@ -48,52 +49,61 @@ impl MeshHeartbeat {
|
|||||||
Some(Error::NotFound(_)) => {
|
Some(Error::NotFound(_)) => {
|
||||||
let conf = config::get();
|
let conf = config::get();
|
||||||
if !conf.gateway.allow_unknown_gateways {
|
if !conf.gateway.allow_unknown_gateways {
|
||||||
error!(error = %e.full(), "Handle mesh-stats error");
|
error!(error = %e.full(), "Handle mesh error");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Some(_) | None => {
|
Some(_) | None => {
|
||||||
error!(error = %e.full(), "Handle mesh-stats error");
|
error!(error = %e.full(), "Handle mesh error");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::MeshHeartbeat) -> Result<()> {
|
async fn _handle(gateway_id: EUI64, relay_id: RelayId, s: gw::Mesh) -> Result<()> {
|
||||||
let mut ctx = MeshHeartbeat {
|
let ctx = Mesh {
|
||||||
gateway_id,
|
gateway_id,
|
||||||
relay_id,
|
relay_id,
|
||||||
mesh_stats: s,
|
time: s
|
||||||
|
.time
|
||||||
|
.ok_or_else(|| anyhow!("Time field is empty"))?
|
||||||
|
.try_into()
|
||||||
|
.map_err(|e| anyhow!("Covert time error: {}", e))?,
|
||||||
|
mesh_event: s,
|
||||||
};
|
};
|
||||||
|
|
||||||
ctx.update_or_create_relay_gateway().await?;
|
ctx.handle_events().await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn update_or_create_relay_gateway(&mut self) -> Result<()> {
|
async fn handle_events(&self) -> Result<()> {
|
||||||
trace!("Getting Border Gateway");
|
trace!("Handling mesh events");
|
||||||
let border_gw = gateway::get(&self.gateway_id).await?;
|
|
||||||
|
|
||||||
let ts: DateTime<Utc> = match &self.mesh_stats.time {
|
for event in &self.mesh_event.events {
|
||||||
Some(v) => (*v)
|
match &event.event {
|
||||||
.try_into()
|
Some(gw::mesh_event::Event::Proprietary(_)) | None => continue,
|
||||||
.map_err(|e| anyhow!("Convert time error: {}", e))?,
|
Some(gw::mesh_event::Event::Heartbeat(v)) => self._handle_heartbeat(v).await?,
|
||||||
None => {
|
|
||||||
warn!("Stats message does not have time field set");
|
|
||||||
return Ok(());
|
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn _handle_heartbeat(&self, _pl: &gw::MeshEventHeartbeat) -> Result<()> {
|
||||||
|
trace!("Handling heartbeat event");
|
||||||
|
|
||||||
|
let border_gw = gateway::get(&self.gateway_id).await?;
|
||||||
|
|
||||||
match gateway::get_relay_gateway(border_gw.tenant_id.into(), self.relay_id).await {
|
match gateway::get_relay_gateway(border_gw.tenant_id.into(), self.relay_id).await {
|
||||||
Ok(mut v) => {
|
Ok(mut v) => {
|
||||||
if let Some(last_seen_at) = v.last_seen_at {
|
if let Some(last_seen_at) = v.last_seen_at {
|
||||||
if last_seen_at > ts {
|
if last_seen_at > self.time {
|
||||||
warn!("Time is less than last seen timestamp, ignoring stats");
|
warn!("Time is less than last seen timestamp, ignoring heartbeat");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
v.last_seen_at = Some(ts);
|
v.last_seen_at = Some(self.time);
|
||||||
v.region_config_id = border_gw
|
v.region_config_id = border_gw
|
||||||
.properties
|
.properties
|
||||||
.get("region_config_id")
|
.get("region_config_id")
|
||||||
@ -106,7 +116,7 @@ impl MeshHeartbeat {
|
|||||||
tenant_id: border_gw.tenant_id,
|
tenant_id: border_gw.tenant_id,
|
||||||
relay_id: self.relay_id,
|
relay_id: self.relay_id,
|
||||||
name: self.relay_id.to_string(),
|
name: self.relay_id.to_string(),
|
||||||
last_seen_at: Some(ts),
|
last_seen_at: Some(self.time),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
Reference in New Issue
Block a user