diff --git a/Cargo.lock b/Cargo.lock index a254298..dae3900 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2267,6 +2267,15 @@ version = "2.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "469fb0b9cefa57e3ef31275ee7cacb78f2fdca44e4765491884a2b119d4eb130" +[[package]] +name = "ipnetwork" +version = "0.18.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4088d739b183546b239688ddbc79891831df421773df95e236daf7867866d355" +dependencies = [ + "serde", +] + [[package]] name = "iri-string" version = "0.7.9" @@ -2573,6 +2582,18 @@ version = "0.8.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" +[[package]] +name = "maxminddb" +version = "0.23.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fe2ba61113f9f7a9f0e87c519682d39c43a6f3f79c2cc42c3ba3dda83b1fa334" +dependencies = [ + "ipnetwork", + "log", + "memchr", + "serde", +] + [[package]] name = "memchr" version = "2.7.6" @@ -4605,6 +4626,7 @@ dependencies = [ "libbpf-rs", "local-ip-address", "log", + "maxminddb", "mimalloc", "multer", "native-tls", diff --git a/Cargo.toml b/Cargo.toml index 7b96b21..1bbf760 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -98,6 +98,7 @@ ctrlc = "3.5.0" arc-swap = "1.7.1" prometheus = "0.14.0" once_cell = "1.21.3" +maxminddb = "0.23" axum-server = { version = "0.7.3", features = ["tls-openssl"] } axum = { version = "0.8.7" } tower-http = { version = "0.6.8", features = ["fs"] } diff --git a/docs/ENVIRONMNET_VARS.md b/docs/ENVIRONMNET_VARS.md index c2e5691..2346d72 100644 --- a/docs/ENVIRONMNET_VARS.md +++ b/docs/ENVIRONMNET_VARS.md @@ -14,6 +14,9 @@ export AX_NETWORK_DISABLE_XDP="false" # Gen0Sec configuration export AX_ARXIGNIS_API_KEY="your-api-key" export AX_ARXIGNIS_BASE_URL="https://api.gen0sec.com/v1" +export AX_ARXIGNIS_THREAT_MMDB_URL="https://s3.amazonaws.com/your-bucket/indicators-latest.mmdb" +export AX_ARXIGNIS_THREAT_MMDB_PATH="/var/cache/synapse/threat.mmdb" +export AX_ARXIGNIS_THREAT_MMDB_REFRESH_SECS="0" # CAPTCHA configuration export AX_CAPTCHA_SITE_KEY="your-site-key" diff --git a/src/cli.rs b/src/cli.rs index fd13836..2b48983 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -136,6 +136,30 @@ pub struct Gen0SecConfig { pub api_key: String, #[serde(default = "default_base_url")] pub base_url: String, + /// URL to the threat-intel MMDB file + #[serde(default)] + pub threat_mmdb_url: String, + /// Optional local path to store/read the threat MMDB file + #[serde(default)] + pub threat_mmdb_path: Option, + /// Optional versions.txt URL to check for the latest threat MMDB version + #[serde(default)] + pub threat_mmdb_versions_url: String, + /// How often to refresh the threat MMDB from the remote URL (seconds). Default: 300 (5 minutes) + #[serde(default = "default_threat_mmdb_refresh_secs")] + pub threat_mmdb_refresh_secs: u64, + /// URL to the GeoIP MMDB file + #[serde(default)] + pub geoip_mmdb_url: String, + /// Optional local path to store/read the GeoIP MMDB file + #[serde(default)] + pub geoip_mmdb_path: Option, + /// Optional versions.txt URL to check for the latest GeoIP MMDB version + #[serde(default)] + pub geoip_mmdb_versions_url: String, + /// How often to refresh the GeoIP MMDB from the remote URL (seconds). Default: 28800 (8 hours) + #[serde(default = "default_geoip_mmdb_refresh_secs")] + pub geoip_mmdb_refresh_secs: u64, #[serde(default = "default_log_sending_enabled")] pub log_sending_enabled: bool, #[serde(default = "default_include_response_body")] @@ -150,6 +174,14 @@ fn default_base_url() -> String { "https://api.gen0sec.com/v1".to_string() } +fn default_threat_mmdb_refresh_secs() -> u64 { + 300 // 5 minutes +} + +fn default_geoip_mmdb_refresh_secs() -> u64 { + 28800 // 8 hours +} + fn default_log_sending_enabled() -> bool { true } @@ -229,6 +261,14 @@ impl Config { arxignis: Gen0SecConfig { api_key: "".to_string(), base_url: "https://api.gen0sec.com/v1".to_string(), + threat_mmdb_url: "".to_string(), + threat_mmdb_path: None, + threat_mmdb_versions_url: "".to_string(), + threat_mmdb_refresh_secs: 300, + geoip_mmdb_url: "".to_string(), + geoip_mmdb_path: None, + geoip_mmdb_versions_url: "".to_string(), + geoip_mmdb_refresh_secs: 28800, log_sending_enabled: true, include_response_body: true, max_body_size: 1024 * 1024, // 1MB diff --git a/src/main.rs b/src/main.rs index 1d33a7c..a139e72 100644 --- a/src/main.rs +++ b/src/main.rs @@ -63,7 +63,6 @@ fn main() -> Result<()> { .map_err(|e| anyhow::anyhow!("Failed to install rustls crypto provider: {:?}", e))?; let args = Args::parse(); - // Handle clear certificate command (runs before loading full config) if let Some(certificate_name) = &args.clear_certificate { // Initialize minimal runtime for async operations @@ -85,7 +84,11 @@ fn main() -> Result<()> { } // Get certificate path from config - let certificate_path = config.pingora.proxy_certificates.unwrap_or_else(|| "/etc/synapse/certs".to_string()); + let certificate_path = config + .pingora + .proxy_certificates + .clone() + .unwrap_or_else(|| "/etc/synapse/certs".to_string()); // Clear the certificate rt.block_on(crate::worker::certificate::clear_certificate( @@ -99,7 +102,9 @@ fn main() -> Result<()> { // Validate required arguments when no config file is provided if args.config.is_none() { if args.arxignis_api_key.is_none() { - return Err(anyhow::anyhow!("--arxignis-api-key is required when no config file is provided")); + return Err(anyhow::anyhow!( + "--arxignis-api-key is required when no config file is provided" + )); } } @@ -415,7 +420,6 @@ async fn async_main(_args: Args, config: Config) -> Result<()> { } else { log::info!("Embedded ACME server disabled (acme.enabled: false)"); } - let (shutdown_tx, shutdown_rx) = watch::channel(false); // Initialize Redis manager if Redis URL is provided @@ -556,48 +560,115 @@ async fn async_main(_args: Args, config: Config) -> Result<()> { if let Err(e) = init_config( config.arxignis.base_url.clone(), config.arxignis.api_key.clone(), - ).await { + ) + .await + { log::error!("Failed to initialize HTTP filter with config: {}", e); log::error!("Aborting startup because WAF config could not be loaded"); return Err(e); } - // Initialize threat intelligence client - if let Err(e) = threat::init_threat_client( - config.arxignis.base_url.clone(), - config.arxignis.api_key.clone(), - ).await { - log::warn!("Failed to initialize threat client: {}", e); + // Initialize threat intelligence client (Threat MMDB → GeoIP MMDB fallback) + if !config.arxignis.threat_mmdb_url.is_empty() || !config.arxignis.geoip_mmdb_url.is_empty() { + if let Err(e) = threat::init_threat_client( + config.arxignis.base_url.clone(), + config.arxignis.api_key.clone(), + config.arxignis.threat_mmdb_path.clone(), + config.arxignis.geoip_mmdb_path.clone(), + ) + .await + { + log::warn!("Failed to initialize threat client: {}", e); + } else { + log::info!("Threat intelligence client initialized"); + + // Register Threat MMDB refresh worker if configured + // Allow empty versions_url for direct MMDB download + if !config.arxignis.threat_mmdb_url.is_empty() + && config.arxignis.threat_mmdb_refresh_secs > 0 + { + let refresh_interval = config.arxignis.threat_mmdb_refresh_secs; + let worker_config = worker::WorkerConfig { + name: "threat_mmdb".to_string(), + interval_secs: refresh_interval, + enabled: true, + }; + let worker = worker::threat_mmdb::ThreatMmdbWorker::new( + refresh_interval, + config.arxignis.threat_mmdb_url.clone(), + config.arxignis.threat_mmdb_versions_url.clone(), + config.arxignis.threat_mmdb_path.clone(), + ); + if let Err(e) = worker_manager.register_worker(worker_config, worker) { + log::error!("Failed to register threat MMDB worker: {}", e); + } else { + log::info!( + "Registered threat MMDB worker (interval: {}s, checks for threat intel updates)", + refresh_interval + ); + } + } + + // Register GeoIP MMDB refresh worker if configured + if !config.arxignis.geoip_mmdb_url.is_empty() + && config.arxignis.geoip_mmdb_refresh_secs > 0 + { + let refresh_interval = config.arxignis.geoip_mmdb_refresh_secs; + let worker_config = worker::WorkerConfig { + name: "geoip_mmdb".to_string(), + interval_secs: refresh_interval, + enabled: true, + }; + let worker = worker::geoip_mmdb::GeoipMmdbWorker::new( + refresh_interval, + config.arxignis.geoip_mmdb_url.clone(), + config.arxignis.geoip_mmdb_versions_url.clone(), + config.arxignis.geoip_mmdb_path.clone(), + ); + if let Err(e) = worker_manager.register_worker(worker_config, worker) { + log::error!("Failed to register GeoIP MMDB worker: {}", e); + } else { + log::info!( + "Registered GeoIP MMDB worker (interval: {}s, checks for GeoIP database updates)", + refresh_interval + ); + } + } + } } else { - log::info!("Threat intelligence client initialized"); + log::warn!("Threat and GeoIP MMDB URLs not configured; threat lookups will use API only"); } // Initialize captcha client if configuration is provided if let (Some(site_key), Some(secret_key), Some(jwt_secret)) = ( &config.arxignis.captcha.site_key, &config.arxignis.captcha.secret_key, - &config.arxignis.captcha.jwt_secret + &config.arxignis.captcha.jwt_secret, ) { let captcha_config = CaptchaConfig { site_key: site_key.clone(), secret_key: secret_key.clone(), jwt_secret: jwt_secret.clone(), - provider: CaptchaProvider::from_str(&config.arxignis.captcha.provider).unwrap_or(CaptchaProvider::HCaptcha), + provider: CaptchaProvider::from_str(&config.arxignis.captcha.provider) + .unwrap_or(CaptchaProvider::HCaptcha), token_ttl_seconds: config.arxignis.captcha.token_ttl, validation_cache_ttl_seconds: config.arxignis.captcha.cache_ttl, }; - if let Err(e) = init_captcha_client( - captcha_config, - ).await { + if let Err(e) = init_captcha_client(captcha_config).await { log::warn!("Failed to initialize captcha client: {}", e); } else { - log::info!("Captcha client initialized with provider: {}", config.arxignis.captcha.provider); + log::info!( + "Captcha client initialized with provider: {}", + config.arxignis.captcha.provider + ); // Start captcha cache cleanup task start_cache_cleanup_task().await; } } else { - log::info!("Captcha client not initialized (missing site_key, secret_key, or jwt_secret)"); + log::info!( + "Captcha client not initialized (missing site_key, secret_key, or jwt_secret)" + ); } } else { log::warn!("No API credentials provided, HTTP filter will not be initialized"); diff --git a/src/threat/mod.rs b/src/threat/mod.rs index 202e8f5..d2f2b7d 100644 --- a/src/threat/mod.rs +++ b/src/threat/mod.rs @@ -1,14 +1,12 @@ -use std::sync::Arc; -use std::time::Duration; +use std::{net::IpAddr, path::PathBuf, sync::Arc, time::Duration}; -use anyhow::{Context, Result}; +use anyhow::{anyhow, Context, Result}; use chrono::{DateTime, Utc}; -use redis::AsyncCommands; -use serde::{Deserialize, Deserializer, Serialize}; -use tokio::sync::OnceCell; +use maxminddb::{geoip2, MaxMindDBError, Reader}; use pingora_memory_cache::MemoryCache; +use serde::{Deserialize, Deserializer, Serialize}; +use tokio::sync::{OnceCell, RwLock}; -use crate::redis::RedisManager; use crate::http_client::get_global_reqwest_client; /// Custom deserializer for optional datetime fields that can be empty strings or missing @@ -16,7 +14,6 @@ fn deserialize_optional_datetime<'de, D>(deserializer: D) -> Result, { - // Try to deserialize as Option first match Option::::deserialize(deserializer)? { Some(s) => { if s.is_empty() { @@ -31,7 +28,7 @@ where } } -/// Threat intelligence response from Gen0Sec API +/// Threat intelligence response (REST shape) #[derive(Debug, Clone, Serialize, Deserialize)] pub struct ThreatResponse { pub schema_version: String, @@ -101,63 +98,79 @@ impl From<&ThreatResponse> for WafFields { } } -/// Threat intelligence client with pingora-memory-cache and Redis caching +/// Threat intel client: Threat MMDB first, then GeoIP MMDB fallback, with in-memory cache pub struct ThreatClient { - base_url: String, + api_base_url: String, api_key: String, + threat_mmdb_path: Option, + geoip_mmdb_path: Option, + threat_reader: RwLock>>>>, + geoip_reader: RwLock>>>>, pingora_cache: Arc>, } impl ThreatClient { pub fn new( - base_url: String, + api_base_url: String, api_key: String, + threat_mmdb_path: Option, + geoip_mmdb_path: Option, ) -> Self { Self { - base_url, + api_base_url, api_key, - pingora_cache: Arc::new(MemoryCache::new(10000)), + threat_mmdb_path, + geoip_mmdb_path, + threat_reader: RwLock::new(None), + geoip_reader: RwLock::new(None), + pingora_cache: Arc::new(MemoryCache::new(10_000)), } } + pub async fn refresh_threat(&self) -> Result<()> { + self.refresh_threat_reader().await.map(|_| ()) + } + + pub async fn refresh_geoip(&self) -> Result<()> { + self.refresh_geoip_reader().await.map(|_| ()) + } + /// Get threat intelligence for an IP address with caching + /// Priority: Cache → Threat MMDB → GeoIP MMDB (REST API disabled) pub async fn get_threat_intel(&self, ip: &str) -> Result> { - // Check pingora-memory-cache - let (cached_data, status) = self.pingora_cache.get(ip); - if let Some(data) = cached_data { + // L1 cache + let (cached, status) = self.pingora_cache.get(ip); + if let Some(data) = cached { if status.is_hit() { log::debug!("Threat data for {} found in pingora-memory-cache", ip); return Ok(Some(data)); } } - // Check L2 cache (Redis) with TTL from API response - if let Some(cached) = self.get_l2_cache(ip).await? { - log::debug!("Threat data for {} found in L2 cache", ip); - // Store in pingora-memory-cache for faster access - self.set_pingora_cache(ip, &cached).await; - return Ok(Some(cached)); - } - - // Fetch from API - match self.fetch_from_api(ip).await { - Ok(Some(threat_data)) => { - log::debug!("Threat data for {} fetched from API", ip); - - // Store in caches - self.set_pingora_cache(ip, &threat_data).await; - if let Err(e) = self.set_l2_cache(ip, &threat_data).await { - log::warn!("Failed to store threat data in L2 cache: {}", e); - } - - Ok(Some(threat_data)) - } - Ok(None) => Ok(None), + let ip_addr: IpAddr = match ip.parse() { + Ok(v) => v, Err(e) => { - log::error!("Failed to fetch threat data for {}: {}", ip, e); - Err(e) + log::warn!("Invalid IP {}: {}", ip, e); + return Ok(None); } + }; + + // Check Threat MMDB first (if configured) + log::info!("🔍 [Threat] Checking Threat MMDB for {}", ip); + if let Some(threat_data) = self.lookup_threat_mmdb(ip, ip_addr).await? { + log::info!("🔍 [Threat] Found threat data in Threat MMDB for {}: score={}", ip, threat_data.intel.score); + self.set_pingora_cache(ip, &threat_data).await; + return Ok(Some(threat_data)); } + + // REST API disabled - skip directly to GeoIP fallback + log::info!("🔍 [Threat] No threat data found for {} in Threat MMDB, using GeoIP fallback", ip); + + // GeoIP fallback + let (geo, asn, org) = self.lookup_geo(ip_addr).await?; + let response = build_no_data_response(ip, ip_addr, geo, asn, org); + self.set_pingora_cache(ip, &response).await; + Ok(Some(response)) } /// Get WAF fields for an IP address @@ -169,11 +182,12 @@ impl ThreatClient { } } - /// Fetch threat data from Gen0Sec API async fn fetch_from_api(&self, ip: &str) -> Result> { - let url = format!("{}/threat?ip={}", self.base_url, ip); + if self.api_base_url.is_empty() || self.api_key.is_empty() { + return Ok(None); + } - // Use shared HTTP client with keepalive instead of creating new client + let url = format!("{}/threat?ip={}", self.api_base_url.trim_end_matches('/'), ip); let client = get_global_reqwest_client() .context("Failed to get global HTTP client")?; @@ -185,86 +199,241 @@ impl ThreatClient { .context("Failed to send HTTP request")?; if response.status() == 404 { - // IP not found in threat database return Ok(None); } - if !response.status().is_success() { let status = response.status(); let body = response.text().await.unwrap_or_else(|_| "Unable to read response body".to_string()); log::error!("API request failed with status: {}, body: {}", status, body); - return Err(anyhow::anyhow!( - "API request failed with status: {}", - status - )); + return Ok(None); } let response_text = response.text().await .context("Failed to read response body")?; - log::debug!("API response body: {}", response_text); - let threat_data: ThreatResponse = serde_json::from_str(&response_text) .context("Failed to parse JSON response")?; Ok(Some(threat_data)) } - /// Get data from L2 cache (Redis) - async fn get_l2_cache(&self, ip: &str) -> Result> { - let redis_manager = match RedisManager::get() { - Ok(manager) => manager, - Err(_) => return Ok(None), - }; + /// Read the MMDB from the configured local path and keep it in memory + async fn refresh_threat_reader(&self) -> Result>>> { + let path = self + .threat_mmdb_path + .clone() + .ok_or_else(|| anyhow!("Threat MMDB path not configured"))?; - let key = format!("{}:threat:{}", redis_manager.create_namespace("threat"), ip); - let mut redis = redis_manager.get_connection(); + let data = tokio::fs::read(&path) + .await + .with_context(|| format!("Failed to read Threat MMDB from {:?}", path))?; - match redis.get::<_, Option>(&key).await { - Ok(Some(data)) => { - match serde_json::from_str::(&data) { - Ok(threat_data) => Ok(Some(threat_data)), - Err(e) => { - log::warn!("Failed to deserialize cached threat data for key {}: {}. Clearing cache entry.", key, e); - // Clear the invalid cache entry - let _: () = redis.del(&key).await.unwrap_or_default(); - Ok(None) - } - } - } - Ok(None) => Ok(None), - Err(e) => { - log::warn!("Redis get error for key {}: {}", key, e); - Ok(None) + let reader = Reader::from_source(data) + .context("Failed to parse Threat MMDB data")?; + + let arc = Arc::new(reader); + + let mut guard = self.threat_reader.write().await; + *guard = Some(arc.clone()); + + log::info!("Threat MMDB loaded from {:?}", path); + + Ok(arc) + } + + async fn refresh_geoip_reader(&self) -> Result>>> { + let path = self + .geoip_mmdb_path + .clone() + .ok_or_else(|| anyhow!("GeoIP MMDB path not configured"))?; + + let data = tokio::fs::read(&path) + .await + .with_context(|| format!("Failed to read GeoIP MMDB from {:?}", path))?; + + let reader = Reader::from_source(data) + .context("Failed to parse GeoIP MMDB data")?; + + let arc = Arc::new(reader); + + let mut guard = self.geoip_reader.write().await; + *guard = Some(arc.clone()); + + log::info!("GeoIP MMDB loaded from {:?}", path); + + Ok(arc) + } + + async fn ensure_threat_reader(&self) -> Result>>> { + { + let guard = self.threat_reader.read().await; + if let Some(existing) = guard.as_ref() { + return Ok(existing.clone()); } } + + // No reader loaded yet – fetch it now + self.refresh_threat_reader().await } - /// Set data in pingora-memory-cache with TTL from API response - async fn set_pingora_cache(&self, ip: &str, data: &ThreatResponse) { - let ttl = Duration::from_secs(data.ttl_s); - self.pingora_cache.put(ip, data.clone(), Some(ttl)); + async fn ensure_geoip_reader(&self) -> Result>>> { + { + let guard = self.geoip_reader.read().await; + if let Some(existing) = guard.as_ref() { + return Ok(existing.clone()); + } + } + + // No reader loaded yet – fetch it now + self.refresh_geoip_reader().await } - /// Set data in L2 cache (Redis) with TTL from API response - async fn set_l2_cache(&self, ip: &str, data: &ThreatResponse) -> Result<()> { - let redis_manager = match RedisManager::get() { - Ok(manager) => manager, - Err(_) => return Ok(()), + /// Look up threat intelligence from the Threat MMDB + async fn lookup_threat_mmdb(&self, ip: &str, ip_addr: IpAddr) -> Result> { + log::info!("🔍 [Threat MMDB] Starting lookup for {}", ip); + + // Check if threat reader is available + let reader_opt = { + let guard = self.threat_reader.read().await; + guard.clone() }; - let key = format!("{}:threat:{}", redis_manager.create_namespace("threat"), ip); - let mut redis = redis_manager.get_connection(); + let reader = match reader_opt { + Some(r) => { + log::info!("🔍 [Threat MMDB] Reader available, performing lookup"); + r + } + None => { + log::warn!("🔍 [Threat MMDB] Reader not loaded, skipping threat lookup for {}", ip); + return Ok(None); + } + }; - let json_data = serde_json::to_string(data) - .context("Failed to serialize threat data")?; + // Perform blocking MMDB lookup in a separate thread + let result = tokio::task::spawn_blocking({ + let reader = reader.clone(); + let ip_addr_clone = ip_addr; + move || -> Result { + reader.lookup(ip_addr_clone) + } + }) + .await; - let _: () = redis - .set_ex(&key, json_data, data.ttl_s) - .await - .context("Failed to store threat data in Redis")?; + match result { + Ok(Ok(threat_data)) => { + log::info!("🔍 [Threat MMDB] Found data for {}: {:?}", ip, threat_data); + Ok(Some(threat_data)) + } + Ok(Err(maxminddb::MaxMindDBError::AddressNotFoundError(_))) => { + log::debug!("🔍 [Threat MMDB] IP {} not found in database", ip); + Ok(None) + } + Ok(Err(e)) => { + log::warn!("🔍 [Threat MMDB] Lookup error for {}: {}", ip, e); + Ok(None) + } + Err(e) => { + log::warn!("🔍 [Threat MMDB] Task error for {}: {}", ip, e); + Ok(None) + } + } + } + + async fn lookup_geo(&self, ip: IpAddr) -> Result<(GeoInfo, u32, String)> { + log::debug!("🔍 [GeoIP] Looking up IP: {}", ip); + + // ASN lookup (extract owned data inside blocking task) + let reader = self.ensure_geoip_reader().await?; + let (asn_num, asn_org) = tokio::task::spawn_blocking({ + let reader = reader.clone(); + let ip_clone = ip; + move || -> Result<(u32, String), MaxMindDBError> { + match reader.lookup::(ip_clone) { + Ok(res) => { + let asn = res.autonomous_system_number.unwrap_or(0); + let org = res.autonomous_system_organization.unwrap_or("").to_string(); + log::info!("🔍 [GeoIP] ASN Lookup Success for {}: ASN={}, Org='{}'", ip_clone, asn, org); + Ok((asn, org)) + } + Err(e) => { + log::warn!("🔍 [GeoIP] ASN Lookup FAILED for {}: {}", ip_clone, e); + Ok((0, String::new())) + } + } + } + }) + .await??; + + // Country lookup (owned iso and name) + let reader2 = self.ensure_geoip_reader().await?; + let (iso, country_name) = tokio::task::spawn_blocking({ + let reader = reader2.clone(); + let ip_clone = ip; + move || -> Result<(String, String), MaxMindDBError> { + match reader.lookup::(ip_clone) { + Ok(country) => { + // Debug: Log the raw country data structure + log::info!("🔍 [GeoIP] RAW Country Data for {}: {:?}", ip_clone, country); + + // Try multiple fields + let iso = country + .country + .as_ref() + .and_then(|c| c.iso_code) + .unwrap_or("") + .to_string(); + + // Also try registered_country if country is empty + let iso = if iso.is_empty() { + country + .registered_country + .as_ref() + .and_then(|c| c.iso_code) + .unwrap_or("") + .to_string() + } else { + iso + }; + + let country_name = country + .country + .as_ref() + .and_then(|c| c.names.as_ref()) + .and_then(|n| n.get("en")) + .map(|s| s.to_string()) + .unwrap_or_default(); + + if iso.is_empty() { + log::warn!("🔍 [GeoIP] Country Lookup for {} returned EMPTY iso_code (IP found but no country data in ANY field)", ip_clone); + } else { + log::info!("🔍 [GeoIP] Country Lookup Success for {}: Country='{}' ({})", ip_clone, iso, country_name); + } + Ok((iso, country_name)) + } + Err(e) => { + log::warn!("🔍 [GeoIP] Country Lookup FAILED for {}: IP NOT FOUND in database - {}", ip_clone, e); + Ok((String::new(), String::new())) + } + } + } + }) + .await??; + + Ok(( + GeoInfo { + country: country_name, + iso_code: iso.clone(), + asn_iso_code: iso, + }, + asn_num, + asn_org, + )) + } - Ok(()) + /// Set data in pingora-memory-cache with TTL from record + async fn set_pingora_cache(&self, ip: &str, data: &ThreatResponse) { + let ttl = Duration::from_secs(data.ttl_s); + self.pingora_cache.put(ip, data.clone(), Some(ttl)); } } @@ -273,17 +442,53 @@ static THREAT_CLIENT: OnceCell> = OnceCell::const_new(); /// Initialize the global threat client pub async fn init_threat_client( - base_url: String, + api_base_url: String, api_key: String, + threat_mmdb_path: Option, + geoip_mmdb_path: Option, ) -> Result<()> { - let client = Arc::new(ThreatClient::new(base_url, api_key)); + let client = Arc::new(ThreatClient::new( + api_base_url, + api_key, + threat_mmdb_path, + geoip_mmdb_path, + )); + + // Best-effort initial load from local MMDB paths. + // If the files are not present yet, the workers can download them later. + if let Err(e) = client.refresh_threat_reader().await { + log::warn!("Initial Threat MMDB load failed: {}", e); + } - THREAT_CLIENT.set(client) + if let Err(e) = client.refresh_geoip_reader().await { + log::warn!("Initial GeoIP MMDB load failed: {}", e); + } + + THREAT_CLIENT + .set(client) .map_err(|_| anyhow::anyhow!("Failed to initialize threat client"))?; Ok(()) } +/// Trigger an immediate Threat MMDB refresh (used by worker) +pub async fn refresh_threat_mmdb() -> Result<()> { + let client = THREAT_CLIENT + .get() + .ok_or_else(|| anyhow::anyhow!("Threat client not initialized"))?; + + client.refresh_threat().await +} + +/// Trigger an immediate GeoIP MMDB refresh (used by worker) +pub async fn refresh_geoip_mmdb() -> Result<()> { + let client = THREAT_CLIENT + .get() + .ok_or_else(|| anyhow::anyhow!("Threat client not initialized"))?; + + client.refresh_geoip().await +} + /// Get threat intelligence for an IP address pub async fn get_threat_intel(ip: &str) -> Result> { let client = THREAT_CLIENT @@ -301,3 +506,39 @@ pub async fn get_waf_fields(ip: &str) -> Result> { client.get_waf_fields(ip).await } + +fn parse_optional_datetime(raw: Option<&str>) -> Option> { + raw.and_then(|s| DateTime::parse_from_rfc3339(s).ok()) + .map(|dt| dt.with_timezone(&Utc)) +} + +fn build_no_data_response(ip: &str, ip_addr: IpAddr, geo: GeoInfo, asn: u32, org: String) -> ThreatResponse { + ThreatResponse { + schema_version: "1.0".to_string(), + tenant_id: "geoip".to_string(), + ip: ip.to_string(), + intel: ThreatIntel { + score: 0, + confidence: 0.0, + score_version: "geoip".to_string(), + categories: vec![], + tags: vec![], + first_seen: None, + last_seen: None, + source_count: 0, + reason_code: "NO_DATA".to_string(), + reason_summary: "No threat data available".to_string(), + rule_id: "none".to_string(), + }, + context: ThreatContext { + asn, + org, + ip_version: if ip_addr.is_ipv4() { 4 } else { 6 }, + geo, + }, + advice: "allow".to_string(), + ttl_s: 300, + generated_at: Utc::now(), + } +} + diff --git a/src/waf/wirefilter.rs b/src/waf/wirefilter.rs index 54aa9cd..0d4de74 100644 --- a/src/waf/wirefilter.rs +++ b/src/waf/wirefilter.rs @@ -432,9 +432,12 @@ impl HttpFilter { // Fetch threat intelligence data for the source IP // Fetch full threat response for access logging, and WAF fields for rule evaluation + log::info!("🔍 [WAF] Looking up GeoIP for: {}", peer_addr.ip()); let threat_response = threat::get_threat_intel(&peer_addr.ip().to_string()).await.ok().flatten(); let _threat_fields = if let Some(ref threat_resp) = threat_response { let waf_fields = threat::WafFields::from(threat_resp); + log::info!("🔍 [WAF] GeoIP Result: Country='{}', ASN={}, ThreatScore={}", + waf_fields.ip_src_country, waf_fields.ip_src_asn, waf_fields.threat_score); // Set threat intelligence fields ctx.set_field_value( self.scheme.get_field("ip.src.country").unwrap(), @@ -463,6 +466,7 @@ impl HttpFilter { Some(waf_fields) } else { // No threat data found, set default values + log::warn!("🔍 [WAF] No GeoIP data found for {}, setting empty country", peer_addr.ip()); ctx.set_field_value( self.scheme.get_field("ip.src.country").unwrap(), "", diff --git a/src/worker/config.rs b/src/worker/config.rs index ca8b39c..940bcb3 100644 --- a/src/worker/config.rs +++ b/src/worker/config.rs @@ -125,9 +125,9 @@ pub async fn fetch_config( .map_err(|e| anyhow::anyhow!("Failed to get global HTTP client: {}", e))?; let url = format!("{}/config", base_url); - + let response = client - .get(url) + .get(&url) .header("Authorization", format!("Bearer {}", api_key)) .header("Accept-Encoding", "gzip") .send() @@ -192,6 +192,7 @@ pub async fn fetch_config( }; format!("Failed to parse JSON response: {}. Response preview: {}", e, preview) })?; + // Update global config snapshot set_global_config(body.config.clone()); Ok(body) diff --git a/src/worker/geoip_mmdb.rs b/src/worker/geoip_mmdb.rs new file mode 100644 index 0000000..1930250 --- /dev/null +++ b/src/worker/geoip_mmdb.rs @@ -0,0 +1,230 @@ +use std::path::PathBuf; + +use anyhow::{anyhow, Context, Result}; +use tokio::sync::watch; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration, MissedTickBehavior}; + +use crate::http_client::get_global_reqwest_client; +use crate::worker::Worker; + +pub struct GeoipMmdbWorker { + interval_secs: u64, + mmdb_base_url: String, + versions_url: String, + mmdb_path: Option, +} + +impl GeoipMmdbWorker { + pub fn new( + interval_secs: u64, + mmdb_base_url: String, + versions_url: String, + mmdb_path: Option, + ) -> Self { + Self { + interval_secs, + mmdb_base_url, + versions_url, + mmdb_path, + } + } +} + +impl Worker for GeoipMmdbWorker { + fn name(&self) -> &str { + "geoip_mmdb" + } + + fn run(&self, mut shutdown: watch::Receiver) -> JoinHandle<()> { + let interval_secs = self.interval_secs; + let mmdb_base_url = self.mmdb_base_url.clone(); + let versions_url = self.versions_url.clone(); + let mmdb_path = self.mmdb_path.clone(); + + tokio::spawn(async move { + let worker_name = "geoip_mmdb".to_string(); + let mut current_version: Option = None; + + log::info!( + "[{}] Starting GeoIP MMDB worker (interval: {}s)", + worker_name, + interval_secs + ); + + match sync_mmdb( + &mmdb_base_url, + &versions_url, + mmdb_path.clone(), + current_version.clone(), + ) + .await + { + Ok(new_ver) => { + current_version = new_ver; + } + Err(e) => { + log::warn!("[{}] Initial GeoIP MMDB sync failed: {}", worker_name, e); + } + } + + let mut ticker = interval(Duration::from_secs(interval_secs)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = ticker.tick() => { + match sync_mmdb( + &mmdb_base_url, + &versions_url, + mmdb_path.clone(), + current_version.clone(), + ).await { + Ok(new_ver) => { + if new_ver != current_version { + current_version = new_ver; + log::info!("[{}] GeoIP MMDB updated to latest version", worker_name); + } else { + log::debug!("[{}] GeoIP MMDB already at latest version", worker_name); + } + } + Err(e) => { + log::warn!("[{}] Periodic GeoIP MMDB sync failed: {}", worker_name, e); + } + } + } + _ = shutdown.changed() => { + if *shutdown.borrow() { + log::info!("[{}] GeoIP MMDB worker received shutdown signal", worker_name); + break; + } + } + } + } + }) + } +} + +async fn sync_mmdb( + mmdb_base_url: &str, + versions_url: &str, + mmdb_path: Option, + current_version: Option, +) -> Result> { + let local_path = mmdb_path.ok_or_else(|| anyhow!("MMDB path not configured for GeoIP MMDB worker"))?; + + if versions_url.is_empty() { + log::debug!("No versions URL provided, downloading directly from {}", mmdb_base_url); + + let bytes = if mmdb_base_url.starts_with("http://") || mmdb_base_url.starts_with("https://") { + download_mmdb(mmdb_base_url).await? + } else { + let src_path = PathBuf::from(mmdb_base_url); + tokio::fs::read(&src_path) + .await + .with_context(|| format!("Failed to read MMDB from {:?}", src_path))? + }; + + if let Some(parent) = local_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .with_context(|| format!("Failed to create MMDB directory {:?}", parent))?; + } + + tokio::fs::write(&local_path, &bytes) + .await + .with_context(|| format!("Failed to write MMDB to {:?}", local_path))?; + + log::info!("GeoIP MMDB written to {:?}", local_path); + crate::threat::refresh_geoip_mmdb().await?; + return Ok(Some("direct".to_string())); + } + + let versions_text = if versions_url.starts_with("http://") || versions_url.starts_with("https://") { + let client = get_global_reqwest_client() + .context("Failed to get HTTP client for versions download")?; + let resp = client + .get(versions_url) + .send() + .await + .with_context(|| format!("Failed to download versions file from {}", versions_url))?; + let status = resp.status(); + if !status.is_success() { + return Err(anyhow!( + "Versions download failed: status {} from {}", + status, + versions_url + )); + } + resp.text().await.context("Failed to read versions body")? + } else { + tokio::fs::read_to_string(versions_url) + .await + .with_context(|| format!("Failed to read versions file from {}", versions_url))? + }; + + let latest = parse_latest_version(&versions_text) + .ok_or_else(|| anyhow!("Failed to parse latest version from versions file"))?; + + if let Some(ref curr) = current_version { + if curr == &latest { + return Ok(current_version); + } + } + + let base = mmdb_base_url.trim_end_matches('/'); + let bytes = if base.starts_with("http://") || base.starts_with("https://") { + let url = format!("{}/{}", base, latest); + download_mmdb(&url).await? + } else { + let src_path = PathBuf::from(base).join(&latest); + tokio::fs::read(&src_path) + .await + .with_context(|| format!("Failed to read MMDB from {:?}", src_path))? + }; + + if let Some(parent) = local_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .with_context(|| format!("Failed to create MMDB directory {:?}", parent))?; + } + + tokio::fs::write(&local_path, &bytes) + .await + .with_context(|| format!("Failed to write MMDB to {:?}", local_path))?; + + log::info!("GeoIP MMDB written to {:?}", local_path); + crate::threat::refresh_geoip_mmdb().await?; + Ok(Some(latest)) +} + +fn parse_latest_version(text: &str) -> Option { + for line in text.lines() { + if let Some(rest) = line.strip_prefix("latest=") { + return Some(rest.trim().to_string()); + } + } + None +} + +async fn download_mmdb(url: &str) -> Result> { + let client = get_global_reqwest_client() + .context("Failed to get HTTP client for MMDB download")?; + let resp = client + .get(url) + .send() + .await + .with_context(|| format!("Failed to download MMDB from {}", url))?; + let status = resp.status(); + if !status.is_success() { + return Err(anyhow!( + "MMDB download failed: status {} from {}", + status, + url + )); + } + let bytes = resp.bytes().await.context("Failed to read MMDB body")?; + Ok(bytes.to_vec()) +} + + diff --git a/src/worker/mod.rs b/src/worker/mod.rs index 635bf2c..85def86 100644 --- a/src/worker/mod.rs +++ b/src/worker/mod.rs @@ -1,6 +1,8 @@ pub mod certificate; pub mod config; +pub mod geoip_mmdb; pub mod log; pub mod manager; +pub mod threat_mmdb; pub use manager::{Worker, WorkerConfig, WorkerManager}; diff --git a/src/worker/threat_mmdb.rs b/src/worker/threat_mmdb.rs new file mode 100644 index 0000000..26d1d5c --- /dev/null +++ b/src/worker/threat_mmdb.rs @@ -0,0 +1,246 @@ +use std::path::PathBuf; + +use anyhow::{anyhow, Context, Result}; +use tokio::sync::watch; +use tokio::task::JoinHandle; +use tokio::time::{interval, Duration, MissedTickBehavior}; + +use crate::http_client::get_global_reqwest_client; +use crate::worker::Worker; + +/// Periodically checks versions.txt and downloads the latest Threat MMDB, +/// then asks the threat module to reload it from disk. +pub struct ThreatMmdbWorker { + interval_secs: u64, + mmdb_base_url: String, + versions_url: String, + mmdb_path: Option, +} + +impl ThreatMmdbWorker { + pub fn new( + interval_secs: u64, + mmdb_base_url: String, + versions_url: String, + mmdb_path: Option, + ) -> Self { + Self { + interval_secs, + mmdb_base_url, + versions_url, + mmdb_path, + } + } +} + +impl Worker for ThreatMmdbWorker { + fn name(&self) -> &str { + "threat_mmdb" + } + + fn run(&self, mut shutdown: watch::Receiver) -> JoinHandle<()> { + let interval_secs = self.interval_secs; + let mmdb_base_url = self.mmdb_base_url.clone(); + let versions_url = self.versions_url.clone(); + let mmdb_path = self.mmdb_path.clone(); + + tokio::spawn(async move { + let worker_name = "threat_mmdb".to_string(); + let mut current_version: Option = None; + + log::info!( + "[{}] Starting Threat MMDB worker (interval: {}s)", + worker_name, + interval_secs + ); + + // Initial sync on startup + match sync_mmdb( + &mmdb_base_url, + &versions_url, + mmdb_path.clone(), + current_version.clone(), + ) + .await + { + Ok(new_ver) => { + current_version = new_ver; + } + Err(e) => { + log::warn!( + "[{}] Initial Threat MMDB sync failed: {}", + worker_name, + e + ); + } + } + + let mut ticker = interval(Duration::from_secs(interval_secs)); + ticker.set_missed_tick_behavior(MissedTickBehavior::Delay); + + loop { + tokio::select! { + _ = ticker.tick() => { + match sync_mmdb( + &mmdb_base_url, + &versions_url, + mmdb_path.clone(), + current_version.clone(), + ).await { + Ok(new_ver) => { + if new_ver != current_version { + current_version = new_ver; + log::info!("[{}] Threat MMDB updated to latest version", worker_name); + } else { + log::debug!("[{}] Threat MMDB already at latest version", worker_name); + } + } + Err(e) => { + log::warn!("[{}] Periodic Threat MMDB sync failed: {}", worker_name, e); + } + } + } + _ = shutdown.changed() => { + if *shutdown.borrow() { + log::info!("[{}] Threat MMDB worker received shutdown signal", worker_name); + break; + } + } + } + } + }) + } +} + +/// One sync pass: check versions.txt, download a new MMDB if needed, write it +/// to disk, and ask the threat module to reload it. +async fn sync_mmdb( + mmdb_base_url: &str, + versions_url: &str, + mmdb_path: Option, + current_version: Option, +) -> Result> { + // If no versions_url, treat mmdb_base_url as direct file URL + if versions_url.is_empty() { + let local_path = mmdb_path.ok_or_else(|| anyhow!("MMDB path not configured"))?; + + let bytes = if mmdb_base_url.starts_with("http://") || mmdb_base_url.starts_with("https://") { + download_mmdb(mmdb_base_url).await? + } else { + let file_path = mmdb_base_url.strip_prefix("file://").unwrap_or(mmdb_base_url); + tokio::fs::read(file_path) + .await + .with_context(|| format!("Failed to read MMDB from {}", file_path))? + }; + + if let Some(parent) = local_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .with_context(|| format!("Failed to create MMDB directory {:?}", parent))?; + } + + tokio::fs::write(&local_path, &bytes) + .await + .with_context(|| format!("Failed to write MMDB to {:?}", local_path))?; + + log::info!("Threat MMDB written to {:?}", local_path); + crate::threat::refresh_threat_mmdb().await?; + return Ok(Some("direct".to_string())); + } + + let versions_text = if versions_url.starts_with("http://") || versions_url.starts_with("https://") { + let client = get_global_reqwest_client() + .context("Failed to get HTTP client for versions download")?; + let resp = client + .get(versions_url) + .send() + .await + .with_context(|| format!("Failed to download versions file from {}", versions_url))?; + let status = resp.status(); + if !status.is_success() { + return Err(anyhow!( + "Versions download failed: status {} from {}", + status, + versions_url + )); + } + resp.text().await.context("Failed to read versions body")? + } else { + // Strip file:// prefix if present + let file_path = versions_url.strip_prefix("file://").unwrap_or(versions_url); + tokio::fs::read_to_string(file_path) + .await + .with_context(|| format!("Failed to read versions file from {}", file_path))? + }; + + let latest = parse_latest_version(&versions_text) + .ok_or_else(|| anyhow!("Failed to parse latest version from versions file"))?; + + if let Some(ref curr) = current_version { + if curr == &latest { + // Already on latest, nothing to do. + return Ok(current_version); + } + } + + let local_path = mmdb_path.ok_or_else(|| anyhow!("MMDB path not configured for Threat MMDB worker"))?; + + let base = mmdb_base_url.trim_end_matches('/'); + let bytes = if base.starts_with("http://") || base.starts_with("https://") { + let url = format!("{}/{}", base, latest); + download_mmdb(&url).await? + } else { + // Strip file:// prefix if present + let file_base = base.strip_prefix("file://").unwrap_or(base); + let src_path = PathBuf::from(file_base).join(&latest); + tokio::fs::read(&src_path) + .await + .with_context(|| format!("Failed to read MMDB from {:?}", src_path))? + }; + + if let Some(parent) = local_path.parent() { + tokio::fs::create_dir_all(parent) + .await + .with_context(|| format!("Failed to create MMDB directory {:?}", parent))?; + } + + tokio::fs::write(&local_path, &bytes) + .await + .with_context(|| format!("Failed to write MMDB to {:?}", local_path))?; + + log::info!("Threat MMDB written to {:?}", local_path); + + // Ask the threat module to reload from the updated path. + crate::threat::refresh_threat_mmdb().await?; + + Ok(Some(latest)) +} + +fn parse_latest_version(text: &str) -> Option { + for line in text.lines() { + if let Some(rest) = line.strip_prefix("latest=") { + return Some(rest.trim().to_string()); + } + } + None +} + +async fn download_mmdb(url: &str) -> Result> { + let client = get_global_reqwest_client() + .context("Failed to get HTTP client for MMDB download")?; + let resp = client + .get(url) + .send() + .await + .with_context(|| format!("Failed to download MMDB from {}", url))?; + let status = resp.status(); + if !status.is_success() { + return Err(anyhow!( + "MMDB download failed: status {} from {}", + status, + url + )); + } + let bytes = resp.bytes().await.context("Failed to read MMDB body")?; + Ok(bytes.to_vec()) +}