7 changes: 6 additions & 1 deletion crates/checkpoint-downloader/src/downloader.rs
@@ -463,7 +463,12 @@ impl ParallelCheckpointDownloaderInner {
sequence_number += 1;
}

let Some(entry) = self.checkpoint_receiver.recv().await else {
let maybe_entry = tokio::select! {
_ = self.cancellation_token.cancelled() => None,
entry = self.checkpoint_receiver.recv() => entry,
};
let Some(entry) = maybe_entry else {
tracing::info!("no checkpoint received, stopping checkpoint fetcher");
break;
};

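The downloader change above swaps a plain `recv().await` for a `tokio::select!` so the checkpoint fetcher loop also exits promptly when the node's cancellation token fires. A minimal, self-contained sketch of that pattern, assuming tokio's mpsc channel and tokio-util's `CancellationToken` (the `fetcher_loop` function and `CheckpointEntry` type here are illustrative, not names from the crate):

```rust
use tokio::sync::mpsc;
use tokio_util::sync::CancellationToken;

/// Illustrative stand-in for the entries the real checkpoint downloader receives.
struct CheckpointEntry;

/// Sketch of a receive loop that also honors cancellation, mirroring the
/// `tokio::select!` change in downloader.rs; all names here are hypothetical.
async fn fetcher_loop(
    mut receiver: mpsc::Receiver<CheckpointEntry>,
    cancellation_token: CancellationToken,
) {
    loop {
        // Resolves to `None` either when the sender side is dropped or when the
        // cancellation token fires, so the loop stops cleanly in both cases.
        let maybe_entry = tokio::select! {
            _ = cancellation_token.cancelled() => None,
            entry = receiver.recv() => entry,
        };
        let Some(_entry) = maybe_entry else {
            tracing::info!("no checkpoint received or shutdown requested, stopping fetcher");
            break;
        };
        // ... process the checkpoint entry ...
    }
}
```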
5 changes: 5 additions & 0 deletions crates/walrus-service/node_config_example.yaml
@@ -164,6 +164,11 @@ event_processor_config:
max_consecutive_pool_monitor_failures: 10
event_stream_catchup_min_checkpoint_lag: 20000
sampled_tracing_interval_secs: 3600
enable_runtime_catchup: true
runtime_catchup_lag_threshold: 20000
runtime_lag_check_interval_secs: 300
catchup_coordination_timeout_secs: 3000
catchup_processing_timeout_secs: 3000
use_legacy_event_provider: false
disable_event_blob_writer: false
commission_rate: 6000
1 change: 1 addition & 0 deletions crates/walrus-service/src/event/event_processor.rs
@@ -8,6 +8,7 @@ pub mod catchup;
pub mod checkpoint;
pub mod client;
pub mod config;
pub mod coordination;
pub mod db;
pub mod metrics;
pub mod package_store;
201 changes: 162 additions & 39 deletions crates/walrus-service/src/event/event_processor/catchup.rs
@@ -3,7 +3,7 @@

//! Catchup module for catching up the event processor with the network.

use std::{fs, path::PathBuf};
use std::{fs, path::PathBuf, sync::Arc, time::Duration};

use sui_types::{
committee::Committee,
@@ -22,6 +22,7 @@ use crate::event::{
event_blob::EventBlob,
event_processor::{
config::{SuiClientSet, SystemConfig},
coordination::CatchupCoordinationState,
db::EventProcessorStores,
},
events::{IndexedStreamEvent, InitState},
@@ -57,6 +58,28 @@ impl std::fmt::Debug for DownloadedBlob {
}
}

/// Configuration for runtime catchup coordination.
#[derive(Clone, Debug)]
pub struct CatchupRuntimeConfig {
/// The coordination state for the catchup process.
pub coordination_state: Arc<CatchupCoordinationState>,
/// The timeout for waiting for checkpoint tailing to stop during coordination.
pub coordination_timeout: Duration,
/// The timeout for processing the downloaded event blobs.
pub processing_timeout: Duration,
}

/// Structured error type for catchup.
#[derive(thiserror::Error, Debug)]
pub enum CatchupError {
/// A recoverable error that can be retried.
#[error("recoverable catchup error: {0}")]
Recoverable(#[from] anyhow::Error),
/// A non-recoverable error that cannot be retried.
#[error("non-recoverable catchup error: {0}")]
NonRecoverable(anyhow::Error),
}

/// Manages the catchup process for events in the event processor using event blobs.
///
/// This manager handles the process of catching up the local event store with the network state.
Expand All @@ -71,6 +94,7 @@ pub struct EventBlobCatchupManager {
system_config: SystemConfig,
recovery_path: PathBuf,
metrics: EventCatchupManagerMetrics,
catchup_runtime_config: CatchupRuntimeConfig,
}

impl std::fmt::Debug for EventBlobCatchupManager {
@@ -91,6 +115,7 @@ impl EventBlobCatchupManager {
system_config: SystemConfig,
recovery_path: PathBuf,
registry: &Registry,
catchup_runtime_config: CatchupRuntimeConfig,
) -> Self {
let metrics = EventCatchupManagerMetrics::new(registry);
Self {
@@ -99,31 +124,27 @@ impl EventBlobCatchupManager {
system_config,
recovery_path,
metrics,
catchup_runtime_config,
}
}

/// Checks if the event processor is lagging behind the network and performs catchup if needed.
pub async fn catchup(&self, lag_threshold: u64) -> anyhow::Result<()> {
let current_checkpoint = self.get_current_checkpoint()?;
let latest_checkpoint = self.get_latest_network_checkpoint().await;

let current_lag = self.calculate_lag(current_checkpoint, latest_checkpoint)?;
let current_lag = self.get_current_lag().await?;

if current_lag > lag_threshold {
tracing::info!(
current_checkpoint,
latest_checkpoint,
lag_threshold,
"performing catchup - lag is above threshold"
);
self.perform_catchup().await?;
tracing::info!(lag_threshold, "performing catchup - lag is above threshold");
match self.perform_catchup().await {
Ok(()) => {}
Err(CatchupError::Recoverable(error)) => {
tracing::warn!(?error, "recoverable error during catchup");
}
Err(CatchupError::NonRecoverable(error)) => {
return Err(error);
}
}
} else {
tracing::info!(
current_checkpoint,
latest_checkpoint,
lag_threshold,
"skipping catchup - lag is below threshold"
);
tracing::info!(lag_threshold, "skipping catchup - lag is below threshold");
}

Ok(())
@@ -184,16 +205,39 @@ impl EventBlobCatchupManager {
Ok(lag)
}

/// Gets the current lag between the local store and the network
pub async fn get_current_lag(&self) -> anyhow::Result<u64> {
let current_checkpoint = self.get_current_checkpoint()?;
let latest_checkpoint = self.get_latest_network_checkpoint().await;
self.calculate_lag(current_checkpoint, latest_checkpoint)
}

/// Performs the catchup operation using event blobs
pub async fn perform_catchup(&self) -> anyhow::Result<()> {
match self.catchup_using_event_blobs().await {
pub async fn perform_catchup(&self) -> Result<(), CatchupError> {
let coordination_state = self.catchup_runtime_config.coordination_state.clone();
if !coordination_state.try_start_catchup().await {
tracing::info!("runtime catchup already active, skipping new run");
return Ok(());
}

let result = self.catchup_using_event_blobs().await;
coordination_state.mark_catchup_inactive();

match result {
Ok(()) => {
tracing::info!("successfully caught up using event blobs");
Ok(())
}
Err(error) => {
tracing::error!(?error, "failed to catch up using event blobs");
Err(error)
Err(CatchupError::Recoverable(error)) => {
tracing::warn!(?error, "recoverable error during catchup using event blobs");
Err(CatchupError::Recoverable(error))
}
Err(CatchupError::NonRecoverable(error)) => {
tracing::error!(
?error,
"non-recoverable error during catchup using event blobs"
);
Err(CatchupError::NonRecoverable(error))
}
}
}
Expand All @@ -218,27 +262,99 @@ impl EventBlobCatchupManager {
/// starting from `N+1`). If however, the local store is empty, the catch-up will store all
/// events from the earliest available event blob (in which case the first stored event index
/// could be greater than `0`).
async fn catchup_using_event_blobs(&self) -> anyhow::Result<()> {
async fn catchup_using_event_blobs(&self) -> Result<(), CatchupError> {
#[cfg(msim)]
sui_macros::fail_point!("fail_point_catchup_using_event_blobs_start");
let coordination_state = self.catchup_runtime_config.coordination_state.clone();

let next_checkpoint = self.get_next_checkpoint()?;
let next_checkpoint = self
.get_next_checkpoint()
.map_err(|error| CatchupError::Recoverable(anyhow::Error::from(error)))?;
tracing::info!(next_checkpoint, "starting event catchup using event blobs");
self.ensure_recovery_directory()?;
self.ensure_recovery_directory()
.map_err(CatchupError::Recoverable)?;

let blobs = self
.collect_event_blobs_for_catchup(next_checkpoint)
.await?;
let next_event_index = self.get_next_event_index()?;
let blobs = match self.collect_event_blobs_for_catchup(next_checkpoint).await {
Ok(blobs) => blobs,
Err(error) => {
tracing::error!(error = ?error, "failed to collect event blobs for catchup");
return Err(CatchupError::Recoverable(anyhow::anyhow!(
"failed to collect event blobs for catchup"
)));
}
};

self.process_event_blobs(blobs, next_event_index).await?;
if blobs.is_empty() {
tracing::info!(
"no event blobs collected for catchup; skipping stop of checkpoint tailing"
);
return Ok(());
}

Ok(())
if let Err(e) = coordination_state.start_catchup_processing_phase().await {
tracing::error!(
error = ?e,
"failed to send stop message to checkpoint tailing"
);
return Err(CatchupError::NonRecoverable(anyhow::anyhow!(
"failed to send stop message to checkpoint tailing"
)));
}

tracing::info!("waiting for tailing to stop before processing blobs");
if !coordination_state
.wait_for_tailing_stopped(self.catchup_runtime_config.coordination_timeout)
.await
{
tracing::error!(
timeout_secs = self.catchup_runtime_config.coordination_timeout.as_secs(),
"timed out waiting for checkpoint tailing to stop"
);
return Err(CatchupError::NonRecoverable(anyhow::anyhow!(
"timed out waiting for checkpoint tailing to stop"
)));
}

let next_event_index = self
.get_next_event_index()
.map_err(|error| CatchupError::NonRecoverable(anyhow::Error::from(error)))?;
let processing_result = tokio::time::timeout(
self.catchup_runtime_config.processing_timeout,
self.process_event_blobs(blobs, next_event_index),
)
.await;

let mut recoverable_error: Option<anyhow::Error> = None;
match processing_result {
Ok(Ok(num_events_recovered)) => {
tracing::info!("successfully processed {} events", num_events_recovered);
}
Ok(Err(error)) => {
tracing::error!(error = ?error, "failed to process event blobs");
recoverable_error = Some(error);
}
Err(_) => {
let timeout_error = anyhow::anyhow!("processing event blobs timed out");
tracing::error!(error = ?timeout_error, "processing event blobs timed out");
recoverable_error = Some(timeout_error);
}
}

if let Err(e) = coordination_state.complete_catchup().await {
tracing::error!(error = ?e, "failed to send restart message to checkpoint tailing");
Err(CatchupError::NonRecoverable(anyhow::anyhow!(
"failed to send restart message to checkpoint tailing"
)))
} else if let Some(error) = recoverable_error {
Err(CatchupError::Recoverable(error))
} else {
Ok(())
}
}

/// Gets the next checkpoint sequence number that is after the latest checkpoint in the
/// checkpoint store.
fn get_next_checkpoint(&self) -> Result<Option<u64>, TypedStoreError> {
pub fn get_next_checkpoint(&self) -> Result<Option<u64>, TypedStoreError> {
Ok(self
.stores
.checkpoint_store
@@ -249,7 +365,7 @@ impl EventBlobCatchupManager {
}

/// Gets the next event index that is after the latest event index in the event store.
fn get_next_event_index(&self) -> Result<Option<u64>, TypedStoreError> {
pub fn get_next_event_index(&self) -> Result<Option<u64>, TypedStoreError> {
Ok(self
.stores
.event_store
Expand All @@ -260,7 +376,7 @@ impl EventBlobCatchupManager {
}

/// Ensures the recovery directory exists
fn ensure_recovery_directory(&self) -> anyhow::Result<()> {
pub fn ensure_recovery_directory(&self) -> anyhow::Result<()> {
if !self.recovery_path.exists() {
fs::create_dir_all(&self.recovery_path)?;
}
@@ -281,7 +397,7 @@ impl EventBlobCatchupManager {
/// This function creates a client to download event blobs up to a specified
/// checkpoint. The blobs are stored in the provided recovery path.
#[cfg(feature = "client")]
async fn collect_event_blobs_for_catchup(
pub async fn collect_event_blobs_for_catchup(
&self,
starting_checkpoint_to_process: Option<u64>,
) -> anyhow::Result<Vec<BlobId>> {
@@ -313,11 +429,18 @@ impl EventBlobCatchupManager {
Ok(blob_ids)
}

async fn process_event_blobs(
/// Processes event blobs and stores them in the event store.
///
/// This function performs the following steps:
/// 1. Iterates through the event blobs in reverse order (oldest to newest).
/// 2. Processes each blob:
/// - Collects relevant events that maintain a continuous sequence with the local store.
/// - Stores the events in the event store.
pub async fn process_event_blobs(
&self,
blobs: Vec<BlobId>,
next_event_index: Option<u64>,
) -> anyhow::Result<()> {
) -> anyhow::Result<usize> {
tracing::info!("starting to process event blobs");

let mut num_events_recovered = 0;
@@ -372,7 +495,7 @@ impl EventBlobCatchupManager {
}

tracing::info!("recovered {} events from event blobs", num_events_recovered);
Ok(())
Ok(num_events_recovered)
}

async fn process_single_blob(
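The new `coordination` module itself is not part of this diff; only its use from `catchup.rs` is visible. Going purely by the calls above (`try_start_catchup`, `mark_catchup_inactive`, `start_catchup_processing_phase`, `wait_for_tailing_stopped`, `complete_catchup`), a rough sketch of the interface the catchup manager relies on could look like the following. The field layout and channel-based signalling are assumptions for illustration; only the method names and approximate signatures come from the diff.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

use tokio::sync::{mpsc, watch};

/// Assumed control messages sent to the checkpoint tailing task.
pub enum TailingControl {
    Stop,
    Restart,
}

/// Hypothetical shape of the coordination state used by the catchup manager;
/// the internals below are illustrative, not the actual module contents.
pub struct CatchupCoordinationState {
    catchup_active: AtomicBool,
    /// Assumed channel for telling the checkpoint tailing task to stop or restart.
    tailing_control: mpsc::Sender<TailingControl>,
    /// Assumed signal the tailing task flips once it has actually stopped.
    tailing_stopped: watch::Receiver<bool>,
}

impl CatchupCoordinationState {
    /// Returns `false` if another catchup run is already active.
    pub async fn try_start_catchup(&self) -> bool {
        !self.catchup_active.swap(true, Ordering::SeqCst)
    }

    pub fn mark_catchup_inactive(&self) {
        self.catchup_active.store(false, Ordering::SeqCst);
    }

    /// Asks the checkpoint tailing task to stop before blob processing begins.
    pub async fn start_catchup_processing_phase(&self) -> anyhow::Result<()> {
        self.tailing_control
            .send(TailingControl::Stop)
            .await
            .map_err(|_| anyhow::anyhow!("checkpoint tailing task is no longer running"))
    }

    /// Waits until the tailing task reports it has stopped, or the timeout expires.
    pub async fn wait_for_tailing_stopped(&self, timeout: Duration) -> bool {
        let mut stopped = self.tailing_stopped.clone();
        tokio::time::timeout(timeout, async {
            while !*stopped.borrow() {
                if stopped.changed().await.is_err() {
                    return false;
                }
            }
            true
        })
        .await
        .unwrap_or(false)
    }

    /// Tells the tailing task to restart once blob processing is done.
    pub async fn complete_catchup(&self) -> anyhow::Result<()> {
        self.tailing_control
            .send(TailingControl::Restart)
            .await
            .map_err(|_| anyhow::anyhow!("checkpoint tailing task is no longer running"))
    }
}
```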
24 changes: 24 additions & 0 deletions crates/walrus-service/src/event/event_processor/config.rs
@@ -106,6 +106,25 @@ pub struct EventProcessorConfig {
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "sampled_tracing_interval_secs")]
pub sampled_tracing_interval: Duration,
/// Enable runtime catchup functionality.
pub enable_runtime_catchup: bool,
/// Runtime catchup lag threshold for triggering catchup at runtime.
/// When the lag exceeds this threshold during runtime, the catchup manager
/// will be invoked in parallel with the checkpoint downloader to accelerate
/// recovery. Should typically be the same as event_stream_catchup_min_checkpoint_lag.
pub runtime_catchup_lag_threshold: u64,
/// Interval for checking lag during runtime to trigger catchup if needed.
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "runtime_lag_check_interval_secs")]
pub runtime_lag_check_interval: Duration,
/// Timeout for coordinating catchup with checkpoint tailing to prevent indefinite blocking.
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "catchup_coordination_timeout_secs")]
pub catchup_coordination_timeout: Duration,
/// Timeout for catchup processing to prevent indefinite blocking.
#[serde_as(as = "DurationSeconds")]
#[serde(rename = "catchup_processing_timeout_secs")]
pub catchup_processing_timeout: Duration,
}

impl Default for EventProcessorConfig {
@@ -116,6 +135,11 @@ impl Default for EventProcessorConfig {
adaptive_downloader_config: Default::default(),
event_stream_catchup_min_checkpoint_lag: 20_000,
sampled_tracing_interval: Duration::from_secs(3600),
runtime_catchup_lag_threshold: 20_000,
runtime_lag_check_interval: Duration::from_secs(300),
enable_runtime_catchup: true,
catchup_coordination_timeout: Duration::from_secs(3000),
catchup_processing_timeout: Duration::from_secs(3000),
}
}
}
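The doc comments on the new config fields describe a periodic runtime lag check that can kick off catchup in parallel with the checkpoint downloader, but the monitoring task itself is not part of this diff. A minimal sketch of how `runtime_catchup_lag_threshold` and `runtime_lag_check_interval` could drive such a loop, presumably spawned only when `enable_runtime_catchup` is true; the stub type, function name, and wiring are illustrative, and only the `catchup` behavior mirrors catchup.rs above.

```rust
use std::time::Duration;

use tokio_util::sync::CancellationToken;

/// Stand-in for EventBlobCatchupManager exposing only the method used here.
struct CatchupManagerStub;

impl CatchupManagerStub {
    /// Mirrors `EventBlobCatchupManager::catchup`, which compares the current
    /// lag against the threshold and returns early when no catchup is needed.
    async fn catchup(&self, _lag_threshold: u64) -> anyhow::Result<()> {
        Ok(())
    }
}

/// Hypothetical periodic lag monitor driven by the new config values.
async fn run_runtime_lag_monitor(
    manager: CatchupManagerStub,
    lag_threshold: u64,       // runtime_catchup_lag_threshold
    check_interval: Duration, // runtime_lag_check_interval
    cancellation_token: CancellationToken,
) {
    let mut ticker = tokio::time::interval(check_interval);
    loop {
        tokio::select! {
            _ = cancellation_token.cancelled() => break,
            _ = ticker.tick() => {}
        }
        // `catchup` itself skips the run when the lag is below the threshold or
        // another catchup is already active, so the monitor just invokes it.
        if let Err(error) = manager.catchup(lag_threshold).await {
            tracing::warn!(?error, "runtime catchup attempt failed");
        }
    }
}
```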