diff --git a/crates/checkpoint-downloader/src/downloader.rs b/crates/checkpoint-downloader/src/downloader.rs index f8152232a6..849453e781 100644
--- a/crates/checkpoint-downloader/src/downloader.rs
+++ b/crates/checkpoint-downloader/src/downloader.rs
@@ -463,7 +463,12 @@ impl ParallelCheckpointDownloaderInner {
                 sequence_number += 1;
             }
 
-            let Some(entry) = self.checkpoint_receiver.recv().await else {
+            let maybe_entry = tokio::select! {
+                _ = self.cancellation_token.cancelled() => None,
+                entry = self.checkpoint_receiver.recv() => entry,
+            };
+            let Some(entry) = maybe_entry else {
+                tracing::info!("no checkpoint received, stopping checkpoint fetcher");
                 break;
             };
diff --git a/crates/walrus-service/node_config_example.yaml b/crates/walrus-service/node_config_example.yaml index 3885a4cf4b..08323fa057 100644
--- a/crates/walrus-service/node_config_example.yaml
+++ b/crates/walrus-service/node_config_example.yaml
@@ -164,6 +164,11 @@ event_processor_config:
   max_consecutive_pool_monitor_failures: 10
   event_stream_catchup_min_checkpoint_lag: 20000
   sampled_tracing_interval_secs: 3600
+  enable_runtime_catchup: true
+  runtime_catchup_lag_threshold: 20000
+  runtime_lag_check_interval_secs: 300
+  catchup_coordination_timeout_secs: 3000
+  catchup_processing_timeout_secs: 3000
 use_legacy_event_provider: false
 disable_event_blob_writer: false
 commission_rate: 6000
diff --git a/crates/walrus-service/src/event/event_processor.rs b/crates/walrus-service/src/event/event_processor.rs index ef8be2e57e..6854bed876 100644
--- a/crates/walrus-service/src/event/event_processor.rs
+++ b/crates/walrus-service/src/event/event_processor.rs
@@ -8,6 +8,7 @@ pub mod catchup;
 pub mod checkpoint;
 pub mod client;
 pub mod config;
+pub mod coordination;
 pub mod db;
 pub mod metrics;
 pub mod package_store;
diff --git a/crates/walrus-service/src/event/event_processor/catchup.rs b/crates/walrus-service/src/event/event_processor/catchup.rs index b2ae7bf674..5896e7f4b5 100644
--- a/crates/walrus-service/src/event/event_processor/catchup.rs
+++ b/crates/walrus-service/src/event/event_processor/catchup.rs
@@ -3,7 +3,7 @@
 //! Catchup module for catching up the event processor with the network.
 
-use std::{fs, path::PathBuf};
+use std::{fs, path::PathBuf, sync::Arc, time::Duration};
 
 use sui_types::{
     committee::Committee,
@@ -22,6 +22,7 @@ use crate::event::{
     event_blob::EventBlob,
     event_processor::{
         config::{SuiClientSet, SystemConfig},
+        coordination::CatchupCoordinationState,
         db::EventProcessorStores,
     },
     events::{IndexedStreamEvent, InitState},
@@ -57,6 +58,28 @@ impl std::fmt::Debug for DownloadedBlob {
     }
 }
 
+/// Configuration for runtime catchup coordination.
+#[derive(Clone, Debug)]
+pub struct CatchupRuntimeConfig {
+    /// The coordination state shared with the checkpoint tailing task.
+    pub coordination_state: Arc<CatchupCoordinationState>,
+    /// The timeout for coordinating the stop and restart of checkpoint tailing.
+    pub coordination_timeout: Duration,
+    /// The timeout for processing the downloaded event blobs.
+    pub processing_timeout: Duration,
+}
+
+/// Structured error type for catchup.
+#[derive(thiserror::Error, Debug)]
+pub enum CatchupError {
+    /// A recoverable error that can be retried.
+    #[error("recoverable catchup error: {0}")]
+    Recoverable(#[from] anyhow::Error),
+    /// A non-recoverable error that cannot be retried.
+    #[error("non-recoverable catchup error: {0}")]
+    NonRecoverable(anyhow::Error),
+}
+
 /// Manages the catchup process for events in the event processor using event blobs.
 ///
 /// This manager handles the process of catching up the local event store with the network state.
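The split between `Recoverable` and `NonRecoverable` above determines whether a failed catchup run is merely logged or aborts the caller. As a rough illustration of how a caller could use that distinction, the sketch below retries recoverable failures with a small backoff and bails out immediately on non-recoverable ones; the `retry_catchup` helper, `MAX_ATTEMPTS`, and the backoff are illustrative assumptions (the patch itself only logs recoverable errors and moves on), and the enum is repeated so the snippet stands alone.

use std::time::Duration;

/// Mirror of the `CatchupError` enum introduced in this patch, repeated for a standalone sketch.
#[derive(thiserror::Error, Debug)]
pub enum CatchupError {
    #[error("recoverable catchup error: {0}")]
    Recoverable(#[from] anyhow::Error),
    #[error("non-recoverable catchup error: {0}")]
    NonRecoverable(anyhow::Error),
}

/// Retries recoverable catchup failures a few times; gives up immediately on non-recoverable ones.
pub async fn retry_catchup<F, Fut>(mut attempt_catchup: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = Result<(), CatchupError>>,
{
    const MAX_ATTEMPTS: u64 = 3;
    for attempt in 1..=MAX_ATTEMPTS {
        match attempt_catchup().await {
            Ok(()) => return Ok(()),
            Err(CatchupError::Recoverable(error)) if attempt < MAX_ATTEMPTS => {
                // E.g. a failed blob download: back off briefly and try again.
                tracing::warn!(?error, attempt, "retrying catchup after recoverable error");
                tokio::time::sleep(Duration::from_secs(attempt)).await;
            }
            // Out of attempts, or the coordination with checkpoint tailing broke down.
            Err(CatchupError::Recoverable(error)) | Err(CatchupError::NonRecoverable(error)) => {
                return Err(error);
            }
        }
    }
    unreachable!("the final attempt always returns")
}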
@@ -71,6 +94,7 @@ pub struct EventBlobCatchupManager { system_config: SystemConfig, recovery_path: PathBuf, metrics: EventCatchupManagerMetrics, + catchup_runtime_config: CatchupRuntimeConfig, } impl std::fmt::Debug for EventBlobCatchupManager { @@ -91,6 +115,7 @@ impl EventBlobCatchupManager { system_config: SystemConfig, recovery_path: PathBuf, registry: &Registry, + catchup_runtime_config: CatchupRuntimeConfig, ) -> Self { let metrics = EventCatchupManagerMetrics::new(registry); Self { @@ -99,31 +124,27 @@ impl EventBlobCatchupManager { system_config, recovery_path, metrics, + catchup_runtime_config, } } /// Checks if the event processor is lagging behind the network and performs catchup if needed. pub async fn catchup(&self, lag_threshold: u64) -> anyhow::Result<()> { - let current_checkpoint = self.get_current_checkpoint()?; - let latest_checkpoint = self.get_latest_network_checkpoint().await; - - let current_lag = self.calculate_lag(current_checkpoint, latest_checkpoint)?; + let current_lag = self.get_current_lag().await?; if current_lag > lag_threshold { - tracing::info!( - current_checkpoint, - latest_checkpoint, - lag_threshold, - "performing catchup - lag is above threshold" - ); - self.perform_catchup().await?; + tracing::info!(lag_threshold, "performing catchup - lag is above threshold"); + match self.perform_catchup().await { + Ok(()) => {} + Err(CatchupError::Recoverable(error)) => { + tracing::warn!(?error, "recoverable error during catchup"); + } + Err(CatchupError::NonRecoverable(error)) => { + return Err(error); + } + } } else { - tracing::info!( - current_checkpoint, - latest_checkpoint, - lag_threshold, - "skipping catchup - lag is below threshold" - ); + tracing::info!(lag_threshold, "skipping catchup - lag is below threshold"); } Ok(()) @@ -184,16 +205,39 @@ impl EventBlobCatchupManager { Ok(lag) } + /// Gets the current lag between the local store and the network + pub async fn get_current_lag(&self) -> anyhow::Result { + let current_checkpoint = self.get_current_checkpoint()?; + let latest_checkpoint = self.get_latest_network_checkpoint().await; + self.calculate_lag(current_checkpoint, latest_checkpoint) + } + /// Performs the catchup operation using event blobs - pub async fn perform_catchup(&self) -> anyhow::Result<()> { - match self.catchup_using_event_blobs().await { + pub async fn perform_catchup(&self) -> Result<(), CatchupError> { + let coordination_state = self.catchup_runtime_config.coordination_state.clone(); + if !coordination_state.try_start_catchup().await { + tracing::info!("runtime catchup already active, skipping new run"); + return Ok(()); + } + + let result = self.catchup_using_event_blobs().await; + coordination_state.mark_catchup_inactive(); + + match result { Ok(()) => { tracing::info!("successfully caught up using event blobs"); Ok(()) } - Err(error) => { - tracing::error!(?error, "failed to catch up using event blobs"); - Err(error) + Err(CatchupError::Recoverable(error)) => { + tracing::warn!(?error, "recoverable error during catchup using event blobs"); + Err(CatchupError::Recoverable(error)) + } + Err(CatchupError::NonRecoverable(error)) => { + tracing::error!( + ?error, + "non-recoverable error during catchup using event blobs" + ); + Err(CatchupError::NonRecoverable(error)) } } } @@ -218,27 +262,99 @@ impl EventBlobCatchupManager { /// starting from `N+1`). 
If however, the local store is empty, the catch-up will store all /// events from the earliest available event blob (in which case the first stored event index /// could be greater than `0`). - async fn catchup_using_event_blobs(&self) -> anyhow::Result<()> { + async fn catchup_using_event_blobs(&self) -> Result<(), CatchupError> { #[cfg(msim)] sui_macros::fail_point!("fail_point_catchup_using_event_blobs_start"); + let coordination_state = self.catchup_runtime_config.coordination_state.clone(); - let next_checkpoint = self.get_next_checkpoint()?; + let next_checkpoint = self + .get_next_checkpoint() + .map_err(|error| CatchupError::Recoverable(anyhow::Error::from(error)))?; tracing::info!(next_checkpoint, "starting event catchup using event blobs"); - self.ensure_recovery_directory()?; + self.ensure_recovery_directory() + .map_err(CatchupError::Recoverable)?; - let blobs = self - .collect_event_blobs_for_catchup(next_checkpoint) - .await?; - let next_event_index = self.get_next_event_index()?; + let blobs = match self.collect_event_blobs_for_catchup(next_checkpoint).await { + Ok(blobs) => blobs, + Err(error) => { + tracing::error!(error = ?error, "failed to collect event blobs for catchup"); + return Err(CatchupError::Recoverable(anyhow::anyhow!( + "failed to collect event blobs for catchup" + ))); + } + }; - self.process_event_blobs(blobs, next_event_index).await?; + if blobs.is_empty() { + tracing::info!( + "no event blobs collected for catchup; skipping stop of checkpoint tailing" + ); + return Ok(()); + } - Ok(()) + if let Err(e) = coordination_state.start_catchup_processing_phase().await { + tracing::error!( + error = ?e, + "failed to send stop message to checkpoint tailing" + ); + return Err(CatchupError::NonRecoverable(anyhow::anyhow!( + "failed to send stop message to checkpoint tailing" + ))); + } + + tracing::info!("waiting for tailing to stop before processing blobs"); + if !coordination_state + .wait_for_tailing_stopped(self.catchup_runtime_config.coordination_timeout) + .await + { + tracing::error!( + timeout_secs = self.catchup_runtime_config.coordination_timeout.as_secs(), + "timed out waiting for checkpoint tailing to stop" + ); + return Err(CatchupError::NonRecoverable(anyhow::anyhow!( + "timed out waiting for checkpoint tailing to stop" + ))); + } + + let next_event_index = self + .get_next_event_index() + .map_err(|error| CatchupError::NonRecoverable(anyhow::Error::from(error)))?; + let processing_result = tokio::time::timeout( + self.catchup_runtime_config.processing_timeout, + self.process_event_blobs(blobs, next_event_index), + ) + .await; + + let mut recoverable_error: Option = None; + match processing_result { + Ok(Ok(num_events_recovered)) => { + tracing::info!("successfully processed {} events", num_events_recovered); + } + Ok(Err(error)) => { + tracing::error!(error = ?error, "failed to process event blobs"); + recoverable_error = Some(error); + } + Err(_) => { + let timeout_error = anyhow::anyhow!("processing event blobs timed out"); + tracing::error!(error = ?timeout_error, "processing event blobs timed out"); + recoverable_error = Some(timeout_error); + } + } + + if let Err(e) = coordination_state.complete_catchup().await { + tracing::error!(error = ?e, "failed to send restart message to checkpoint tailing"); + Err(CatchupError::NonRecoverable(anyhow::anyhow!( + "failed to send restart message to checkpoint tailing" + ))) + } else if let Some(error) = recoverable_error { + Err(CatchupError::Recoverable(error)) + } else { + Ok(()) + } } /// Gets the next 
checkpoint sequence number that is after the latest checkpoint in the /// checkpoint store. - fn get_next_checkpoint(&self) -> Result, TypedStoreError> { + pub fn get_next_checkpoint(&self) -> Result, TypedStoreError> { Ok(self .stores .checkpoint_store @@ -249,7 +365,7 @@ impl EventBlobCatchupManager { } /// Gets the next event index that is after the latest event index in the event store. - fn get_next_event_index(&self) -> Result, TypedStoreError> { + pub fn get_next_event_index(&self) -> Result, TypedStoreError> { Ok(self .stores .event_store @@ -260,7 +376,7 @@ impl EventBlobCatchupManager { } /// Ensures the recovery directory exists - fn ensure_recovery_directory(&self) -> anyhow::Result<()> { + pub fn ensure_recovery_directory(&self) -> anyhow::Result<()> { if !self.recovery_path.exists() { fs::create_dir_all(&self.recovery_path)?; } @@ -281,7 +397,7 @@ impl EventBlobCatchupManager { /// This function creates a client to download event blobs up to a specified /// checkpoint. The blobs are stored in the provided recovery path. #[cfg(feature = "client")] - async fn collect_event_blobs_for_catchup( + pub async fn collect_event_blobs_for_catchup( &self, starting_checkpoint_to_process: Option, ) -> anyhow::Result> { @@ -313,11 +429,18 @@ impl EventBlobCatchupManager { Ok(blob_ids) } - async fn process_event_blobs( + /// Processes event blobs and stores them in the event store. + /// + /// This function performs the following steps: + /// 1. Iterates through the event blobs in reverse order (oldest to newest). + /// 2. Processes each blob: + /// - Collects relevant events that maintain a continuous sequence with the local store. + /// - Stores the events in the event store. + pub async fn process_event_blobs( &self, blobs: Vec, next_event_index: Option, - ) -> anyhow::Result<()> { + ) -> anyhow::Result { tracing::info!("starting to process event blobs"); let mut num_events_recovered = 0; @@ -372,7 +495,7 @@ impl EventBlobCatchupManager { } tracing::info!("recovered {} events from event blobs", num_events_recovered); - Ok(()) + Ok(num_events_recovered) } async fn process_single_blob( diff --git a/crates/walrus-service/src/event/event_processor/config.rs b/crates/walrus-service/src/event/event_processor/config.rs index ea03405cac..2011e58b3d 100644 --- a/crates/walrus-service/src/event/event_processor/config.rs +++ b/crates/walrus-service/src/event/event_processor/config.rs @@ -106,6 +106,25 @@ pub struct EventProcessorConfig { #[serde_as(as = "DurationSeconds")] #[serde(rename = "sampled_tracing_interval_secs")] pub sampled_tracing_interval: Duration, + /// Enable runtime catchup functionality. + pub enable_runtime_catchup: bool, + /// Runtime catchup lag threshold for triggering catchup at runtime. + /// When the lag exceeds this threshold during runtime, the catchup manager + /// will be invoked in parallel with the checkpoint downloader to accelerate + /// recovery. Should typically be the same as event_stream_catchup_min_checkpoint_lag. + pub runtime_catchup_lag_threshold: u64, + /// Interval for checking lag during runtime to trigger catchup if needed. + #[serde_as(as = "DurationSeconds")] + #[serde(rename = "runtime_lag_check_interval_secs")] + pub runtime_lag_check_interval: Duration, + /// Timeout for catchup operations to prevent indefinite blocking. + #[serde_as(as = "DurationSeconds")] + #[serde(rename = "catchup_coordination_timeout_secs")] + pub catchup_coordination_timeout: Duration, + /// Timeout for catchup processing to prevent indefinite blocking. 
+ #[serde_as(as = "DurationSeconds")] + #[serde(rename = "catchup_processing_timeout_secs")] + pub catchup_processing_timeout: Duration, } impl Default for EventProcessorConfig { @@ -116,6 +135,11 @@ impl Default for EventProcessorConfig { adaptive_downloader_config: Default::default(), event_stream_catchup_min_checkpoint_lag: 20_000, sampled_tracing_interval: Duration::from_secs(3600), + runtime_catchup_lag_threshold: 20_000, + runtime_lag_check_interval: Duration::from_secs(300), + enable_runtime_catchup: true, + catchup_coordination_timeout: Duration::from_secs(3000), + catchup_processing_timeout: Duration::from_secs(3000), } } } diff --git a/crates/walrus-service/src/event/event_processor/coordination.rs b/crates/walrus-service/src/event/event_processor/coordination.rs new file mode 100644 index 0000000000..30dd652e00 --- /dev/null +++ b/crates/walrus-service/src/event/event_processor/coordination.rs @@ -0,0 +1,267 @@ +// Copyright (c) Walrus Foundation +// SPDX-License-Identifier: Apache-2.0 + +//! Coordination module for managing operation between checkpoint downloader and catchup manager. + +use std::{sync::Arc, time::Instant}; + +use tokio::sync::{Mutex, Notify, mpsc}; + +/// Messages for coordinating between catchup manager and checkpoint tailing +#[derive(Debug, Clone)] +pub enum CoordinationMessage { + /// Stop checkpoint tailing for catchup processing + StopCheckpointTailing, + /// Restart checkpoint tailing after catchup completion + RestartCheckpointTailing, + /// Catchup operation failed, resume normal operation + CatchupFailed, +} + +/// Coordination state between checkpoint downloader and catchup manager. +/// +/// This struct manages the synchronization between the checkpoint downloader and +/// catchup manager during runtime catchup operations using message-passing. +/// The coordination works in phases: +/// 1. Download phase: Event blobs are downloaded in parallel with checkpoint tailing +/// 2. Processing phase: Send StopCheckpointTailing message, process event blobs +/// 3. Resume phase: Send RestartCheckpointTailing message (picks up from new latest checkpoint) +#[derive(Debug)] +pub struct CatchupCoordinationState { + /// Channel for sending coordination messages to checkpoint tailing task + pub coordination_tx: mpsc::UnboundedSender, + /// Whether catchup is currently active + pub catchup_active: Arc, + /// Timestamp when catchup was last started + pub last_catchup_start: Arc>>, + /// Notifier signaled when checkpoint tailing has fully stopped + tailing_stopped_notify: Arc, + /// Flag indicating whether checkpoint tailing is currently stopped + is_tailing_stopped: Arc, +} + +impl CatchupCoordinationState { + /// Attempt to mark catchup as active. Returns true if we successfully + /// transitioned from inactive to active, false if another catchup is already running. + pub async fn try_start_catchup(&self) -> bool { + match self.catchup_active.compare_exchange( + false, + true, + std::sync::atomic::Ordering::AcqRel, + std::sync::atomic::Ordering::Acquire, + ) { + Ok(_) => { + *self.last_catchup_start.lock().await = Some(Instant::now()); + true + } + Err(_) => false, + } + } + + /// Explicitly mark catchup as inactive. + pub fn mark_catchup_inactive(&self) { + self.catchup_active + .store(false, std::sync::atomic::Ordering::Release); + } + + /// Creates a new coordination state instance and returns it with a receiver. 
+    pub fn new() -> (Self, mpsc::UnboundedReceiver<CoordinationMessage>) {
+        let (coordination_tx, coordination_rx) = mpsc::unbounded_channel();
+
+        (
+            Self {
+                coordination_tx,
+                catchup_active: Arc::new(std::sync::atomic::AtomicBool::new(false)),
+                last_catchup_start: Arc::new(Mutex::new(None)),
+                tailing_stopped_notify: Arc::new(Notify::new()),
+                is_tailing_stopped: Arc::new(std::sync::atomic::AtomicBool::new(false)),
+            },
+            coordination_rx,
+        )
+    }
+
+    /// Mark that checkpoint tailing has started.
+    pub fn mark_tailing_started(&self) {
+        self.is_tailing_stopped
+            .store(false, std::sync::atomic::Ordering::Release);
+    }
+
+    /// Notify that checkpoint tailing has fully stopped and wake any waiters.
+    pub fn notify_tailing_stopped(&self) {
+        self.is_tailing_stopped
+            .store(true, std::sync::atomic::Ordering::Release);
+        self.tailing_stopped_notify.notify_one();
+    }
+
+    /// Wait until checkpoint tailing is stopped, with timeout.
+    /// Returns true if stopped, false on timeout.
+    pub async fn wait_for_tailing_stopped(&self, timeout: std::time::Duration) -> bool {
+        if self
+            .is_tailing_stopped
+            .load(std::sync::atomic::Ordering::Acquire)
+        {
+            return true;
+        }
+        match tokio::time::timeout(timeout, self.tailing_stopped_notify.notified()).await {
+            Ok(()) => true,
+            Err(_) => false,
+        }
+    }
+
+    /// Start the download phase of catchup.
+    /// During this phase, event blobs are downloaded in parallel with checkpoint tailing.
+    #[cfg(test)]
+    pub async fn start_catchup_download_phase(&self) {
+        tracing::info!(
+            "starting catchup download phase - checkpoint tailing continues in parallel"
+        );
+
+        self.catchup_active
+            .store(true, std::sync::atomic::Ordering::Release);
+        *self.last_catchup_start.lock().await = Some(Instant::now());
+    }
+
+    /// Transition from download phase to processing phase.
+    /// This sends a message to stop checkpoint tailing.
+    pub async fn start_catchup_processing_phase(
+        &self,
+    ) -> Result<(), mpsc::error::SendError<CoordinationMessage>> {
+        tracing::info!(
+            "starting catchup processing phase - sending stop message to checkpoint tailing"
+        );
+
+        self.coordination_tx
+            .send(CoordinationMessage::StopCheckpointTailing)?;
+        Ok(())
+    }
+
+    /// Complete catchup processing and signal for checkpoint tailing restart.
+    /// This sends a message to restart checkpoint tailing from the new latest checkpoint.
+    pub async fn complete_catchup(
+        &self,
+    ) -> Result<(), mpsc::error::SendError<CoordinationMessage>> {
+        tracing::info!("completing catchup - sending restart message to checkpoint tailing");
+
+        self.catchup_active
+            .store(false, std::sync::atomic::Ordering::Release);
+        self.coordination_tx
+            .send(CoordinationMessage::RestartCheckpointTailing)?;
+        Ok(())
+    }
+
+    /// Signal catchup failure and resume normal operation.
+    pub async fn catchup_failed(
+        &self,
+    ) -> Result<(), mpsc::error::SendError<CoordinationMessage>> {
+        tracing::warn!("catchup failed - sending failure message to checkpoint tailing");
+
+        self.catchup_active
+            .store(false, std::sync::atomic::Ordering::Release);
+        self.coordination_tx
+            .send(CoordinationMessage::CatchupFailed)?;
+        Ok(())
+    }
+
+    /// Check if catchup is currently active.
+    pub fn is_catchup_active(&self) -> bool {
+        self.catchup_active
+            .load(std::sync::atomic::Ordering::Acquire)
+    }
+
+    /// Get statistics about the current coordination state.
+ pub async fn get_stats(&self) -> CoordinationStats { + let last_start = *self.last_catchup_start.lock().await; + + CoordinationStats { + catchup_active: self.is_catchup_active(), + time_since_last_catchup: last_start.map(|t| t.elapsed()), + } + } +} + +impl Default for CatchupCoordinationState { + fn default() -> Self { + Self::new().0 + } +} + +/// Statistics about the current coordination state. +#[derive(Debug, Clone)] +pub struct CoordinationStats { + /// Whether catchup is currently active + pub catchup_active: bool, + /// Time elapsed since last catchup started + pub time_since_last_catchup: Option, +} + +#[cfg(test)] +mod tests { + use std::time::Duration; + + use tokio::time::sleep; + + use super::*; + + #[tokio::test] + async fn test_coordination_message_passing() { + let (state, mut rx) = CatchupCoordinationState::new(); + + // Test sending stop message + state.start_catchup_processing_phase().await.unwrap(); + + match rx.recv().await { + Some(CoordinationMessage::StopCheckpointTailing) => { + // Expected + } + other => panic!("Expected StopCheckpointTailing, got {:?}", other), + } + + // Test sending restart message + state.complete_catchup().await.unwrap(); + + match rx.recv().await { + Some(CoordinationMessage::RestartCheckpointTailing) => { + // Expected + } + other => panic!("Expected RestartCheckpointTailing, got {:?}", other), + } + } + + #[tokio::test] + async fn test_cooldown_management_like_behavior() { + let (state, _rx) = CatchupCoordinationState::new(); + let short_wait = Duration::from_millis(10); + + // Initially no catchup and no timestamp + let stats = state.get_stats().await; + assert!(!stats.catchup_active); + assert!(stats.time_since_last_catchup.is_none()); + + // Start catchup download phase should set active and timestamp + state.start_catchup_download_phase().await; + let stats = state.get_stats().await; + assert!(stats.catchup_active); + assert!(stats.time_since_last_catchup.is_some()); + + // After a short wait, elapsed should still be some (monotonic increase implied) + sleep(short_wait + Duration::from_millis(5)).await; + let stats = state.get_stats().await; + assert!(stats.catchup_active); + assert!(stats.time_since_last_catchup.is_some()); + } + + #[tokio::test] + async fn test_coordination_stats() { + let (state, _rx) = CatchupCoordinationState::new(); + + // Initial stats + let stats = state.get_stats().await; + assert!(!stats.catchup_active); + assert_eq!(stats.time_since_last_catchup, None); + + // After starting catchup + state.start_catchup_download_phase().await; + + let stats = state.get_stats().await; + assert!(stats.catchup_active); + assert!(stats.time_since_last_catchup.is_some()); + } +} diff --git a/crates/walrus-service/src/event/event_processor/metrics.rs b/crates/walrus-service/src/event/event_processor/metrics.rs index a1ef67e5a1..4b8d2c40db 100644 --- a/crates/walrus-service/src/event/event_processor/metrics.rs +++ b/crates/walrus-service/src/event/event_processor/metrics.rs @@ -13,6 +13,8 @@ walrus_utils::metrics::define_metric_set! { event_processor_latest_downloaded_checkpoint: IntGauge[], #[help = "The number of checkpoints downloaded. 
Useful for computing the download rate"] event_processor_total_downloaded_checkpoints: IntCounter[], + #[help = "Current lag in checkpoints detected during runtime monitoring"] + runtime_lag_current: IntGauge[], } } diff --git a/crates/walrus-service/src/event/event_processor/processor.rs b/crates/walrus-service/src/event/event_processor/processor.rs index 44feed559d..8cc484e0b3 100644 --- a/crates/walrus-service/src/event/event_processor/processor.rs +++ b/crates/walrus-service/src/event/event_processor/processor.rs @@ -12,13 +12,26 @@ use sui_types::{ full_checkpoint_content::CheckpointData, messages_checkpoint::VerifiedCheckpoint, }; -use tokio::{select, sync::Mutex, time::sleep}; +use tokio::{ + select, + sync::{Mutex, mpsc}, + task::JoinHandle, + time::sleep, +}; use tokio_util::sync::CancellationToken; use typed_store::{Map, TypedStoreError}; use walrus_core::ensure; -use walrus_utils::{metrics::Registry, tracing_sampled}; +use walrus_utils::{ + metrics::{Registry, monitored_scope}, + tracing_sampled, +}; -use super::{metrics::EventProcessorMetrics, package_store::LocalDBPackageStore}; +use super::{ + catchup, + coordination::CatchupCoordinationState, + metrics::EventProcessorMetrics, + package_store::LocalDBPackageStore, +}; use crate::event::{ event_processor::{ bootstrap::get_bootstrap_committee_and_checkpoint, @@ -26,6 +39,7 @@ use crate::event::{ checkpoint::CheckpointProcessor, client::ClientManager, config::{EventProcessorConfig, EventProcessorRuntimeConfig, SystemConfig}, + coordination::CoordinationMessage, db::EventProcessorStores, }, events::{IndexedStreamEvent, InitState, StreamEventWithInitState}, @@ -33,6 +47,11 @@ use crate::event::{ /// The maximum number of events to poll per poll. const MAX_EVENTS_PER_POLL: usize = 1000; +/// Startup catchup timeout (seconds) used during initial catchup in EventProcessor::new +const STARTUP_CATCHUP_TIMEOUT_SECS: u64 = 5 * 60; +/// Runtime catchup processing timeout (seconds) used during runtime catchup in +/// EventProcessor::start_runtime_catchup_monitoring +const RUNTIME_CATCHUP_PROCESSING_TIMEOUT_SECS: u64 = 5 * 60 * 60; /// The event processor. #[derive(Clone)] @@ -59,6 +78,14 @@ pub struct EventProcessor { pub checkpoint_processor: CheckpointProcessor, /// The interval at which to sample high-frequency tracing logs. pub sampled_tracing_interval: Duration, + /// Configuration for event processing. + pub config: EventProcessorConfig, + /// System configuration. + pub system_config: SystemConfig, + /// Recovery path for catchup operations. + pub recovery_path: std::path::PathBuf, + /// Metrics registry for creating new metric instances. 
+ pub metrics_registry: Registry, } impl fmt::Debug for EventProcessor { @@ -70,6 +97,10 @@ impl fmt::Debug for EventProcessor { .field("committee_store", &self.stores.committee_store) .field("event_store", &self.stores.event_store) .field("sampled_tracing_interval", &self.sampled_tracing_interval) + .field( + "runtime_catchup_enabled", + &self.config.enable_runtime_catchup, + ) .finish() } } @@ -128,6 +159,10 @@ impl EventProcessor { package_store, checkpoint_processor, sampled_tracing_interval: config.sampled_tracing_interval, + config: config.clone(), + system_config: system_config.clone(), + recovery_path: runtime_config.db_path.join("recovery"), + metrics_registry: metrics_registry.clone(), }; if event_processor.stores.checkpoint_store.is_empty() { @@ -145,43 +180,7 @@ impl EventProcessor { .checkpoint_processor .update_cached_latest_checkpoint_seq_number(current_checkpoint); - let clients = client_manager.into_client_set(); - - let catchup_manager = EventBlobCatchupManager::new( - event_processor.stores.clone(), - clients, - system_config, - runtime_config.db_path.join("recovery"), - metrics_registry, - ); - if let Err(e) = catchup_manager - .catchup(config.event_stream_catchup_min_checkpoint_lag) - .await - { - tracing::error!("failed to catchup using event blobs: {e}"); - } - - if event_processor.stores.checkpoint_store.is_empty() { - let (committee, verified_checkpoint) = get_bootstrap_committee_and_checkpoint( - client_manager.get_sui_client().clone(), - client_manager.get_client().clone(), - event_processor.original_system_pkg_id, - ) - .await?; - event_processor - .stores - .committee_store - .insert(&(), &committee)?; - event_processor - .stores - .checkpoint_store - .insert(&(), verified_checkpoint.serializable_ref())?; - - // Also update the cache with the bootstrap checkpoint sequence number. - event_processor - .checkpoint_processor - .update_cached_latest_checkpoint_seq_number(*verified_checkpoint.sequence_number()); - } + let _clients = client_manager.into_client_set(); Ok(event_processor) } @@ -244,17 +243,89 @@ impl EventProcessor { /// Starts the event processor. This method will run until the cancellation token is cancelled. 
pub async fn start(&self, cancellation_token: CancellationToken) -> Result<(), anyhow::Error> { - tracing::info!("starting event processor"); + if self.config.enable_runtime_catchup { + tracing::info!("starting event processor with runtime catchup enabled"); + } else { + tracing::info!("starting event processor with runtime catchup disabled"); + } + + let (coordination_state, coordination_rx) = CatchupCoordinationState::new(); + let coordination_state = Arc::new(coordination_state); + + let catchup_manager = EventBlobCatchupManager::new( + self.stores.clone(), + self.client_manager.clone().into_client_set(), + self.system_config.clone(), + self.recovery_path.clone(), + &self.metrics_registry, + catchup::CatchupRuntimeConfig { + coordination_state: coordination_state.clone(), + coordination_timeout: Duration::from_secs(STARTUP_CATCHUP_TIMEOUT_SECS), + processing_timeout: Duration::from_secs(RUNTIME_CATCHUP_PROCESSING_TIMEOUT_SECS), + }, + ); + // Notify tailing stopped to unblock the catchup task + coordination_state.notify_tailing_stopped(); + + if let Err(e) = catchup_manager + .catchup(self.config.event_stream_catchup_min_checkpoint_lag) + .await + { + tracing::error!("failed to catchup using event blobs: {e}"); + } + + if self.stores.checkpoint_store.is_empty() { + let (committee, verified_checkpoint) = get_bootstrap_committee_and_checkpoint( + self.client_manager.get_sui_client().clone(), + self.client_manager.get_client().clone(), + self.original_system_pkg_id, + ) + .await?; + + self.stores.committee_store.insert(&(), &committee)?; + self.stores + .checkpoint_store + .insert(&(), verified_checkpoint.serializable_ref())?; + + self.checkpoint_processor + .update_cached_latest_checkpoint_seq_number(*verified_checkpoint.sequence_number()); + } + let pruning_task = self.start_pruning_events(cancellation_token.clone()); - let tailing_task = self.start_tailing_checkpoints(cancellation_token.clone()); - select! { - pruning_result = pruning_task => { - cancellation_token.cancel(); - pruning_result + let tailing_task = self.start_tailing_checkpoints_with_restart_support( + cancellation_token.clone(), + coordination_state.clone(), + coordination_rx, + ); + if self.config.enable_runtime_catchup { + let catchup_task = self.start_runtime_catchup_monitoring( + cancellation_token.clone(), + coordination_state.clone(), + ); + select! { + pruning_result = pruning_task => { + cancellation_token.cancel(); + pruning_result + } + tailing_result = tailing_task => { + cancellation_token.cancel(); + tailing_result + } + catchup_result = catchup_task => { + cancellation_token.cancel(); + catchup_result + } } - tailing_result = tailing_task => { - cancellation_token.cancel(); - tailing_result + } else { + select! 
{ + pruning_result = pruning_task => { + cancellation_token.cancel(); + pruning_result + } + tailing_result = tailing_task => { + cancellation_token.cancel(); + tailing_result + } } } } @@ -290,6 +361,9 @@ impl EventProcessor { self.sampled_tracing_interval, ); + #[cfg(msim)] + sui_macros::fail_point_async!("pause_checkpoint_tailing_entry"); + while let Some(entry) = rx.recv().await { let Ok(checkpoint) = entry.result else { let error = entry.result.err().unwrap_or(anyhow!("unknown error")); @@ -327,6 +401,75 @@ impl EventProcessor { Ok(()) } + fn start_tailing_task( + &self, + cancel_token: CancellationToken, + coordination_state: Arc, + ) -> JoinHandle> { + let processor = self.clone(); + coordination_state.mark_tailing_started(); + let coordination_state = coordination_state.clone(); + tokio::spawn(async move { + tracing::info!("Starting tailing task"); + let result = processor.start_tailing_checkpoints(cancel_token).await; + tracing::info!("Tailing task exited"); + coordination_state.notify_tailing_stopped(); + result + }) + } + + /// Checkpoint tailing with message-based coordination for catchup + async fn start_tailing_checkpoints_with_restart_support( + &self, + cancel_token: CancellationToken, + coordination_state: Arc, + mut coordination_rx: mpsc::UnboundedReceiver, + ) -> Result<()> { + let mut child_cancel_token = cancel_token.child_token(); + + let mut tailing_task: Option>> = + Some(self.start_tailing_task(child_cancel_token.clone(), coordination_state.clone())); + loop { + tokio::select! { + msg = coordination_rx.recv() => { + match msg { + Some(CoordinationMessage::StopCheckpointTailing) => { + child_cancel_token.cancel(); + if let Some(handle) = tailing_task.take() { + match handle.await { + Ok(Ok(_)) => tracing::info!("tailing task exited"), + Ok(Err(error)) => tracing::error!(?error, "tailing task error"), + Err(error) => tracing::error!(?error, "tailing task panicked"), + } + } else { + tracing::error!("stop requested but tailing task was not running"); + coordination_state.notify_tailing_stopped(); + } + continue; + } + Some(CoordinationMessage::RestartCheckpointTailing | + CoordinationMessage::CatchupFailed) => { + tracing::info!("restarting checkpoint tailing"); + child_cancel_token = cancel_token.child_token(); + tailing_task = Some(self.start_tailing_task( + child_cancel_token.clone(), coordination_state.clone())); + continue; + } + None => { + tracing::info!("tailing task exited"); + break; + } + } + } + _ = cancel_token.cancelled() => { + tracing::info!("tailing task cancelled"); + break; + } + } + } + Ok(()) + } + /// Starts a periodic pruning process for events in the event store. This method will run until /// the cancellation token is cancelled. 
pub async fn start_pruning_events(&self, cancel_token: CancellationToken) -> Result<()> { @@ -368,4 +511,87 @@ impl EventProcessor { } } } + + /// Runtime catchup monitoring task + async fn start_runtime_catchup_monitoring( + &self, + cancel_token: CancellationToken, + coordination_state: Arc, + ) -> Result<()> { + let mut lag_check_interval = tokio::time::interval(self.config.runtime_lag_check_interval); + lag_check_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + tracing::info!( + check_interval_secs = self.config.runtime_lag_check_interval.as_secs(), + lag_threshold = self.config.runtime_catchup_lag_threshold, + "starting runtime catchup monitoring" + ); + + let coordination_state_clone = coordination_state.clone(); + let catchup_manager = EventBlobCatchupManager::new( + self.stores.clone(), + self.client_manager.clone().into_client_set(), + self.system_config.clone(), + self.recovery_path.clone(), + &self.metrics_registry, + catchup::CatchupRuntimeConfig { + coordination_state: coordination_state_clone, + coordination_timeout: self.config.catchup_coordination_timeout, + processing_timeout: self.config.catchup_processing_timeout, + }, + ); + + loop { + let _scope = monitored_scope::monitored_scope("RuntimeCatchupMonitoring"); + select! { + _ = lag_check_interval.tick() => { + if coordination_state.is_catchup_active() { + continue; + } + + match catchup_manager.get_current_lag().await { + Ok(lag) => { + self.metrics + .runtime_lag_current + .set(i64::try_from(lag).unwrap_or(i64::MAX)); + + if lag > self.config.runtime_catchup_lag_threshold { + tracing::info!( + lag = lag, + threshold = self.config.runtime_catchup_lag_threshold, + "triggering runtime catchup due to high lag" + ); + match catchup_manager.perform_catchup().await { + Ok(()) => {} + Err(catchup::CatchupError::Recoverable(error)) => { + tracing::warn!(?error, "recoverable error in catchup"); + } + Err(catchup::CatchupError::NonRecoverable(error)) => { + return Err(error); + } + } + } else { + tracing::debug!( + lag = lag, + threshold = self.config.runtime_catchup_lag_threshold, + "lag below threshold, no catchup needed" + ); + } + } + Err(error) => { + tracing::warn!( + error = ?error, + "failed to calculate current lag for runtime catchup monitoring" + ); + } + } + } + _ = cancel_token.cancelled() => { + tracing::info!("runtime catchup monitoring shutting down"); + break; + } + } + } + Ok(()) + } } diff --git a/crates/walrus-service/src/test_utils.rs b/crates/walrus-service/src/test_utils.rs index 8c43ba4d80..1e31a613b5 100644 --- a/crates/walrus-service/src/test_utils.rs +++ b/crates/walrus-service/src/test_utils.rs @@ -1147,6 +1147,7 @@ impl StorageNodeHandleBuilder { // in simtest. 
200 }, + runtime_catchup_lag_threshold: 200, ..Default::default() }, use_legacy_event_provider: false, diff --git a/crates/walrus-simtest/src/test_utils.rs b/crates/walrus-simtest/src/test_utils.rs index 504b225597..5cc4c5eca9 100644 --- a/crates/walrus-simtest/src/test_utils.rs +++ b/crates/walrus-simtest/src/test_utils.rs @@ -57,6 +57,10 @@ pub mod simtest_utils { node_index: usize, checkpoint_fn: impl Fn(usize) -> u32, ) { + tracing::info!( + "Stopping node {}", + walrus_cluster.nodes[node_index].node_id.unwrap() + ); kill_node(walrus_cluster.nodes[node_index].node_id.unwrap()).await; let node = &mut walrus_cluster.nodes[node_index]; node.node_id = Some( @@ -76,22 +80,32 @@ pub mod simtest_utils { client: &Arc>>, ) -> EventBlob { const TIMEOUT: Duration = Duration::from_secs(10); - let start = Instant::now(); + get_last_certified_event_blob(client, TIMEOUT) + .await + .expect("Timeout waiting for last certified event blob") + } - while start.elapsed() <= TIMEOUT { + /// Gets the last certified event blob from the client. + /// Returns the last certified event blob if it exists, otherwise returns None. + pub async fn get_last_certified_event_blob( + client: &Arc>>, + timeout: Duration, + ) -> Option { + let start = Instant::now(); + while start.elapsed() <= timeout { if let Some(blob) = client .inner .sui_client() .read_client .last_certified_event_blob() .await - .unwrap() + .unwrap_or(None) { - return blob; + return Some(blob); } tokio::time::sleep(Duration::from_secs(1)).await; } - panic!("Timeout waiting for last certified event blob"); + None } /// Helper function to write a random blob, read it back and check that it is the same. diff --git a/crates/walrus-simtest/tests/simtest_event_blob.rs b/crates/walrus-simtest/tests/simtest_event_blob.rs index 84bdaded3c..f1a76b1eee 100644 --- a/crates/walrus-simtest/tests/simtest_event_blob.rs +++ b/crates/walrus-simtest/tests/simtest_event_blob.rs @@ -9,7 +9,10 @@ mod tests { use std::{ fs, - sync::Arc, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, time::{Duration, Instant}, }; @@ -189,7 +192,7 @@ mod tests { } /// This test verifies that the node can correctly recover from a forked event blob. - #[ignore = "ignore integration simtests by default"] + //#[ignore = "ignore integration simtests by default"] #[walrus_simtest] async fn test_event_blob_fork_recovery() { let (_sui_cluster, mut walrus_cluster, client, _) = @@ -291,4 +294,120 @@ mod tests { .await .unwrap(); } + + /// This integration test simulates pausing checkpoint tailing to trigger runtime catchup. 
+ #[ignore = "ignore integration simtests by default"] + #[walrus_simtest] + async fn test_runtime_catchup_triggers_on_tailing_pause() { + use walrus_service::client::ClientCommunicationConfig; + + let (_sui_cluster, mut walrus_cluster, client, _) = + test_cluster::E2eTestSetupBuilder::new() + .with_epoch_duration(Duration::from_secs(15)) + .with_num_checkpoints_per_blob(20) + //.with_event_stream_catchup_min_checkpoint_lag(Some(u64::MAX)) + .with_test_nodes_config(TestNodesConfig { + node_weights: vec![2, 2, 3, 3, 3], + use_legacy_event_processor: false, + ..Default::default() + }) + .with_communication_config( + ClientCommunicationConfig::default_for_test_with_reqwest_timeout( + Duration::from_secs(2), + ), + ) + .build_generic::() + .await + .unwrap(); + + let client_arc = Arc::new(client); + + // Wait until all nodes have non zero latest checkpoint sequence number + loop { + let node_health_info = + simtest_utils::get_nodes_health_info(&walrus_cluster.nodes).await; + if node_health_info + .iter() + .all(|info| info.latest_checkpoint_sequence_number.unwrap() > 0) + { + break; + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + tracing::info!("All nodes have non zero latest checkpoint sequence number"); + + // Get latest certified event blob + let mut latest_certified_blob = None; + loop { + latest_certified_blob = + simtest_utils::get_last_certified_event_blob(&client_arc, Duration::from_secs(30)) + .await; + if latest_certified_blob.is_some() { + tracing::info!( + "Latest certified blob: {:?}", + latest_certified_blob.clone().unwrap() + ); + break; + } + tracing::info!("Waiting for latest certified blob"); + } + + // Track whether event-blob catchup path is exercised + let saw_event_blob_catchup = Arc::new(AtomicBool::new(false)); + let saw_event_blob_catchup_clone = saw_event_blob_catchup.clone(); + sui_macros::register_fail_point("fail_point_catchup_using_event_blobs_start", move || { + saw_event_blob_catchup_clone.store(true, Ordering::SeqCst); + }); + + // Pause checkpoint tailing to build lag + sui_macros::register_fail_point_async( + "pause_checkpoint_tailing_entry", + move || async move { + // Sleep enough to build lag and trigger catchup monitoring + tracing::info!("Pausing checkpoint tailing"); + tokio::time::sleep(Duration::from_secs(45)).await; + }, + ); + + // Restart node 0 to apply failpoint into its runtime + simtest_utils::restart_node_with_checkpoints(&mut walrus_cluster, 0, |_| 20).await; + + // Wait until latest certified blob is updated + loop { + if let Some(blob) = + simtest_utils::get_last_certified_event_blob(&client_arc, Duration::from_secs(30)) + .await + { + tracing::info!("Latest certified blob: {:?}", blob); + if blob.blob_id != latest_certified_blob.clone().unwrap().blob_id { + break; + } + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + + let mut node_health_info = + simtest_utils::get_nodes_health_info([&walrus_cluster.nodes[0]]).await; + let prev_seq = node_health_info[0] + .latest_checkpoint_sequence_number + .unwrap(); + + // Let lag build and catchup trigger + tokio::time::sleep(Duration::from_secs(90)).await; + + node_health_info = simtest_utils::get_nodes_health_info([&walrus_cluster.nodes[0]]).await; + + // Verify that latest checkpoint advances after the pause window and node is healthy + let latest_seq = node_health_info[0] + .latest_checkpoint_sequence_number + .unwrap(); + assert!(latest_seq > prev_seq); + + // Verify event-blob catchup ran + assert!(saw_event_blob_catchup.load(Ordering::SeqCst)); + + 
sui_macros::clear_fail_point("pause_checkpoint_tailing_entry"); + sui_macros::clear_fail_point("fail_point_catchup_using_event_blobs_start"); + } } diff --git a/crates/walrus-sui/src/client/retry_client/failover.rs b/crates/walrus-sui/src/client/retry_client/failover.rs index 558413505b..b0eca391fd 100644 --- a/crates/walrus-sui/src/client/retry_client/failover.rs +++ b/crates/walrus-sui/src/client/retry_client/failover.rs @@ -99,12 +99,12 @@ impl + std::fmt::Debug> return Err(anyhow::anyhow!("No clients available")); } - let max_tries = BuilderT::DEFAULT_MAX_TRIES.min(lazy_client_builders.len()); + let max_tries = BuilderT::DEFAULT_MAX_TRIES; // Precondition check (a bit redundant, but just for extra safety) to ensure we have more // clients than we allow failovers. Note that if this condition changes, `fetch_next_client` // will need to be updated to track a failover count separately. - assert!(lazy_client_builders.len() >= max_tries); + //assert!(lazy_client_builders.len() >= max_tries); Ok(Self { lazy_client_builders, @@ -191,6 +191,7 @@ impl + std::fmt::Debug> loop { // PLAN phase. if tried_client_indices.len() >= self.max_tries { + tracing::error!("max failovers exceeded [max_tries={}]", self.max_tries); return Err(FailoverError::FailedToGetClient(format!( "max failovers exceeded [max_tries={}]", self.max_tries @@ -198,6 +199,10 @@ impl + std::fmt::Debug> } if !tried_client_indices.insert(next_index) { + tracing::error!( + "already tried this client, skipping [next_index={}]", + next_index + ); // We've already tried this client, so let's skip it. next_index = (next_index + 1) % self.client_count(); continue; @@ -211,7 +216,7 @@ impl + std::fmt::Debug> Ok(client) => client, Err(error) => { // Log the error and failover to the next client. - tracing::warn!( + tracing::error!( "failed to get client from url {}, error: {}, failover to next client", self.lazy_client_builders[next_index] .get_rpc_url() @@ -325,7 +330,7 @@ impl + std::fmt::Debug> .get_current_rpc_url() .await .unwrap_or_else(|error| error.to_string()); - tracing::warn!( + tracing::error!( "RPC to endpoint {:?} failed with error: {:?}, fetching next client", failed_rpc_url, error diff --git a/crates/walrus-sui/src/client/retry_client/retriable_sui_client.rs b/crates/walrus-sui/src/client/retry_client/retriable_sui_client.rs index 267160f9d7..566c43b098 100644 --- a/crates/walrus-sui/src/client/retry_client/retriable_sui_client.rs +++ b/crates/walrus-sui/src/client/retry_client/retriable_sui_client.rs @@ -196,7 +196,14 @@ impl LazyClientBuilder for LazySuiClientBuilder { "build_sui_client", ) .await - .map_err(|e| FailoverError::FailedToGetClient(e.to_string()))?; + .map_err(|e| { + tracing::info!( + "failed to get sui client from url {}, error: {}", + rpc_url, + e + ); + FailoverError::FailedToGetClient(e.to_string()) + })?; Ok(Arc::new(sui_client)) } }
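The core of this change is the stop → wait → process → restart handshake between the catchup task and the checkpoint tailing task. The following self-contained miniature shows the same pattern with plain `tokio` primitives; `Msg`, the one-second timeout, and the `println!` calls are illustrative stand-ins for `CoordinationMessage`, `wait_for_tailing_stopped`, and the real tracing, not the patch's API.

use std::{sync::Arc, time::Duration};

use tokio::sync::{Notify, mpsc};

#[derive(Debug)]
enum Msg {
    Stop,
    Restart,
}

#[tokio::main]
async fn main() {
    let (tx, mut rx) = mpsc::unbounded_channel::<Msg>();
    let stopped = Arc::new(Notify::new());

    // Stand-in for the checkpoint tailing loop: on `Stop` it acknowledges by notifying
    // `stopped`; on `Restart` it would resume from the new latest checkpoint.
    let stopped_in_tailing = stopped.clone();
    let tailing = tokio::spawn(async move {
        while let Some(msg) = rx.recv().await {
            match msg {
                Msg::Stop => {
                    println!("tailing: stopping for catchup");
                    stopped_in_tailing.notify_one();
                }
                Msg::Restart => println!("tailing: restarting from the new latest checkpoint"),
            }
        }
    });

    // Stand-in for the catchup task: request a stop, wait (with a timeout) for the
    // acknowledgement, process the downloaded event blobs, then request a restart.
    tx.send(Msg::Stop).unwrap();
    tokio::time::timeout(Duration::from_secs(1), stopped.notified())
        .await
        .expect("tailing did not acknowledge the stop in time");
    println!("catchup: processing event blobs while tailing is stopped");
    tx.send(Msg::Restart).unwrap();

    // Closing the channel lets the tailing loop exit so the example terminates.
    drop(tx);
    tailing.await.unwrap();
}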