18 changes: 13 additions & 5 deletions lightning-background-processor/src/lib.rs
@@ -1228,12 +1228,20 @@ where
}

if let Some(liquidity_manager) = liquidity_manager.as_ref() {
log_trace!(logger, "Persisting LiquidityManager...");
let fut = async {
liquidity_manager.get_lm().persist().await.map_err(|e| {
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
liquidity_manager
.get_lm()
.persist()
.await
.map(|did_persist| {
if did_persist {
log_trace!(logger, "Persisted LiquidityManager.");
}
})
.map_err(|e| {
log_error!(logger, "Persisting LiquidityManager failed: {}", e);
e
})
};
futures.set_e(Box::pin(fut));
}
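For reference, the contract this hunk relies on as a minimal standalone sketch: log success only when something was actually written, always log failures. `persist_and_log` is a hypothetical helper, and `eprintln!` stands in for the `log_trace!`/`log_error!` macros, which need a logger a standalone sketch does not have.

use std::io;

// Hypothetical helper mirroring the chained map/map_err pattern above.
async fn persist_and_log<F>(persist: F) -> Result<(), io::Error>
where
	F: std::future::Future<Output = Result<bool, io::Error>>,
{
	persist
		.await
		.map(|did_persist| {
			// Only log success when state was actually written.
			if did_persist {
				eprintln!("Persisted LiquidityManager.");
			}
		})
		.map_err(|e| {
			eprintln!("Persisting LiquidityManager failed: {}", e);
			e
		})
}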
6 changes: 3 additions & 3 deletions lightning-liquidity/src/events/event_queue.rs
@@ -129,12 +129,12 @@ where
EventQueueNotifierGuard(self)
}

pub async fn persist(&self) -> Result<(), lightning::io::Error> {
pub async fn persist(&self) -> Result<bool, lightning::io::Error> {
let fut = {
let mut state_lock = self.state.lock().unwrap();

if !state_lock.needs_persist {
return Ok(());
return Ok(false);
}

state_lock.needs_persist = false;
@@ -153,7 +153,7 @@
e
})?;

Ok(())
Ok(true)
}
}

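The event queue's persist follows a dirty-flag pattern that the hunk above only shows in fragments. A minimal self-contained sketch of that pattern; `EventQueueSketch`, `QueueState`, and `write_out` are stand-ins, not the real types.

use std::io;
use std::sync::Mutex;

struct QueueState {
	needs_persist: bool,
}

struct EventQueueSketch {
	state: Mutex<QueueState>,
}

impl EventQueueSketch {
	async fn persist(&self) -> Result<bool, io::Error> {
		{
			let mut state = self.state.lock().unwrap();
			if !state.needs_persist {
				return Ok(false);
			}
			// Clear the flag before writing so concurrent mutations re-set it.
			state.needs_persist = false;
		}
		self.write_out().await?;
		Ok(true)
	}

	async fn write_out(&self) -> Result<(), io::Error> {
		// Placeholder for the real KVStore write.
		Ok(())
	}
}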
9 changes: 6 additions & 3 deletions lightning-liquidity/src/lsps2/service.rs
@@ -1786,15 +1786,16 @@ where
})
}

pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
pub(crate) async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
// introduce some batching to upper-bound the number of requests inflight at any given
// time.
let mut did_persist = false;

if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
return Ok(did_persist);
}

loop {
@@ -1820,6 +1821,7 @@ where
for counterparty_node_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&counterparty_node_id));
self.persist_peer_state(counterparty_node_id).await?;
did_persist = true;
}

for counterparty_node_id in need_remove {
@@ -1854,6 +1856,7 @@
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(counterparty_node_id).await?;
}
@@ -1868,7 +1871,7 @@ where
break;
}

Ok(())
Ok(did_persist)
}

pub(crate) fn peer_disconnected(&self, counterparty_node_id: PublicKey) {
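The `persistence_in_flight` counter above coalesces concurrent persist calls: the first caller does the work, later callers bump the counter and return `Ok(false)`, and the worker goes around again if requests arrived while it was writing. The counter bookkeeping at the end of the loop is elided in this diff, so the sketch below is an approximation with hypothetical types (`ServiceSketch`, `persist_all_peers`).

use std::io;
use std::sync::atomic::{AtomicUsize, Ordering};

struct ServiceSketch {
	persistence_in_flight: AtomicUsize,
}

impl ServiceSketch {
	async fn persist(&self) -> Result<bool, io::Error> {
		let mut did_persist = false;

		if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
			// Another task is already persisting; our increment tells it to go
			// around again, so we report that we wrote nothing ourselves.
			return Ok(did_persist);
		}

		loop {
			self.persist_all_peers().await?;
			did_persist = true;

			// If no new persist requests arrived while writing, release the
			// reservation and stop; otherwise collapse them into one more pass.
			if self
				.persistence_in_flight
				.compare_exchange(1, 0, Ordering::AcqRel, Ordering::Acquire)
				.is_ok()
			{
				break;
			}
			self.persistence_in_flight.store(1, Ordering::Release);
		}

		Ok(did_persist)
	}

	async fn persist_all_peers(&self) -> Result<(), io::Error> {
		// Placeholder for per-peer persistence.
		Ok(())
	}
}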
10 changes: 7 additions & 3 deletions lightning-liquidity/src/lsps5/service.rs
@@ -244,15 +244,17 @@ where
})
}

pub(crate) async fn persist(&self) -> Result<(), lightning::io::Error> {
pub(crate) async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel, however, when we do, we probably want to
// introduce some batching to upper-bound the number of requests inflight at any given
// time.

let mut did_persist = false;

if self.persistence_in_flight.fetch_add(1, Ordering::AcqRel) > 0 {
// If we're not the first event processor to get here, just return early, the increment
// we just did will be treated as "go around again" at the end.
return Ok(());
return Ok(did_persist);
}

loop {
@@ -277,6 +279,7 @@ where
for client_id in need_persist.into_iter() {
debug_assert!(!need_remove.contains(&client_id));
self.persist_peer_state(client_id).await?;
did_persist = true;
}

for client_id in need_remove {
@@ -311,6 +314,7 @@ where
}
if let Some(future) = future_opt {
future.await?;
did_persist = true;
} else {
self.persist_peer_state(client_id).await?;
}
@@ -325,7 +329,7 @@ where
break;
}

Ok(())
Ok(did_persist)
}

fn check_prune_stale_webhooks<'a>(
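Both the LSPS2 and LSPS5 handlers use the same per-peer bookkeeping that `did_persist` now feeds off: collect the peers that need a write and the peers that can be dropped, persist the former, remove the latter. A minimal sketch with stand-in types (`PerPeerSketch`, `PeerStateSketch`, and plain u64 ids instead of `PublicKey`):

use std::collections::HashMap;
use std::io;

struct PeerStateSketch {
	needs_persist: bool,
	is_prunable: bool,
}

struct PerPeerSketch {
	peers: HashMap<u64, PeerStateSketch>,
}

impl PerPeerSketch {
	async fn persist(&mut self) -> Result<bool, io::Error> {
		let mut did_persist = false;
		let mut need_persist = Vec::new();
		let mut need_remove = Vec::new();

		for (id, state) in self.peers.iter() {
			if state.is_prunable {
				need_remove.push(*id);
			} else if state.needs_persist {
				need_persist.push(*id);
			}
		}

		for id in need_persist {
			debug_assert!(!need_remove.contains(&id));
			self.persist_peer_state(id).await?;
			did_persist = true;
		}
		for id in need_remove {
			// The real handlers also delete the peer's entry from the store.
			self.peers.remove(&id);
		}

		Ok(did_persist)
	}

	async fn persist_peer_state(&self, _id: u64) -> Result<(), io::Error> {
		// Placeholder for the real KVStore write of one peer's state.
		Ok(())
	}
}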
20 changes: 13 additions & 7 deletions lightning-liquidity/src/manager.rs
@@ -670,23 +670,27 @@ where
self.pending_events.get_and_clear_pending_events()
}

/// Persists the state of the service handlers towards the given [`KVStore`] implementation.
/// Persists the state of the service handlers towards the given [`KVStore`] implementation if
/// needed.
///
/// Returns `true` if it persisted service handler data.
///
/// This will be regularly called by LDK's background processor if necessary and only needs to
/// be called manually if it's not utilized.
pub async fn persist(&self) -> Result<(), lightning::io::Error> {
pub async fn persist(&self) -> Result<bool, lightning::io::Error> {
// TODO: We should eventually persist in parallel.
self.pending_events.persist().await?;
let mut did_persist = false;
did_persist |= self.pending_events.persist().await?;

if let Some(lsps2_service_handler) = self.lsps2_service_handler.as_ref() {
lsps2_service_handler.persist().await?;
did_persist |= lsps2_service_handler.persist().await?;
}

if let Some(lsps5_service_handler) = self.lsps5_service_handler.as_ref() {
lsps5_service_handler.persist().await?;
did_persist |= lsps5_service_handler.persist().await?;
}

Ok(())
Ok(did_persist)
}

fn handle_lsps_message(
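Per the updated doc comment, `persist` only has to be driven manually when the background processor is not used. A minimal usage sketch of that case, assuming a Tokio runtime; `LiquidityManagerHandle` is a hypothetical stand-in for the real `LiquidityManager`, and `std::io::Error` stands in for `lightning::io::Error` to keep the sketch dependency-free.

use std::io;
use std::time::Duration;

struct LiquidityManagerHandle;

impl LiquidityManagerHandle {
	// Same contract as the method in this diff: Ok(true) iff state was written.
	async fn persist(&self) -> Result<bool, io::Error> {
		Ok(false)
	}
}

async fn persist_loop(lm: &LiquidityManagerHandle, interval: Duration) -> Result<(), io::Error> {
	loop {
		tokio::time::sleep(interval).await;
		if lm.persist().await? {
			// State was actually written this round; a caller could update
			// metrics or a "last persisted" timestamp here.
		}
	}
}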
@@ -1285,8 +1289,10 @@

/// Persists the state of the service handlers towards the given [`KVStoreSync`] implementation.
///
/// Returns `true` if it persisted service handler data.
///
/// Wraps [`LiquidityManager::persist`].
pub fn persist(&self) -> Result<(), lightning::io::Error> {
pub fn persist(&self) -> Result<bool, lightning::io::Error> {
let mut waker = dummy_waker();
let mut ctx = task::Context::from_waker(&mut waker);
match Box::pin(self.inner.persist()).as_mut().poll(&mut ctx) {
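The sync wrapper resolves the async `persist` by polling it once with a waker that never wakes anything, relying on the `KVStoreSync` backing to complete without suspending. A minimal sketch of that pattern; `Waker::noop()` from std stands in for LDK's internal `dummy_waker()`, and the panic is only a stand-in since the wrapper's handling of a pending future is elided in this diff.

use std::future::Future;
use std::task::{Context, Poll, Waker};

fn expect_sync<F: Future>(fut: F) -> F::Output {
	let waker = Waker::noop();
	let mut ctx = Context::from_waker(waker);
	let mut fut = Box::pin(fut);
	match fut.as_mut().poll(&mut ctx) {
		Poll::Ready(output) => output,
		// If this fires, the underlying store was not actually synchronous.
		Poll::Pending => panic!("persist future should complete synchronously"),
	}
}

// Example: wrapping an async Result<bool, _> persist into a sync call.
fn persist_sync() -> Result<bool, std::io::Error> {
	expect_sync(async { Ok(true) })
}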