Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 65 additions & 47 deletions Cargo.lock

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,10 @@ aws-smithy-types-convert = { version = "0.60.9", features = [
url = "2.5.4"

### Protobuf
helium-proto = { git = "https://github.com/helium/proto", branch = "master", features = [
helium-proto = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2", features = [
"services",
] }
beacon = { git = "https://github.com/helium/proto", branch = "master" }
beacon = { git = "https://github.com/helium/proto", branch = "macpie/info_stream_v2" }
# Pickup versions from above
prost = "*"
tonic = { version = "*", features = ["tls-aws-lc", "tls-native-roots"] }
Expand Down
1 change: 1 addition & 0 deletions file_store_oracles/src/traits/msg_verify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ impl_msg_verify!(iot_config::AdminLoadRegionReqV1, signature);
impl_msg_verify!(iot_config::AdminRemoveKeyReqV1, signature);
impl_msg_verify!(iot_config::GatewayInfoReqV1, signature);
impl_msg_verify!(iot_config::GatewayInfoStreamReqV1, signature);
impl_msg_verify!(iot_config::GatewayInfoStreamReqV2, signature);
impl_msg_verify!(iot_config::RegionParamsReqV1, signature);
impl_msg_verify!(iot_config::GatewayInfoResV1, signature);
impl_msg_verify!(iot_config::GatewayInfoStreamResV1, signature);
Expand Down
3 changes: 2 additions & 1 deletion iot_config/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -51,5 +51,6 @@ poc-metrics = { path = "../metrics" }
task-manager = { path = "../task_manager" }

[dev-dependencies]
rand = { workspace = true }
backon = { version = "0", features = ["tokio-sleep"] }
h3o = { workspace = true }
rand = { workspace = true }
21 changes: 21 additions & 0 deletions iot_config/migrations/20251027000000_gateways.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
CREATE TABLE IF NOT EXISTS gateways (
address BYTEA PRIMARY KEY,
created_at TIMESTAMPTZ NOT NULL,
elevation INTEGER,
gain INTEGER,
hash TEXT,
is_active BOOLEAN,
is_full_hotspot BOOLEAN,
last_changed_at TIMESTAMPTZ NOT NULL,
location BIGINT,
location_asserts INTEGER,
location_changed_at TIMESTAMPTZ,
refreshed_at TIMESTAMPTZ NOT NULL,
updated_at TIMESTAMPTZ NOT NULL
);

CREATE INDEX IF NOT EXISTS gateways_last_changed_idx ON gateways (last_changed_at DESC);

CREATE INDEX IF NOT EXISTS gateways_location_changed_idx ON gateways (location_changed_at DESC)
WHERE
location IS NOT NULL;
99 changes: 99 additions & 0 deletions iot_config/src/cli/daemon.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::sync::Arc;

use crate::gateway::tracker::Tracker;
use crate::grpc_server::GrpcServer;
use crate::sub_dao_service::SubDaoService;
use crate::{
admin::AuthCache, admin_service::AdminService, db_cleaner::DbCleaner,
gateway::service::GatewayService, org, org_service::OrgService, region_map::RegionMapReader,
route_service::RouteService, settings::Settings, telemetry,
};
use task_manager::TaskManager;

#[derive(Debug, clap::Args)]
pub struct Daemon;

impl Daemon {
pub async fn run(&self, settings: &Settings) -> anyhow::Result<()> {
custom_tracing::init(settings.log.clone(), settings.custom_tracing.clone()).await?;

// Install prometheus metrics exporter
poc_metrics::start_metrics(&settings.metrics)?;
telemetry::initialize();

// Create database pool
let pool = settings.database.connect("iot-config-store").await?;
sqlx::migrate!().run(&pool).await?;

// Create on-chain metadata pool
let metadata_pool = settings.metadata.connect("iot-config-metadata").await?;

let (auth_updater, auth_cache) = AuthCache::new(settings.admin_pubkey()?, &pool).await?;
let (region_updater, region_map) = RegionMapReader::new(&pool).await?;
let (delegate_key_updater, delegate_key_cache) = org::delegate_keys_cache(&pool).await?;

let signing_keypair = Arc::new(settings.signing_keypair()?);

let gateway_svc = GatewayService::new(
signing_keypair.clone(),
pool.clone(),
region_map.clone(),
auth_cache.clone(),
delegate_key_cache,
)?;

let route_svc =
RouteService::new(signing_keypair.clone(), auth_cache.clone(), pool.clone());

let org_svc = OrgService::new(
signing_keypair.clone(),
auth_cache.clone(),
pool.clone(),
route_svc.clone_update_channel(),
delegate_key_updater,
)?;

let admin_svc = AdminService::new(
settings,
auth_cache.clone(),
auth_updater,
pool.clone(),
region_map.clone(),
region_updater,
)?;

let subdao_svc = SubDaoService::new(settings, auth_cache, metadata_pool.clone())?;

let listen_addr = settings.listen;
let pubkey = settings
.signing_keypair()
.map(|keypair| keypair.public_key().to_string())?;
tracing::debug!("listening on {listen_addr}");
tracing::debug!("signing as {pubkey}");

let tracker = Tracker::new(
pool.clone(),
metadata_pool.clone(),
settings.gateway_tracker_interval,
);

let grpc_server = GrpcServer::new(
listen_addr,
gateway_svc,
route_svc,
org_svc,
admin_svc,
subdao_svc,
);

let db_cleaner = DbCleaner::new(pool.clone(), settings.deleted_entry_retention);

TaskManager::builder()
.add_task(tracker)
.add_task(grpc_server)
.add_task(db_cleaner)
.build()
.start()
.await
}
}
34 changes: 34 additions & 0 deletions iot_config/src/cli/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use crate::{cli::daemon::Daemon, Settings};
use std::path::PathBuf;

pub mod daemon;

#[derive(Debug, clap::Parser)]
#[clap(version = env!("CARGO_PKG_VERSION"))]
#[clap(about = "Helium IoT Config Service")]
pub struct Cli {
/// Optional configuration file to use. If present, the toml file at the
/// given path will be loaded. Environment variables can override the
/// settings in the given file.
#[clap(short = 'c')]
config: Option<PathBuf>,

#[clap(subcommand)]
cmd: Cmd,
}

impl Cli {
pub async fn run(self) -> anyhow::Result<()> {
match self.cmd {
Cmd::Server(server) => {
let settings = Settings::new(self.config)?;
server.run(&settings).await
}
}
}
}

#[derive(Debug, clap::Subcommand)]
pub enum Cmd {
Server(Daemon),
}
2 changes: 1 addition & 1 deletion iot_config/src/client/mod.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::gateway_info::{self, GatewayInfo, GatewayInfoStream};
use crate::gateway::service::info::{self as gateway_info, GatewayInfo, GatewayInfoStream};
use file_store_oracles::traits::MsgVerify;
use futures::stream::{self, StreamExt};
use helium_crypto::{Keypair, PublicKey, PublicKeyBinary, Sign};
Expand Down
186 changes: 186 additions & 0 deletions iot_config/src/gateway/db.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,186 @@
use chrono::{DateTime, Utc};
use futures::{Stream, StreamExt, TryStreamExt};
use helium_crypto::PublicKeyBinary;
use sqlx::{postgres::PgRow, FromRow, PgExecutor, PgPool, Postgres, QueryBuilder, Row};

#[derive(Debug, Clone)]
pub struct Gateway {
pub address: PublicKeyBinary,
// When the record was first created from metadata DB
pub created_at: DateTime<Utc>,
pub elevation: Option<u32>,
pub gain: Option<u32>,
pub hash: String,
pub is_active: Option<bool>,
pub is_full_hotspot: Option<bool>,
// When location or hash last changed, set to refreshed_at (updated via SQL query see Gateway::insert)
pub last_changed_at: DateTime<Utc>,
pub location: Option<u64>,
pub location_asserts: Option<u32>,
// When location last changed, set to refreshed_at (updated via SQL query see Gateway::insert)
pub location_changed_at: Option<DateTime<Utc>>,
// When record was last updated from metadata DB (could be set to now if no metadata DB info)
pub refreshed_at: Option<DateTime<Utc>>,
// When record was last updated
pub updated_at: DateTime<Utc>,
}

impl Gateway {
pub async fn insert_bulk(pool: &PgPool, rows: &[Gateway]) -> anyhow::Result<u64> {
if rows.is_empty() {
return Ok(0);
}
let mut qb = QueryBuilder::<Postgres>::new(
"INSERT INTO gateways (
address,
created_at,
elevation,
gain,
hash,
is_active,
is_full_hotspot,
last_changed_at,
location,
location_asserts,
location_changed_at,
refreshed_at,
updated_at
) ",
);

qb.push_values(rows, |mut b, g| {
b.push_bind(g.address.as_ref())
.push_bind(g.created_at)
.push_bind(g.elevation.map(|v| v as i32))
.push_bind(g.gain.map(|v| v as i32))
.push_bind(g.hash.clone())
.push_bind(g.is_active)
.push_bind(g.is_full_hotspot)
.push_bind(g.last_changed_at)
.push_bind(g.location.map(|v| v as i64))
.push_bind(g.location_asserts.map(|v| v as i32))
.push_bind(g.location_changed_at)
.push_bind(g.refreshed_at)
.push_bind(g.updated_at);
});

qb.push(
" ON CONFLICT (address) DO UPDATE SET
created_at = EXCLUDED.created_at,
elevation = EXCLUDED.elevation,
gain = EXCLUDED.gain,
hash = EXCLUDED.hash,
is_active = EXCLUDED.is_active,
is_full_hotspot = EXCLUDED.is_full_hotspot,
last_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
OR gateways.hash IS DISTINCT FROM EXCLUDED.hash
THEN EXCLUDED.refreshed_at
ELSE gateways.last_changed_at
END,
location = EXCLUDED.location,
location_asserts = EXCLUDED.location_asserts,
location_changed_at = CASE
WHEN gateways.location IS DISTINCT FROM EXCLUDED.location
THEN EXCLUDED.refreshed_at
ELSE gateways.location_changed_at
END,
refreshed_at = EXCLUDED.refreshed_at,
updated_at = EXCLUDED.updated_at",
);

let res = qb.build().execute(pool).await?;
Ok(res.rows_affected())
}

pub async fn get_by_address<'a>(
db: impl PgExecutor<'a>,
address: &PublicKeyBinary,
) -> anyhow::Result<Option<Self>> {
let gateway = sqlx::query_as::<_, Self>(
r#"
SELECT
address,
created_at,
elevation,
gain,
hash,
is_active,
is_full_hotspot,
last_changed_at,
location,
location_asserts,
location_changed_at,
refreshed_at,
updated_at
FROM gateways
WHERE address = $1
"#,
)
.bind(address.as_ref())
.fetch_optional(db)
.await?;

Ok(gateway)
}

pub fn stream<'a>(
db: impl PgExecutor<'a> + 'a,
min_last_changed_at: DateTime<Utc>,
min_location_changed_at: Option<DateTime<Utc>>,
) -> impl Stream<Item = Self> + 'a {
sqlx::query_as::<_, Self>(
r#"
SELECT
address,
created_at,
elevation,
gain,
hash,
is_active,
is_full_hotspot,
last_changed_at,
location,
location_asserts,
location_changed_at,
refreshed_at,
updated_at
FROM gateways
WHERE last_changed_at >= $1
AND (
$2::timestamptz IS NULL
OR (location IS NOT NULL AND location_changed_at >= $2)
)
"#,
)
.bind(min_last_changed_at)
.bind(min_location_changed_at)
.fetch(db)
.map_err(anyhow::Error::from)
.filter_map(|res| async move { res.ok() })
}
}

impl FromRow<'_, PgRow> for Gateway {
fn from_row(row: &PgRow) -> sqlx::Result<Self> {
// helpers to map Option<i64> -> Option<u32/u64>
let to_u64 = |v: Option<i64>| -> Option<u64> { v.map(|x| x as u64) };
let to_u32 = |v: Option<i32>| -> Option<u32> { v.map(|x| x as u32) };

Ok(Self {
address: PublicKeyBinary::from(row.try_get::<Vec<u8>, _>("address")?),
created_at: row.try_get("created_at")?,
elevation: to_u32(row.try_get("elevation")?),
gain: to_u32(row.try_get("gain")?),
hash: row.try_get("hash")?,
is_active: row.try_get("is_active")?,
is_full_hotspot: row.try_get("is_full_hotspot")?,
last_changed_at: row.try_get("last_changed_at")?,
location: to_u64(row.try_get("location")?),
location_asserts: to_u32(row.try_get("location_asserts")?),
location_changed_at: row.try_get("location_changed_at")?,
refreshed_at: row.try_get("refreshed_at")?,
updated_at: row.try_get("updated_at")?,
})
}
}
Loading