Skip to content

Commit e162731

Browse files
committed
feat: Allow event blob catchup at runtime
1 parent 1889956 commit e162731

File tree

13 files changed

+893
-99
lines changed

13 files changed

+893
-99
lines changed

crates/checkpoint-downloader/src/downloader.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -463,7 +463,12 @@ impl ParallelCheckpointDownloaderInner {
463463
sequence_number += 1;
464464
}
465465

466-
let Some(entry) = self.checkpoint_receiver.recv().await else {
466+
let maybe_entry = tokio::select! {
467+
_ = self.cancellation_token.cancelled() => None,
468+
entry = self.checkpoint_receiver.recv() => entry,
469+
};
470+
let Some(entry) = maybe_entry else {
471+
tracing::info!("no checkpoint received, stopping checkpoint fetcher");
467472
break;
468473
};
469474

crates/walrus-service/node_config_example.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -166,6 +166,11 @@ event_processor_config:
166166
max_consecutive_pool_monitor_failures: 10
167167
event_stream_catchup_min_checkpoint_lag: 20000
168168
sampled_tracing_interval_secs: 3600
169+
enable_runtime_catchup: true
170+
runtime_catchup_lag_threshold: 20000
171+
runtime_lag_check_interval_secs: 300
172+
catchup_coordination_timeout_secs: 3000
173+
catchup_processing_timeout_secs: 3000
169174
use_legacy_event_provider: false
170175
disable_event_blob_writer: false
171176
commission_rate: 6000

crates/walrus-service/src/event/event_processor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ pub mod catchup;
88
pub mod checkpoint;
99
pub mod client;
1010
pub mod config;
11+
pub mod coordination;
1112
pub mod db;
1213
pub mod metrics;
1314
pub mod package_store;

crates/walrus-service/src/event/event_processor/catchup.rs

Lines changed: 162 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33

44
//! Catchup module for catching up the event processor with the network.
55
6-
use std::{fs, path::PathBuf};
6+
use std::{fs, path::PathBuf, sync::Arc, time::Duration};
77

88
use sui_types::{
99
committee::Committee,
@@ -22,6 +22,7 @@ use crate::event::{
2222
event_blob::EventBlob,
2323
event_processor::{
2424
config::{SuiClientSet, SystemConfig},
25+
coordination::CatchupCoordinationState,
2526
db::EventProcessorStores,
2627
},
2728
events::{IndexedStreamEvent, InitState},
@@ -57,6 +58,28 @@ impl std::fmt::Debug for DownloadedBlob {
5758
}
5859
}
5960

61+
/// Configuration for runtime catchup coordination.
62+
#[derive(Clone, Debug)]
63+
pub struct CatchupRuntimeConfig {
64+
/// The coordination state for the catchup process.
65+
pub coordination_state: Arc<CatchupCoordinationState>,
66+
/// The timeout for the coordination process.
67+
pub coordination_timeout: Duration,
68+
/// The timeout for the processing process.
69+
pub processing_timeout: Duration,
70+
}
71+
72+
/// Structured error type for catchup.
73+
#[derive(thiserror::Error, Debug)]
74+
pub enum CatchupError {
75+
/// A recoverable error that can be retried.
76+
#[error("recoverable catchup error: {0}")]
77+
Recoverable(#[from] anyhow::Error),
78+
/// A non-recoverable error that cannot be retried.
79+
#[error("non-recoverable catchup error: {0}")]
80+
NonRecoverable(anyhow::Error),
81+
}
82+
6083
/// Manages the catchup process for events in the event processor using event blobs.
6184
///
6285
/// This manager handles the process of catching up the local event store with the network state.
@@ -71,6 +94,7 @@ pub struct EventBlobCatchupManager {
7194
system_config: SystemConfig,
7295
recovery_path: PathBuf,
7396
metrics: EventCatchupManagerMetrics,
97+
catchup_runtime_config: CatchupRuntimeConfig,
7498
}
7599

76100
impl std::fmt::Debug for EventBlobCatchupManager {
@@ -91,6 +115,7 @@ impl EventBlobCatchupManager {
91115
system_config: SystemConfig,
92116
recovery_path: PathBuf,
93117
registry: &Registry,
118+
catchup_runtime_config: CatchupRuntimeConfig,
94119
) -> Self {
95120
let metrics = EventCatchupManagerMetrics::new(registry);
96121
Self {
@@ -99,31 +124,27 @@ impl EventBlobCatchupManager {
99124
system_config,
100125
recovery_path,
101126
metrics,
127+
catchup_runtime_config,
102128
}
103129
}
104130

105131
/// Checks if the event processor is lagging behind the network and performs catchup if needed.
106132
pub async fn catchup(&self, lag_threshold: u64) -> anyhow::Result<()> {
107-
let current_checkpoint = self.get_current_checkpoint()?;
108-
let latest_checkpoint = self.get_latest_network_checkpoint().await;
109-
110-
let current_lag = self.calculate_lag(current_checkpoint, latest_checkpoint)?;
133+
let current_lag = self.get_current_lag().await?;
111134

112135
if current_lag > lag_threshold {
113-
tracing::info!(
114-
current_checkpoint,
115-
latest_checkpoint,
116-
lag_threshold,
117-
"performing catchup - lag is above threshold"
118-
);
119-
self.perform_catchup().await?;
136+
tracing::info!(lag_threshold, "performing catchup - lag is above threshold");
137+
match self.perform_catchup().await {
138+
Ok(()) => {}
139+
Err(CatchupError::Recoverable(error)) => {
140+
tracing::warn!(?error, "recoverable error during catchup");
141+
}
142+
Err(CatchupError::NonRecoverable(error)) => {
143+
return Err(error);
144+
}
145+
}
120146
} else {
121-
tracing::info!(
122-
current_checkpoint,
123-
latest_checkpoint,
124-
lag_threshold,
125-
"skipping catchup - lag is below threshold"
126-
);
147+
tracing::info!(lag_threshold, "skipping catchup - lag is below threshold");
127148
}
128149

129150
Ok(())
@@ -184,16 +205,39 @@ impl EventBlobCatchupManager {
184205
Ok(lag)
185206
}
186207

208+
/// Gets the current lag between the local store and the network
209+
pub async fn get_current_lag(&self) -> anyhow::Result<u64> {
210+
let current_checkpoint = self.get_current_checkpoint()?;
211+
let latest_checkpoint = self.get_latest_network_checkpoint().await;
212+
self.calculate_lag(current_checkpoint, latest_checkpoint)
213+
}
214+
187215
/// Performs the catchup operation using event blobs
188-
pub async fn perform_catchup(&self) -> anyhow::Result<()> {
189-
match self.catchup_using_event_blobs().await {
216+
pub async fn perform_catchup(&self) -> Result<(), CatchupError> {
217+
let coordination_state = self.catchup_runtime_config.coordination_state.clone();
218+
if !coordination_state.try_start_catchup().await {
219+
tracing::info!("runtime catchup already active, skipping new run");
220+
return Ok(());
221+
}
222+
223+
let result = self.catchup_using_event_blobs().await;
224+
coordination_state.mark_catchup_inactive();
225+
226+
match result {
190227
Ok(()) => {
191228
tracing::info!("successfully caught up using event blobs");
192229
Ok(())
193230
}
194-
Err(error) => {
195-
tracing::error!(?error, "failed to catch up using event blobs");
196-
Err(error)
231+
Err(CatchupError::Recoverable(error)) => {
232+
tracing::warn!(?error, "recoverable error during catchup using event blobs");
233+
Err(CatchupError::Recoverable(error))
234+
}
235+
Err(CatchupError::NonRecoverable(error)) => {
236+
tracing::error!(
237+
?error,
238+
"non-recoverable error during catchup using event blobs"
239+
);
240+
Err(CatchupError::NonRecoverable(error))
197241
}
198242
}
199243
}
@@ -218,27 +262,99 @@ impl EventBlobCatchupManager {
218262
/// starting from `N+1`). If however, the local store is empty, the catch-up will store all
219263
/// events from the earliest available event blob (in which case the first stored event index
220264
/// could be greater than `0`).
221-
async fn catchup_using_event_blobs(&self) -> anyhow::Result<()> {
265+
async fn catchup_using_event_blobs(&self) -> Result<(), CatchupError> {
222266
#[cfg(msim)]
223267
sui_macros::fail_point!("fail_point_catchup_using_event_blobs_start");
268+
let coordination_state = self.catchup_runtime_config.coordination_state.clone();
224269

225-
let next_checkpoint = self.get_next_checkpoint()?;
270+
let next_checkpoint = self
271+
.get_next_checkpoint()
272+
.map_err(|error| CatchupError::Recoverable(anyhow::Error::from(error)))?;
226273
tracing::info!(next_checkpoint, "starting event catchup using event blobs");
227-
self.ensure_recovery_directory()?;
274+
self.ensure_recovery_directory()
275+
.map_err(CatchupError::Recoverable)?;
228276

229-
let blobs = self
230-
.collect_event_blobs_for_catchup(next_checkpoint)
231-
.await?;
232-
let next_event_index = self.get_next_event_index()?;
277+
let blobs = match self.collect_event_blobs_for_catchup(next_checkpoint).await {
278+
Ok(blobs) => blobs,
279+
Err(error) => {
280+
tracing::error!(error = ?error, "failed to collect event blobs for catchup");
281+
return Err(CatchupError::Recoverable(anyhow::anyhow!(
282+
"failed to collect event blobs for catchup"
283+
)));
284+
}
285+
};
233286

234-
self.process_event_blobs(blobs, next_event_index).await?;
287+
if blobs.is_empty() {
288+
tracing::info!(
289+
"no event blobs collected for catchup; skipping stop of checkpoint tailing"
290+
);
291+
return Ok(());
292+
}
235293

236-
Ok(())
294+
if let Err(e) = coordination_state.start_catchup_processing_phase().await {
295+
tracing::error!(
296+
error = ?e,
297+
"failed to send stop message to checkpoint tailing"
298+
);
299+
return Err(CatchupError::NonRecoverable(anyhow::anyhow!(
300+
"failed to send stop message to checkpoint tailing"
301+
)));
302+
}
303+
304+
tracing::info!("waiting for tailing to stop before processing blobs");
305+
if !coordination_state
306+
.wait_for_tailing_stopped(self.catchup_runtime_config.coordination_timeout)
307+
.await
308+
{
309+
tracing::error!(
310+
timeout_secs = self.catchup_runtime_config.coordination_timeout.as_secs(),
311+
"timed out waiting for checkpoint tailing to stop"
312+
);
313+
return Err(CatchupError::NonRecoverable(anyhow::anyhow!(
314+
"timed out waiting for checkpoint tailing to stop"
315+
)));
316+
}
317+
318+
let next_event_index = self
319+
.get_next_event_index()
320+
.map_err(|error| CatchupError::NonRecoverable(anyhow::Error::from(error)))?;
321+
let processing_result = tokio::time::timeout(
322+
self.catchup_runtime_config.processing_timeout,
323+
self.process_event_blobs(blobs, next_event_index),
324+
)
325+
.await;
326+
327+
let mut recoverable_error: Option<anyhow::Error> = None;
328+
match processing_result {
329+
Ok(Ok(num_events_recovered)) => {
330+
tracing::info!("successfully processed {} events", num_events_recovered);
331+
}
332+
Ok(Err(error)) => {
333+
tracing::error!(error = ?error, "failed to process event blobs");
334+
recoverable_error = Some(error);
335+
}
336+
Err(_) => {
337+
let timeout_error = anyhow::anyhow!("processing event blobs timed out");
338+
tracing::error!(error = ?timeout_error, "processing event blobs timed out");
339+
recoverable_error = Some(timeout_error);
340+
}
341+
}
342+
343+
if let Err(e) = coordination_state.complete_catchup().await {
344+
tracing::error!(error = ?e, "failed to send restart message to checkpoint tailing");
345+
Err(CatchupError::NonRecoverable(anyhow::anyhow!(
346+
"failed to send restart message to checkpoint tailing"
347+
)))
348+
} else if let Some(error) = recoverable_error {
349+
Err(CatchupError::Recoverable(error))
350+
} else {
351+
Ok(())
352+
}
237353
}
238354

239355
/// Gets the next checkpoint sequence number that is after the latest checkpoint in the
240356
/// checkpoint store.
241-
fn get_next_checkpoint(&self) -> Result<Option<u64>, TypedStoreError> {
357+
pub fn get_next_checkpoint(&self) -> Result<Option<u64>, TypedStoreError> {
242358
Ok(self
243359
.stores
244360
.checkpoint_store
@@ -249,7 +365,7 @@ impl EventBlobCatchupManager {
249365
}
250366

251367
/// Gets the next event index that is after the latest event index in the event store.
252-
fn get_next_event_index(&self) -> Result<Option<u64>, TypedStoreError> {
368+
pub fn get_next_event_index(&self) -> Result<Option<u64>, TypedStoreError> {
253369
Ok(self
254370
.stores
255371
.event_store
@@ -260,7 +376,7 @@ impl EventBlobCatchupManager {
260376
}
261377

262378
/// Ensures the recovery directory exists
263-
fn ensure_recovery_directory(&self) -> anyhow::Result<()> {
379+
pub fn ensure_recovery_directory(&self) -> anyhow::Result<()> {
264380
if !self.recovery_path.exists() {
265381
fs::create_dir_all(&self.recovery_path)?;
266382
}
@@ -281,7 +397,7 @@ impl EventBlobCatchupManager {
281397
/// This function creates a client to download event blobs up to a specified
282398
/// checkpoint. The blobs are stored in the provided recovery path.
283399
#[cfg(feature = "client")]
284-
async fn collect_event_blobs_for_catchup(
400+
pub async fn collect_event_blobs_for_catchup(
285401
&self,
286402
starting_checkpoint_to_process: Option<u64>,
287403
) -> anyhow::Result<Vec<BlobId>> {
@@ -313,11 +429,18 @@ impl EventBlobCatchupManager {
313429
Ok(blob_ids)
314430
}
315431

316-
async fn process_event_blobs(
432+
/// Processes event blobs and stores them in the event store.
433+
///
434+
/// This function performs the following steps:
435+
/// 1. Iterates through the event blobs in reverse order (oldest to newest).
436+
/// 2. Processes each blob:
437+
/// - Collects relevant events that maintain a continuous sequence with the local store.
438+
/// - Stores the events in the event store.
439+
pub async fn process_event_blobs(
317440
&self,
318441
blobs: Vec<BlobId>,
319442
next_event_index: Option<u64>,
320-
) -> anyhow::Result<()> {
443+
) -> anyhow::Result<usize> {
321444
tracing::info!("starting to process event blobs");
322445

323446
let mut num_events_recovered = 0;
@@ -372,7 +495,7 @@ impl EventBlobCatchupManager {
372495
}
373496

374497
tracing::info!("recovered {} events from event blobs", num_events_recovered);
375-
Ok(())
498+
Ok(num_events_recovered)
376499
}
377500

378501
async fn process_single_blob(

crates/walrus-service/src/event/event_processor/config.rs

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -106,6 +106,25 @@ pub struct EventProcessorConfig {
106106
#[serde_as(as = "DurationSeconds")]
107107
#[serde(rename = "sampled_tracing_interval_secs")]
108108
pub sampled_tracing_interval: Duration,
109+
/// Enable runtime catchup functionality.
110+
pub enable_runtime_catchup: bool,
111+
/// Runtime catchup lag threshold for triggering catchup at runtime.
112+
/// When the lag exceeds this threshold during runtime, the catchup manager
113+
/// will be invoked in parallel with the checkpoint downloader to accelerate
114+
/// recovery. Should typically be the same as event_stream_catchup_min_checkpoint_lag.
115+
pub runtime_catchup_lag_threshold: u64,
116+
/// Interval for checking lag during runtime to trigger catchup if needed.
117+
#[serde_as(as = "DurationSeconds")]
118+
#[serde(rename = "runtime_lag_check_interval_secs")]
119+
pub runtime_lag_check_interval: Duration,
120+
/// Timeout for catchup operations to prevent indefinite blocking.
121+
#[serde_as(as = "DurationSeconds")]
122+
#[serde(rename = "catchup_coordination_timeout_secs")]
123+
pub catchup_coordination_timeout: Duration,
124+
/// Timeout for catchup processing to prevent indefinite blocking.
125+
#[serde_as(as = "DurationSeconds")]
126+
#[serde(rename = "catchup_processing_timeout_secs")]
127+
pub catchup_processing_timeout: Duration,
109128
}
110129

111130
impl Default for EventProcessorConfig {
@@ -116,6 +135,11 @@ impl Default for EventProcessorConfig {
116135
adaptive_downloader_config: Default::default(),
117136
event_stream_catchup_min_checkpoint_lag: 20_000,
118137
sampled_tracing_interval: Duration::from_secs(3600),
138+
runtime_catchup_lag_threshold: 20_000,
139+
runtime_lag_check_interval: Duration::from_secs(300),
140+
enable_runtime_catchup: true,
141+
catchup_coordination_timeout: Duration::from_secs(3000),
142+
catchup_processing_timeout: Duration::from_secs(3000),
119143
}
120144
}
121145
}

0 commit comments

Comments
 (0)