dash-spv/src/storage/headers.rs (5 changes: 1 addition & 4 deletions)
@@ -20,7 +20,7 @@ impl DiskStorageManager {
     ) -> StorageResult<()> {
         let hashes = headers.iter().map(|header| header.block_hash()).collect::<Vec<_>>();
 
-        self.block_headers.write().await.store_items(headers, height, self).await?;
+        self.block_headers.write().await.store_items_at_height(headers, height).await?;
 
         // Update reverse index
         let mut reverse_index = self.header_hash_index.write().await;
@@ -30,9 +30,6 @@ impl DiskStorageManager {
             height += 1;
         }
 
-        // Release locks before saving (to avoid deadlocks during background saves)
-        drop(reverse_index);
-
         Ok(())
     }
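Two things change here: header writes go through `store_items_at_height`, which no longer needs the storage manager threaded back in as an argument, and the manual lock release before background saves disappears because saving is no longer triggered from this write path at all (the worker in manager.rs below persists on a timer instead). A rough sketch of what a dirty-flag segmented write of this shape could look like; `Segment`, `SEGMENT_SIZE`, and the field layout are hypothetical stand-ins, not dash-spv's actual definitions:

```rust
use std::collections::HashMap;

// Hypothetical segment size; the real value in dash-spv is not shown in this diff.
const SEGMENT_SIZE: u32 = 10_000;

struct Segment<T> {
    items: HashMap<u32, T>, // height -> item, within this segment
    dirty: bool,            // set on write, cleared when the worker persists it
}

struct SegmentCache<T> {
    segments: HashMap<u32, Segment<T>>,
}

impl<T> SegmentCache<T> {
    /// Store `items` consecutively starting at `height`, marking each touched
    /// segment dirty so a periodic worker can persist it later. Note that the
    /// caller no longer passes a storage handle (the removed `self` argument).
    fn store_items_at_height(&mut self, items: Vec<T>, height: u32) {
        for (offset, item) in items.into_iter().enumerate() {
            let h = height + offset as u32;
            let segment = self.segments.entry(h / SEGMENT_SIZE).or_insert_with(|| Segment {
                items: HashMap::new(),
                dirty: false,
            });
            segment.items.insert(h, item);
            segment.dirty = true;
        }
    }
}

fn main() {
    let mut cache: SegmentCache<&str> = SegmentCache {
        segments: HashMap::new(),
    };
    cache.store_items_at_height(vec!["header-a", "header-b"], 19_999);
    // Heights 19_999 and 20_000 land in different segments; both are now dirty.
    assert_eq!(cache.segments.len(), 2);
}
```

With writes only marking segments dirty, the reverse-index lock is never held across a disk write, which appears to be what the deleted deadlock comment was guarding against.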
dash-spv/src/storage/manager.rs (143 changes: 16 additions & 127 deletions)
@@ -3,7 +3,8 @@
 use std::collections::HashMap;
 use std::path::PathBuf;
 use std::sync::Arc;
-use tokio::sync::{mpsc, RwLock};
+use std::time::Duration;
+use tokio::sync::RwLock;
 
 use dashcore::{block::Header as BlockHeader, hash_types::FilterHeader, BlockHash, Txid};
 
@@ -14,24 +15,6 @@ use crate::types::{MempoolState, UnconfirmedTransaction};
 
 use super::lockfile::LockFile;
 
-/// Commands for the background worker
-#[derive(Debug, Clone)]
-pub(super) enum WorkerCommand {
-    SaveBlockHeaderSegmentCache {
-        segment_id: u32,
-    },
-    SaveFilterHeaderSegmentCache {
-        segment_id: u32,
-    },
-    SaveFilterSegmentCache {
-        segment_id: u32,
-    },
-    SaveIndex {
-        index: HashMap<BlockHash, u32>,
-    },
-    Shutdown,
-}
-
 /// Disk-based storage manager with segmented files and async background saving.
 pub struct DiskStorageManager {
     pub(super) base_path: PathBuf,
@@ -45,12 +28,8 @@ pub struct DiskStorageManager {
     pub(super) header_hash_index: Arc<RwLock<HashMap<BlockHash, u32>>>,
 
     // Background worker
-    pub(super) worker_tx: Option<mpsc::Sender<WorkerCommand>>,
     pub(super) worker_handle: Option<tokio::task::JoinHandle<()>>,
 
-    // Index save tracking to avoid redundant saves
-    pub(super) last_index_save_count: Arc<RwLock<usize>>,
-
     // Mempool storage
     pub(super) mempool_transactions: Arc<RwLock<HashMap<Txid, UnconfirmedTransaction>>>,
     pub(super) mempool_state: Arc<RwLock<Option<MempoolState>>>,
@@ -94,9 +73,7 @@ impl DiskStorageManager {
             )),
             filters: Arc::new(RwLock::new(SegmentCache::load_or_new(base_path.clone()).await?)),
             header_hash_index: Arc::new(RwLock::new(HashMap::new())),
-            worker_tx: None,
             worker_handle: None,
-            last_index_save_count: Arc::new(RwLock::new(0)),
             mempool_transactions: Arc::new(RwLock::new(HashMap::new())),
             mempool_state: Arc::new(RwLock::new(None)),
             _lock_file: lock_file,
@@ -107,7 +84,8 @@ impl DiskStorageManager {
             tracing::debug!("Loaded sync_base_height: {}", state.sync_base_height);
         }
 
-        // Start background worker
+        // Start background worker that
+        // persists data when appropriate
         storage.start_worker().await;
 
         // Rebuild index
@@ -136,118 +114,29 @@ impl DiskStorageManager {
 
     /// Start the background worker
     pub(super) async fn start_worker(&mut self) {
-        let (worker_tx, mut worker_rx) = mpsc::channel::<WorkerCommand>(100);
-
-        let worker_base_path = self.base_path.clone();
-        let base_path = self.base_path.clone();
-
         let block_headers = Arc::clone(&self.block_headers);
         let filter_headers = Arc::clone(&self.filter_headers);
-        let cfilters = Arc::clone(&self.filters);
+        let filters = Arc::clone(&self.filters);
 
         let worker_handle = tokio::spawn(async move {
-            while let Some(cmd) = worker_rx.recv().await {
-                match cmd {
-                    WorkerCommand::SaveBlockHeaderSegmentCache {
-                        segment_id,
-                    } => {
-                        let mut cache = block_headers.write().await;
-                        let segment = match cache.get_segment_mut(&segment_id).await {
-                            Ok(segment) => segment,
-                            Err(e) => {
-                                eprintln!("Failed to get segment {}: {}", segment_id, e);
-                                continue;
-                            }
-                        };
-
-                        match segment.persist(&base_path).await {
-                            Ok(()) => {
-                                tracing::trace!(
-                                    "Background worker completed saving header segment {}",
-                                    segment_id
-                                );
-                            }
-                            Err(e) => {
-                                eprintln!("Failed to save segment {}: {}", segment_id, e);
-                            }
-                        }
-                    }
-                    WorkerCommand::SaveFilterHeaderSegmentCache {
-                        segment_id,
-                    } => {
-                        let mut cache = filter_headers.write().await;
-                        let segment = match cache.get_segment_mut(&segment_id).await {
-                            Ok(segment) => segment,
-                            Err(e) => {
-                                eprintln!("Failed to get segment {}: {}", segment_id, e);
-                                continue;
-                            }
-                        };
-
-                        match segment.persist(&base_path).await {
-                            Ok(()) => {
-                                tracing::trace!(
-                                    "Background worker completed saving header segment {}",
-                                    segment_id
-                                );
-                            }
-                            Err(e) => {
-                                eprintln!("Failed to save segment {}: {}", segment_id, e);
-                            }
-                        }
-                    }
-                    WorkerCommand::SaveFilterSegmentCache {
-                        segment_id,
-                    } => {
-                        let mut cache = cfilters.write().await;
-                        let segment = match cache.get_segment_mut(&segment_id).await {
-                            Ok(segment) => segment,
-                            Err(e) => {
-                                eprintln!("Failed to get segment {}: {}", segment_id, e);
-                                continue;
-                            }
-                        };
-
-                        match segment.persist(&base_path).await {
-                            Ok(()) => {
-                                tracing::trace!(
-                                    "Background worker completed saving filter segment {}",
-                                    segment_id
-                                );
-                            }
-                            Err(e) => {
-                                eprintln!("Failed to save segment {}: {}", segment_id, e);
-                            }
-                        }
-                    }
-                    WorkerCommand::SaveIndex {
-                        index,
-                    } => {
-                        let path = worker_base_path.join("headers/index.dat");
-                        if let Err(e) = super::headers::save_index_to_disk(&path, &index).await {
-                            eprintln!("Failed to save index: {}", e);
-                        } else {
-                            tracing::trace!("Background worker completed saving index");
-                        }
-                    }
-                    WorkerCommand::Shutdown => {
-                        break;
-                    }
-                }
-            }
+            let mut ticker = tokio::time::interval(Duration::from_secs(5));
+
+            loop {
+                ticker.tick().await;
+
+                block_headers.write().await.persist_evicted().await;
+                filter_headers.write().await.persist_evicted().await;
+                filters.write().await.persist_evicted().await;
+            }
         });

-        self.worker_tx = Some(worker_tx);
         self.worker_handle = Some(worker_handle);
     }
 
     /// Stop the background worker without forcing a save.
-    pub(super) async fn stop_worker(&mut self) {
-        if let Some(tx) = self.worker_tx.take() {
-            let _ = tx.send(WorkerCommand::Shutdown).await;
-        }
-        if let Some(handle) = self.worker_handle.take() {
-            let _ = handle.await;
+    pub(super) fn stop_worker(&self) {
+        if let Some(handle) = &self.worker_handle {
+            handle.abort();
         }
     }
 }
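The net effect of the manager.rs changes: instead of a command channel (`WorkerCommand` plus an mpsc sender), the worker is a free-running loop that wakes on a fixed interval and asks each cache to persist whatever it has evicted, and shutdown is a plain `abort()` instead of a `Shutdown` message. A minimal self-contained sketch of this pattern; only the `tokio` APIs here are real, while the `Cache` type and its `persist_evicted` body are illustrative stand-ins:

```rust
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::RwLock;

// Stand-in cache type: `persist_evicted` here only illustrates the
// "flush whatever needs saving" call the real worker makes.
struct Cache(Vec<u64>);

impl Cache {
    async fn persist_evicted(&mut self) {
        // Real code would write dirty/evicted segments to disk here.
        println!("persisting {} cached items", self.0.len());
    }
}

#[tokio::main]
async fn main() {
    let cache = Arc::new(RwLock::new(Cache(vec![1, 2, 3])));
    let worker_cache = Arc::clone(&cache);

    // Periodic worker, as in the PR: wake every 5 seconds and persist.
    // Note that tokio's interval fires its first tick immediately.
    let worker = tokio::spawn(async move {
        let mut ticker = tokio::time::interval(Duration::from_secs(5));
        loop {
            ticker.tick().await;
            worker_cache.write().await.persist_evicted().await;
        }
    });

    // ... application work happens here ...
    tokio::time::sleep(Duration::from_secs(11)).await;

    // Shutdown mirrors the new stop_worker: abort the task rather than
    // signalling it over a channel.
    worker.abort();
    let _ = worker.await; // Err(JoinError) marked as cancelled
}
```

Worth noting: `JoinHandle::abort` cancels the task at its next await point with no final flush, which matches the doc comment's "without forcing a save"; durability on shutdown therefore depends on an explicit persist elsewhere before `stop_worker` is called.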