Skip to content

Commit 4e9ef04

Browse files
authored
fix(spv): wait for MnListDiff responses before transitioning to next phase (#199)
* fix(spv): wait for MnListDiff responses before transitioning to next phase * cleanup * fmt * add retry and proceed with warning * clippy * fix * fmt
1 parent 67ebf3a commit 4e9ef04

File tree

3 files changed

+247
-48
lines changed

3 files changed

+247
-48
lines changed

dash-spv/src/sync/masternodes.rs

Lines changed: 220 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,16 @@ pub struct MasternodeSyncManager<S: StorageManager, N: NetworkManager> {
3939
// Sync state
4040
sync_in_progress: bool,
4141
last_sync_time: Option<Instant>,
42+
43+
// Track pending MnListDiff requests (for quorum validation)
44+
// This ensures we don't transition to the next phase before receiving all responses
45+
pending_mnlistdiff_requests: usize,
46+
47+
// Track when we started waiting for MnListDiff responses (for timeout detection)
48+
mnlistdiff_wait_start: Option<Instant>,
49+
50+
// Track retry attempts for MnListDiff requests
51+
mnlistdiff_retry_count: u8,
4252
}
4353

4454
impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync + 'static>
@@ -105,6 +115,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
105115
error: None,
106116
sync_in_progress: false,
107117
last_sync_time: None,
118+
pending_mnlistdiff_requests: 0,
119+
mnlistdiff_wait_start: None,
120+
mnlistdiff_retry_count: 0,
108121
_phantom_s: std::marker::PhantomData,
109122
_phantom_n: std::marker::PhantomData,
110123
}
@@ -379,6 +392,7 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
379392

380393
self.sync_in_progress = true;
381394
self.error = None;
395+
self.mnlistdiff_retry_count = 0; // Reset retry counter for new sync
382396

383397
// Get current chain tip
384398
let tip_height = storage
@@ -434,24 +448,160 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
434448
_network: &mut dyn NetworkManager,
435449
) -> SyncResult<bool> {
436450
self.insert_mn_list_diff(&diff, storage).await;
451+
452+
// Decrement pending request counter if we were expecting this response
453+
if self.pending_mnlistdiff_requests > 0 {
454+
self.pending_mnlistdiff_requests -= 1;
455+
tracing::info!(
456+
"📥 Received MnListDiff response ({} pending remaining)",
457+
self.pending_mnlistdiff_requests
458+
);
459+
460+
// If this was the last pending request, mark sync as complete
461+
if self.pending_mnlistdiff_requests == 0 && self.sync_in_progress {
462+
tracing::info!(
463+
"✅ All MnListDiff requests completed, marking masternode sync as done"
464+
);
465+
self.sync_in_progress = false;
466+
self.last_sync_time = Some(Instant::now());
467+
self.mnlistdiff_wait_start = None; // Clear wait timer
468+
469+
// Persist masternode state so phase manager can detect completion
470+
match storage.get_tip_height().await {
471+
Ok(Some(tip_height)) => {
472+
let state = crate::storage::MasternodeState {
473+
last_height: tip_height,
474+
engine_state: Vec::new(),
475+
last_update: std::time::SystemTime::now()
476+
.duration_since(std::time::UNIX_EPOCH)
477+
.map(|d| d.as_secs())
478+
.unwrap_or(0),
479+
};
480+
if let Err(e) = storage.store_masternode_state(&state).await {
481+
tracing::warn!("⚠️ Failed to store masternode state: {}", e);
482+
}
483+
}
484+
Ok(None) => {
485+
tracing::warn!(
486+
"⚠️ Storage returned no tip height when persisting masternode state"
487+
);
488+
}
489+
Err(e) => {
490+
tracing::warn!(
491+
"⚠️ Failed to read tip height to persist masternode state: {}",
492+
e
493+
);
494+
}
495+
}
496+
}
497+
}
498+
437499
Ok(false) // Not used for sync completion in simple approach
438500
}
439501

440502
/// Check for sync timeout
441503
pub async fn check_sync_timeout(
442504
&mut self,
443-
_storage: &mut dyn StorageManager,
444-
_network: &mut dyn NetworkManager,
505+
storage: &mut dyn StorageManager,
506+
network: &mut dyn NetworkManager,
445507
) -> SyncResult<()> {
446-
// Simple timeout check
447-
if self.sync_in_progress {
448-
if let Some(last_sync) = self.last_sync_time {
449-
if last_sync.elapsed() > Duration::from_secs(60) {
450-
self.sync_in_progress = false;
451-
self.error = Some("Sync timeout".to_string());
508+
// Check if we're waiting for MnListDiff responses and have timed out
509+
if self.pending_mnlistdiff_requests > 0 {
510+
if let Some(wait_start) = self.mnlistdiff_wait_start {
511+
let timeout_duration = Duration::from_secs(15);
512+
513+
if wait_start.elapsed() > timeout_duration {
514+
// Timeout hit
515+
if self.mnlistdiff_retry_count < 1 {
516+
// First timeout - retry by restarting the QRInfo request
517+
tracing::warn!(
518+
"⏰ Timeout waiting for {} MnListDiff responses after {:?}, retrying QRInfo request...",
519+
self.pending_mnlistdiff_requests,
520+
wait_start.elapsed()
521+
);
522+
523+
self.mnlistdiff_retry_count += 1;
524+
self.pending_mnlistdiff_requests = 0;
525+
self.mnlistdiff_wait_start = None;
526+
527+
// Restart by re-initiating the sync
528+
// Get current chain tip for the retry
529+
let tip_height = storage
530+
.get_tip_height()
531+
.await
532+
.map_err(|e| {
533+
SyncError::Storage(format!("Failed to get tip height: {}", e))
534+
})?
535+
.unwrap_or(0);
536+
537+
let tip_header = storage
538+
.get_header(tip_height)
539+
.await
540+
.map_err(|e| {
541+
SyncError::Storage(format!("Failed to get tip header: {}", e))
542+
})?
543+
.ok_or_else(|| {
544+
SyncError::Storage("Tip header not found".to_string())
545+
})?;
546+
let tip_hash = tip_header.block_hash();
547+
548+
let base_hash = if let Some(last_qrinfo_hash) = self.last_qrinfo_block_hash
549+
{
550+
last_qrinfo_hash
551+
} else {
552+
self.config.network.known_genesis_block_hash().ok_or_else(|| {
553+
SyncError::InvalidState("Genesis hash not available".to_string())
554+
})?
555+
};
556+
557+
// Re-send the QRInfo request
558+
match self.request_qrinfo(network, base_hash, tip_hash).await {
559+
Ok(()) => {
560+
tracing::info!("🔄 QRInfo retry request sent successfully");
561+
}
562+
Err(e) => {
563+
tracing::error!("❌ Failed to send retry QRInfo request: {}", e);
564+
self.error = Some(format!("Failed to retry QRInfo: {}", e));
565+
self.sync_in_progress = false;
566+
}
567+
}
568+
} else {
569+
// Already retried once - give up and force completion
570+
tracing::error!(
571+
"❌ Failed to receive {} MnListDiff responses after {:?} and {} retry attempt(s)",
572+
self.pending_mnlistdiff_requests,
573+
wait_start.elapsed(),
574+
self.mnlistdiff_retry_count
575+
);
576+
tracing::warn!(
577+
"⚠️ Proceeding without complete masternode data - quorum validation may be incomplete"
578+
);
579+
580+
// Force completion to unblock sync
581+
self.pending_mnlistdiff_requests = 0;
582+
self.mnlistdiff_wait_start = None;
583+
self.sync_in_progress = false;
584+
self.error = Some("MnListDiff requests timed out after retry".to_string());
585+
586+
// Still persist what we have
587+
if let Ok(Some(tip_height)) = storage.get_tip_height().await {
588+
let state = crate::storage::MasternodeState {
589+
last_height: tip_height,
590+
engine_state: Vec::new(),
591+
last_update: std::time::SystemTime::now()
592+
.duration_since(std::time::UNIX_EPOCH)
593+
.map(|d| d.as_secs())
594+
.unwrap_or(0),
595+
};
596+
if let Err(e) = storage.store_masternode_state(&state).await {
597+
tracing::warn!("⚠️ Failed to store masternode state: {}", e);
598+
}
599+
}
600+
}
452601
}
453602
}
454603
}
604+
455605
Ok(())
456606
}
457607

@@ -522,34 +672,49 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
522672
// Update last successful QRInfo block for progressive sync
523673
self.last_qrinfo_block_hash = Some(block_hash);
524674

525-
// Mark sync as completed successfully
526-
self.sync_in_progress = false;
527-
self.last_sync_time = Some(Instant::now());
528-
529-
// Persist masternode state so phase manager can detect completion
530-
// We store the current header tip height as the masternode sync height.
531-
match storage.get_tip_height().await {
532-
Ok(Some(tip_height)) => {
533-
let state = crate::storage::MasternodeState {
534-
last_height: tip_height,
535-
engine_state: Vec::new(),
536-
last_update: std::time::SystemTime::now()
537-
.duration_since(std::time::UNIX_EPOCH)
538-
.map(|d| d.as_secs())
539-
.unwrap_or(0),
540-
};
541-
if let Err(e) = storage.store_masternode_state(&state).await {
542-
tracing::warn!("⚠️ Failed to store masternode state: {}", e);
675+
// Check if we need to wait for MnListDiff responses
676+
if self.pending_mnlistdiff_requests == 0 {
677+
// No additional requests were sent (edge case: no quorum validation needed)
678+
// Mark sync as complete immediately
679+
tracing::info!("✅ QRInfo processing completed with no additional requests, masternode sync phase is done");
680+
self.sync_in_progress = false;
681+
self.last_sync_time = Some(Instant::now());
682+
self.mnlistdiff_wait_start = None; // Ensure wait timer is cleared
683+
684+
// Persist masternode state so phase manager can detect completion
685+
match storage.get_tip_height().await {
686+
Ok(Some(tip_height)) => {
687+
let state = crate::storage::MasternodeState {
688+
last_height: tip_height,
689+
engine_state: Vec::new(),
690+
last_update: std::time::SystemTime::now()
691+
.duration_since(std::time::UNIX_EPOCH)
692+
.map(|d| d.as_secs())
693+
.unwrap_or(0),
694+
};
695+
if let Err(e) = storage.store_masternode_state(&state).await {
696+
tracing::warn!("⚠️ Failed to store masternode state: {}", e);
697+
}
698+
}
699+
Ok(None) => {
700+
tracing::warn!(
701+
"⚠️ Storage returned no tip height when persisting masternode state"
702+
);
703+
}
704+
Err(e) => {
705+
tracing::warn!(
706+
"⚠️ Failed to read tip height to persist masternode state: {}",
707+
e
708+
);
543709
}
544710
}
545-
Ok(None) => {
546-
tracing::warn!(
547-
"⚠️ Storage returned no tip height when persisting masternode state"
548-
);
549-
}
550-
Err(e) => {
551-
tracing::warn!("⚠️ Failed to read tip height to persist masternode state: {}", e);
552-
}
711+
} else {
712+
tracing::info!(
713+
"⏳ Waiting for {} pending MnListDiff responses before completing masternode sync",
714+
self.pending_mnlistdiff_requests
715+
);
716+
// Keep sync_in_progress = true so we don't transition to the next phase yet
717+
// Completion and state persistence will happen in handle_mnlistdiff_message
553718
}
554719

555720
tracing::info!("✅ QRInfo processing completed successfully (unified path)");
@@ -652,6 +817,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
652817
quorum_hashes.len()
653818
);
654819

820+
// Track how many requests we're about to send
821+
let mut requests_sent = 0;
822+
655823
for quorum_hash in quorum_hashes.iter() {
656824
tracing::info!("🔍 Processing quorum hash: {}", quorum_hash);
657825

@@ -758,6 +926,9 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
758926
continue;
759927
}
760928

929+
// Track that we sent a request
930+
requests_sent += 1;
931+
761932
tracing::info!(
762933
"✅ Sent MnListDiff request for quorum hash {} (base: {} -> target: {})",
763934
quorum_hash,
@@ -766,10 +937,21 @@ impl<S: StorageManager + Send + Sync + 'static, N: NetworkManager + Send + Sync
766937
);
767938
}
768939

769-
tracing::info!(
770-
"📋 Completed sending {} MnListDiff requests for quorum validation",
771-
quorum_hashes.len()
772-
);
940+
// Update the pending request counter
941+
self.pending_mnlistdiff_requests += requests_sent;
942+
943+
// Start tracking wait time if we sent any requests
944+
if requests_sent > 0 {
945+
self.mnlistdiff_wait_start = Some(Instant::now());
946+
tracing::info!(
947+
"📋 Completed sending {} MnListDiff requests for quorum validation (total pending: {}), started timeout tracking",
948+
requests_sent,
949+
self.pending_mnlistdiff_requests
950+
);
951+
} else {
952+
tracing::info!("📋 No MnListDiff requests sent (all quorums already have data)");
953+
}
954+
773955
Ok(())
774956
}
775957
}

dash-spv/src/sync/sequential/message_handlers.rs

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -404,8 +404,7 @@ impl<
404404
network: &mut N,
405405
storage: &mut S,
406406
) -> SyncResult<()> {
407-
let continue_sync =
408-
self.masternode_sync.handle_mnlistdiff_message(diff, storage, network).await?;
407+
self.masternode_sync.handle_mnlistdiff_message(diff, storage, network).await?;
409408

410409
// Update phase state
411410
if let SyncPhase::DownloadingMnList {
@@ -422,8 +421,9 @@ impl<
422421
*diffs_processed += 1;
423422
self.current_phase.update_progress();
424423

425-
// Check if phase is complete
426-
if !continue_sync {
424+
// Check if phase is complete by verifying masternode sync is no longer in progress
425+
// This ensures we wait for all pending MnListDiff requests to be received
426+
if !self.masternode_sync.is_syncing() {
427427
// Masternode sync has completed - ensure phase state reflects this
428428
// by updating target_height to match current_height before transition
429429
if let SyncPhase::DownloadingMnList {
@@ -438,6 +438,7 @@ impl<
438438
}
439439
}
440440

441+
tracing::info!("✅ All MnListDiff requests completed, transitioning to next phase");
441442
self.transition_to_next_phase(storage, network, "Masternode sync complete").await?;
442443

443444
// Execute the next phase
@@ -472,7 +473,7 @@ impl<
472473
return Err(SyncError::Validation(error.to_string()));
473474
}
474475

475-
// Update phase state - QRInfo processing should complete the masternode sync phase
476+
// Update phase state
476477
if let SyncPhase::DownloadingMnList {
477478
current_height,
478479
diffs_processed,
@@ -486,13 +487,21 @@ impl<
486487
*diffs_processed += 1;
487488
self.current_phase.update_progress();
488489

489-
tracing::info!("✅ QRInfo processing completed, masternode sync phase finished");
490+
// Check if masternode sync is complete (all pending MnListDiff requests received)
491+
if !self.masternode_sync.is_syncing() {
492+
tracing::info!("✅ QRInfo processing completed with all MnListDiff requests, masternode sync phase finished");
490493

491-
// Transition to next phase (filter headers)
492-
self.transition_to_next_phase(storage, network, "QRInfo processing completed").await?;
494+
// Transition to next phase (filter headers)
495+
self.transition_to_next_phase(storage, network, "QRInfo processing completed")
496+
.await?;
493497

494-
// Immediately execute the next phase so CFHeaders begins without delay
495-
self.execute_current_phase(network, storage).await?;
498+
// Immediately execute the next phase so CFHeaders begins without delay
499+
self.execute_current_phase(network, storage).await?;
500+
} else {
501+
tracing::info!(
502+
"⏳ QRInfo processing completed, waiting for pending MnListDiff responses before transitioning"
503+
);
504+
}
496505
}
497506

498507
Ok(())

0 commit comments

Comments
 (0)