From 67dfb0b1482f81fe5a888e94c8f54dc5be99e409 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 3 Jul 2025 23:59:14 +0000 Subject: [PATCH 1/5] test(bitcoind_rpc): Detect new mempool txs --- crates/bitcoind_rpc/tests/test_emitter.rs | 38 +++++++++++++++++++++-- 1 file changed, 36 insertions(+), 2 deletions(-) diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 9bfa12892..187325a88 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -11,7 +11,7 @@ use bdk_chain::{ Balance, BlockId, CanonicalizationParams, IndexedTxGraph, Merge, }; use bdk_testenv::{anyhow, TestEnv}; -use bitcoin::{hashes::Hash, Block, OutPoint, ScriptBuf, WScriptHash}; +use bitcoin::{hashes::Hash, Block, Network, OutPoint, ScriptBuf, WScriptHash}; use bitcoincore_rpc::RpcApi; /// Ensure that blocks are emitted in order even after reorg. @@ -351,7 +351,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { .get_new_address(None, None)? .assume_checked(); let spk_to_track = ScriptBuf::new_p2wsh(&WScriptHash::all_zeros()); - let addr_to_track = Address::from_script(&spk_to_track, bitcoin::Network::Regtest)?; + let addr_to_track = Address::from_script(&spk_to_track, Network::Regtest)?; // setup receiver let (mut recv_chain, _) = LocalChain::from_genesis_hash(env.rpc_client().get_block_hash(0)?); @@ -867,3 +867,37 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { Ok(()) } + +#[test] +fn detect_new_mempool_txs() -> anyhow::Result<()> { + let env = TestEnv::new()?; + env.mine_blocks(101, None)?; + + let addr = env + .rpc_client() + .get_new_address(None, None)? + .require_network(Network::Regtest)?; + + let mut emitter = Emitter::new( + env.rpc_client(), + CheckPoint::new(BlockId { + height: 0, + hash: env.rpc_client().get_block_hash(0)?, + }), + 0, + NO_EXPECTED_MEMPOOL_TXIDS, + ); + + while let Some(_) = emitter.next_block()? {} + + for n in 0..5 { + let txid = env.send(&addr, Amount::ONE_BTC)?; + let new_txs = emitter.mempool()?.new_txs; + assert!( + new_txs.iter().any(|(tx, _)| tx.compute_txid() == txid), + "must detect new tx {n}" + ); + } + + Ok(()) +} From 05464ecf850feabd30a13aa4927d8797d06148cf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 3 Jul 2025 23:58:53 +0000 Subject: [PATCH 2/5] fix(bitcoind_rpc)!: Simplify emitter Instead of having an avoid-reemission logic based on timestamps returned by bitcoind RPC, we wrap all transactions in `Arc` and always emit the entire mempool. This makes emission cheap while avoiding the flawed avoid-reemission logic. --- crates/bitcoind_rpc/src/lib.rs | 241 ++++++--------- crates/bitcoind_rpc/tests/test_emitter.rs | 278 +++--------------- .../example_bitcoind_rpc_polling/src/main.rs | 9 +- 3 files changed, 134 insertions(+), 394 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 250ff4306..d72c22c7a 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -12,7 +12,11 @@ use bdk_core::{BlockId, CheckPoint}; use bitcoin::{Block, BlockHash, Transaction, Txid}; use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi}; -use std::{collections::HashSet, ops::Deref}; +use std::{ + collections::{HashMap, HashSet}, + ops::Deref, + sync::Arc, +}; pub mod bip158; @@ -37,30 +41,23 @@ pub struct Emitter { /// gives us an opportunity to re-fetch this result. last_block: Option, - /// The latest first-seen epoch of emitted mempool transactions. This is used to determine - /// whether a mempool transaction is already emitted. - last_mempool_time: usize, - - /// The last emitted block during our last mempool emission. This is used to determine whether - /// there has been a reorg since our last mempool emission. - last_mempool_tip: Option, - - /// A set of txids currently assumed to still be in the mempool. + /// The last snapshot of mempool transactions. /// - /// This is used to detect mempool evictions by comparing the set against the latest mempool - /// snapshot from bitcoind. Any txid in this set that is missing from the snapshot is - /// considered evicted. + /// This is used to detect mempool evictions and as a cache for transactions to emit. /// - /// When the emitter emits a block, confirmed txids are removed from this set. This prevents - /// confirmed transactions from being mistakenly marked with an `evicted_at` timestamp. - expected_mempool_txids: HashSet, + /// For mempool evictions, the latest call to `getrawmempool` is compared against this field. + /// Any transaction that is missing from this field is considered evicted. The exception is if + /// the transaction is confirmed into a block - therefore, we only emit evictions when we are + /// sure the tip block is already emitted. When a block is emitted, the transactions in the + /// block are removed from this field. + mempool_snapshot: HashMap>, } /// Indicates that there are no initially expected mempool transactions. /// /// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known /// to start empty (i.e. with no unconfirmed transactions). -pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty = core::iter::empty(); +pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty> = core::iter::empty(); impl Emitter where @@ -75,23 +72,27 @@ where /// `start_height` starts emission from a given height (if there are no conflicts with the /// original chain). /// - /// `expected_mempool_txids` is the initial set of unconfirmed txids provided by the wallet. - /// This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. If it is - /// known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used. + /// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the + /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. + /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used. pub fn new( client: C, last_cp: CheckPoint, start_height: u32, - expected_mempool_txids: impl IntoIterator>, + expected_mempool_txs: impl IntoIterator>>, ) -> Self { Self { client, start_height, last_cp, last_block: None, - last_mempool_time: 0, - last_mempool_tip: None, - expected_mempool_txids: expected_mempool_txids.into_iter().map(Into::into).collect(), + mempool_snapshot: expected_mempool_txs + .into_iter() + .map(|tx| { + let tx: Arc = tx.into(); + (tx.compute_txid(), tx) + }) + .collect(), } } @@ -115,102 +116,81 @@ where pub fn mempool(&mut self) -> Result { let client = &*self.client; - // This is the emitted tip height during the last mempool emission. - let prev_mempool_tip = self - .last_mempool_tip - // We use `start_height - 1` as we cannot guarantee that the block at - // `start_height` has been emitted. - .unwrap_or(self.start_height.saturating_sub(1)); - - // Loop to make sure that the fetched mempool content and the fetched tip are consistent - // with one another. - let (raw_mempool, raw_mempool_txids, rpc_height, rpc_block_hash) = loop { - // Determine if height and hash matches the best block from the RPC. Evictions are - // deferred if we are not at the best block. - let height = client.get_block_count()?; - let hash = client.get_block_hash(height)?; - - // Get the raw mempool result from the RPC client which will be used to determine if any - // transactions have been evicted. - let mp = client.get_raw_mempool_verbose()?; - let mp_txids: HashSet = mp.keys().copied().collect(); - - if height == client.get_block_count()? && hash == client.get_block_hash(height)? { - break (mp, mp_txids, height, hash); + let mut rpc_tip_height; + let mut rpc_tip_hash; + let mut rpc_mempool; + let mut rpc_mempool_txids; + + // Ensure we get a mempool snapshot consistent with `rpc_tip_hash` as the tip. + loop { + rpc_tip_height = client.get_block_count()?; + rpc_tip_hash = client.get_block_hash(rpc_tip_height)?; + rpc_mempool = client.get_raw_mempool_verbose()?; + rpc_mempool_txids = rpc_mempool.keys().copied().collect::>(); + let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)? + && rpc_tip_height == client.get_block_count()?; + if is_still_at_tip { + break; } - }; - - let at_tip = - rpc_height == self.last_cp.height() as u64 && rpc_block_hash == self.last_cp.hash(); - - // If at tip, any expected txid missing from raw mempool is considered evicted; - // if not at tip, we don't evict anything. - let evicted_txids: HashSet = if at_tip { - self.expected_mempool_txids - .difference(&raw_mempool_txids) - .copied() - .collect() - } else { - HashSet::new() - }; + } - // Mempool txs come with a timestamp of when the tx is introduced to the mempool. We keep - // track of the latest mempool tx's timestamp to determine whether we have seen a tx - // before. `prev_mempool_time` is the previous timestamp and `last_time` records what will - // be the new latest timestamp. - let prev_mempool_time = self.last_mempool_time; - let mut latest_time = prev_mempool_time; + let mut mempool_event = MempoolEvent::default(); + let update_time = &mut 0_u64; - let new_txs = raw_mempool + mempool_event.update = rpc_mempool .into_iter() .filter_map({ - let latest_time = &mut latest_time; - move |(txid, tx_entry)| -> Option> { - let tx_time = tx_entry.time as usize; - if tx_time > *latest_time { - *latest_time = tx_time; - } - // Best-effort check to avoid re-emitting transactions we've already emitted. - // - // Complete suppression isn't possible, since a transaction may spend outputs - // owned by the wallet. To determine if such a transaction is relevant, we must - // have already seen its ancestor(s) that contain the spent prevouts. - // - // Fortunately, bitcoind provides the block height at which the transaction - // entered the mempool. If we've already emitted that block height, we can - // reasonably assume the receiver has seen all ancestor transactions. - let is_already_emitted = tx_time <= prev_mempool_time; - let is_within_height = tx_entry.height <= prev_mempool_tip as _; - if is_already_emitted && is_within_height { - return None; - } - let tx = match client.get_raw_transaction(&txid, None) { - Ok(tx) => tx, - Err(err) if err.is_not_found_error() => return None, - Err(err) => return Some(Err(err)), + |(txid, tx_entry)| -> Option> { + *update_time = u64::max(*update_time, tx_entry.time); + let tx = match self.mempool_snapshot.get(&txid) { + Some(tx) => tx.clone(), + None => match client.get_raw_transaction(&txid, None) { + Ok(tx) => { + let tx = Arc::new(tx); + self.mempool_snapshot.insert(txid, tx.clone()); + tx + } + Err(err) if err.is_not_found_error() => return None, + Err(err) => return Some(Err(err)), + }, }; - Some(Ok((tx, tx_time as u64))) + Some(Ok((tx, tx_entry.time))) } }) .collect::, _>>()?; - self.last_mempool_time = latest_time; - self.last_mempool_tip = Some(self.last_cp.height()); + let at_tip = + rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash(); - // If at tip, we replace `expected_mempool_txids` with just the new txids. Otherwise, we’re - // still catching up to the tip and keep accumulating. if at_tip { - self.expected_mempool_txids = new_txs.iter().map(|(tx, _)| tx.compute_txid()).collect(); + // We only emit evicted transactions when we have already emitted the RPC tip. This is + // because we cannot differenciate between transactions that are confirmed and + // transactions that are evicted, so we rely on emitted blocks to remove + // transactions from the `mempool_snapshot`. + mempool_event.evicted = self + .mempool_snapshot + .keys() + .filter(|&txid| !rpc_mempool_txids.contains(txid)) + .map(|&txid| (txid, *update_time)) + .collect(); + self.mempool_snapshot = mempool_event + .update + .iter() + .map(|(tx, _)| (tx.compute_txid(), tx.clone())) + .collect(); } else { - self.expected_mempool_txids - .extend(new_txs.iter().map(|(tx, _)| tx.compute_txid())); - } + // Since we are still catching up to the tip (a.k.a tip has not been emitted), we + // accumulate more transactions in `mempool_snapshot` so that we can emit evictions in + // a batch once we catch up. + self.mempool_snapshot.extend( + mempool_event + .update + .iter() + .map(|(tx, _)| (tx.compute_txid(), tx.clone())), + ); + }; - Ok(MempoolEvent { - new_txs, - evicted_txids, - latest_update_time: latest_time as u64, - }) + Ok(mempool_event) } /// Emit the next block height and block (if any). @@ -218,7 +198,7 @@ where if let Some((checkpoint, block)) = poll(self, move |hash, client| client.get_block(hash))? { // Stop tracking unconfirmed transactions that have been confirmed in this block. for tx in &block.txdata { - self.expected_mempool_txids.remove(&tx.compute_txid()); + self.mempool_snapshot.remove(&tx.compute_txid()); } return Ok(Some(BlockEvent { block, checkpoint })); } @@ -227,32 +207,13 @@ where } /// A new emission from mempool. -#[derive(Debug)] +#[derive(Debug, Default)] pub struct MempoolEvent { - /// Unemitted transactions or transactions with ancestors that are unseen by the receiver. - /// - /// To understand the second condition, consider a receiver which filters transactions based on - /// whether it alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction - /// spends a tracked UTXO which is confirmed at height `h`, but the receiver has only seen up - /// to block of height `h-1`, we want to re-emit this transaction until the receiver has - /// seen the block at height `h`. - pub new_txs: Vec<(Transaction, u64)>, - - /// [`Txid`]s of all transactions that have been evicted from mempool. - pub evicted_txids: HashSet, + /// Transactions currently in the mempool alongside their seen-at timestamp. + pub update: Vec<(Arc, u64)>, - /// The latest timestamp of when a transaction entered the mempool. - /// - /// This is useful for setting the timestamp for evicted transactions. - pub latest_update_time: u64, -} - -impl MempoolEvent { - /// Returns an iterator of `(txid, evicted_at)` pairs for all evicted transactions. - pub fn evicted_ats(&self) -> impl ExactSizeIterator + '_ { - let time = self.latest_update_time; - self.evicted_txids.iter().map(move |&txid| (txid, time)) - } + /// Transactions evicted from the mempool alongside their evicted-at timestamp. + pub evicted: Vec<(Txid, u64)>, } /// A newly emitted block from [`Emitter`]. @@ -396,16 +357,6 @@ where continue; } PollResponse::AgreementFound(res, cp) => { - let agreement_h = res.height as u32; - - // The tip during the last mempool emission needs to in the best chain, we reduce - // it if it is not. - if let Some(h) = emitter.last_mempool_tip.as_mut() { - if *h > agreement_h { - *h = agreement_h; - } - } - // get rid of evicted blocks emitter.last_cp = cp; emitter.last_block = Some(res); @@ -479,7 +430,7 @@ mod test { for txid in &mempool_txids { assert!( - emitter.expected_mempool_txids.contains(txid), + emitter.mempool_snapshot.contains_key(txid), "Expected txid {txid:?} missing" ); } @@ -500,19 +451,19 @@ mod test { .collect::>(); for txid in confirmed_txids { assert!( - !emitter.expected_mempool_txids.contains(&txid), + !emitter.mempool_snapshot.contains_key(&txid), "Expected txid {txid:?} should have been removed" ); } for txid in &mempool_txids { assert!( - emitter.expected_mempool_txids.contains(txid), + emitter.mempool_snapshot.contains_key(txid), "Expected txid {txid:?} missing" ); } } - assert!(emitter.expected_mempool_txids.is_empty()); + assert!(emitter.mempool_snapshot.is_empty()); Ok(()) } diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 187325a88..0ccb7fe61 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,7 +1,4 @@ -use std::{ - collections::{BTreeMap, BTreeSet, HashSet}, - ops::Deref, -}; +use std::{collections::BTreeSet, ops::Deref}; use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; use bdk_chain::{ @@ -197,7 +194,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { assert!(emitter.next_block()?.is_none()); let mempool_txs = emitter.mempool()?; - let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.new_txs); + let indexed_additions = indexed_tx_graph.batch_insert_unconfirmed(mempool_txs.update); assert_eq!( indexed_additions .tx_graph @@ -449,250 +446,36 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { .map(|_| env.send(&addr, Amount::from_sat(2100))) .collect::, _>>()?; - // the first emission should include all transactions - let emitted_txids = emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(); - assert_eq!( - emitted_txids, exp_txids, - "all mempool txs should be emitted" - ); - - // second emission should be empty - assert!( - emitter.mempool()?.new_txs.is_empty(), - "second emission should be empty" - ); - - // mine empty blocks + sync up our emitter -> we should still not re-emit - for _ in 0..BLOCKS_TO_MINE { - env.mine_empty_block()?; - } - while emitter.next_block()?.is_some() {} - assert!( - emitter.mempool()?.new_txs.is_empty(), - "third emission, after chain tip is extended, should also be empty" - ); - - Ok(()) -} - -/// Ensure mempool tx is still re-emitted if [`Emitter`] has not reached the tx's introduction -/// height. -/// -/// We introduce a mempool tx after each block, where blocks are empty (does not confirm previous -/// mempool txs). Then we emit blocks from [`Emitter`] (intertwining `mempool` calls). We check -/// that `mempool` should always re-emit txs that have introduced at a height greater than the last -/// emitted block height. -#[test] -fn mempool_re_emits_if_tx_introduction_height_not_reached() -> anyhow::Result<()> { - const PREMINE_COUNT: usize = 101; - const MEMPOOL_TX_COUNT: usize = 21; - - let env = TestEnv::new()?; - let mut emitter = Emitter::new( - env.rpc_client(), - CheckPoint::new(BlockId { - height: 0, - hash: env.rpc_client().get_block_hash(0)?, - }), - 0, - NO_EXPECTED_MEMPOOL_TXIDS, - ); - - // mine blocks to get initial balance, sync emitter up to tip - let addr = env - .rpc_client() - .get_new_address(None, None)? - .assume_checked(); - env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; - while emitter.next_block()?.is_some() {} - - // mine blocks to introduce txs to mempool at different heights - let tx_introductions = (0..MEMPOOL_TX_COUNT) - .map(|_| -> anyhow::Result<_> { - let (height, _) = env.mine_empty_block()?; - let txid = env.send(&addr, Amount::from_sat(2100))?; - Ok((height, txid)) - }) - .collect::>>()?; - - assert_eq!( - emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(), - tx_introductions.iter().map(|&(_, txid)| txid).collect(), - "first mempool emission should include all txs", - ); - assert_eq!( - emitter + // First two emissions should include all transactions. + for _ in 0..2 { + let emitted_txids = emitter .mempool()? - .new_txs + .update .into_iter() .map(|(tx, _)| tx.compute_txid()) - .collect::>(), - tx_introductions.iter().map(|&(_, txid)| txid).collect(), - "second mempool emission should still include all txs", - ); - - // At this point, the emitter has seen all mempool transactions. It should only re-emit those - // that have introduction heights less than the emitter's last-emitted block tip. - while let Some(emission) = emitter.next_block()? { - let height = emission.block_height(); - // We call `mempool()` twice. - // The second call (at height `h`) should skip the tx introduced at height `h`. - for try_index in 0..2 { - let exp_txids = tx_introductions - .range((height as usize + try_index, Txid::all_zeros())..) - .map(|&(_, txid)| txid) - .collect::>(); - let emitted_txids = emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(); - assert_eq!( - emitted_txids, exp_txids, - "\n emission {} (try {}) must only contain txs introduced at that height or lower: \n\t missing: {:?} \n\t extra: {:?}", - height, - try_index, - exp_txids - .difference(&emitted_txids) - .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) - .collect::>(), - emitted_txids - .difference(&exp_txids) - .map(|txid| (txid, tx_introductions.iter().find_map(|(h, id)| if id == txid { Some(h) } else { None }).unwrap())) - .collect::>(), - ); - } + .collect::>(); + assert_eq!( + emitted_txids, exp_txids, + "all mempool txs should be emitted" + ); } - Ok(()) -} - -/// Ensure we force re-emit all mempool txs after reorg. -#[test] -fn mempool_during_reorg() -> anyhow::Result<()> { - const TIP_DIFF: usize = 10; - const PREMINE_COUNT: usize = 101; - - let env = TestEnv::new()?; - let mut emitter = Emitter::new( - env.rpc_client(), - CheckPoint::new(BlockId { - height: 0, - hash: env.rpc_client().get_block_hash(0)?, - }), - 0, - NO_EXPECTED_MEMPOOL_TXIDS, - ); - - // mine blocks to get initial balance - let addr = env - .rpc_client() - .get_new_address(None, None)? - .assume_checked(); - env.mine_blocks(PREMINE_COUNT, Some(addr.clone()))?; - - // introduce mempool tx at each block extension - for _ in 0..TIP_DIFF { + // mine empty blocks + sync up our emitter -> we should still not re-emit + for _ in 0..BLOCKS_TO_MINE { env.mine_empty_block()?; - env.send(&addr, Amount::from_sat(2100))?; } - - // sync emitter to tip, first mempool emission should include all txs (as we haven't emitted - // from the mempool yet) while emitter.next_block()?.is_some() {} + let emitted_txids = emitter + .mempool()? + .update + .into_iter() + .map(|(tx, _)| tx.compute_txid()) + .collect::>(); assert_eq!( - emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(), - env.rpc_client() - .get_raw_mempool()? - .into_iter() - .collect::>(), - "first mempool emission should include all txs", + emitted_txids, exp_txids, + "all mempool txs should be emitted" ); - // perform reorgs at different heights, these reorgs will not confirm transactions in the - // mempool - for reorg_count in 1..TIP_DIFF { - env.reorg_empty_blocks(reorg_count)?; - - // This is a map of mempool txids to tip height where the tx was introduced to the mempool - // we recalculate this at every loop as reorgs may evict transactions from mempool. We use - // the introduction height to determine whether we expect a tx to appear in a mempool - // emission. - // TODO: How can have have reorg logic in `TestEnv` NOT blacklast old blocks first? - let tx_introductions = dbg!(env - .rpc_client() - .get_raw_mempool_verbose()? - .into_iter() - .map(|(txid, entry)| (txid, entry.height as usize)) - .collect::>()); - - // `next_header` emits the replacement block of the reorg - if let Some(emission) = emitter.next_block()? { - let height = emission.block_height(); - - // the mempool emission (that follows the first block emission after reorg) should only - // include mempool txs introduced at reorg height or greater - let mempool = emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(); - let exp_mempool = tx_introductions - .iter() - .filter(|(_, &intro_h)| intro_h >= (height as usize)) - .map(|(&txid, _)| txid) - .collect::>(); - assert_eq!( - mempool, exp_mempool, - "the first mempool emission after reorg should only include mempool txs introduced at reorg height or greater" - ); - - let mempool = emitter - .mempool()? - .new_txs - .into_iter() - .map(|(tx, _)| tx.compute_txid()) - .collect::>(); - let exp_mempool = tx_introductions - .iter() - .filter(|&(_, &intro_height)| intro_height > (height as usize)) - .map(|(&txid, _)| txid) - .collect::>(); - assert_eq!( - mempool, exp_mempool, - "following mempool emissions after reorg should exclude mempool introduction heights <= last emitted block height: \n\t missing: {:?} \n\t extra: {:?}", - exp_mempool - .difference(&mempool) - .map(|txid| (txid, tx_introductions.get(txid).unwrap())) - .collect::>(), - mempool - .difference(&exp_mempool) - .map(|txid| (txid, tx_introductions.get(txid).unwrap())) - .collect::>(), - ); - } - - // sync emitter to tip - while emitter.next_block()?.is_some() {} - } - Ok(()) } @@ -803,14 +586,18 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { &Address::from_script(&spk, Network::Regtest)?, Amount::ONE_BTC, )?; + let tx_1 = env + .rpc_client() + .get_transaction(&txid_1, None)? + .transaction()?; - let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, HashSet::from([txid_1])); + let mut emitter = Emitter::new(env.rpc_client(), chain.tip(), 1, core::iter::once(tx_1)); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); chain.apply_update(CheckPoint::from_header(&emission.block.header, height))?; } - let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.new_txs); + let changeset = graph.batch_insert_unconfirmed(emitter.mempool()?.update); assert!(changeset .tx_graph .txs @@ -852,10 +639,13 @@ fn test_expect_tx_evicted() -> anyhow::Result<()> { // Check that mempool emission contains evicted txid. let mempool_event = emitter.mempool()?; - assert!(mempool_event.evicted_txids.contains(&txid_1)); + assert!(mempool_event + .evicted + .iter() + .any(|(txid, _)| txid == &txid_1)); // Update graph with evicted tx. - let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted_ats()); + let _ = graph.batch_insert_relevant_evicted_at(mempool_event.evicted); let canonical_txids = graph .graph() @@ -888,11 +678,11 @@ fn detect_new_mempool_txs() -> anyhow::Result<()> { NO_EXPECTED_MEMPOOL_TXIDS, ); - while let Some(_) = emitter.next_block()? {} + while emitter.next_block()?.is_some() {} for n in 0..5 { let txid = env.send(&addr, Amount::ONE_BTC)?; - let new_txs = emitter.mempool()?.new_txs; + let new_txs = emitter.mempool()?.update; assert!( new_txs.iter().any(|(tx, _)| tx.compute_txid() == txid), "must detect new tx {n}" diff --git a/examples/example_bitcoind_rpc_polling/src/main.rs b/examples/example_bitcoind_rpc_polling/src/main.rs index 51e5bc23b..cb710151a 100644 --- a/examples/example_bitcoind_rpc_polling/src/main.rs +++ b/examples/example_bitcoind_rpc_polling/src/main.rs @@ -217,7 +217,7 @@ fn main() -> anyhow::Result<()> { let graph_changeset = graph .lock() .unwrap() - .batch_insert_relevant_unconfirmed(mempool_txs.new_txs); + .batch_insert_relevant_unconfirmed(mempool_txs.update); { let db = &mut *db.lock().unwrap(); db_stage.merge(ChangeSet { @@ -315,10 +315,9 @@ fn main() -> anyhow::Result<()> { } Emission::Mempool(mempool_txs) => { let mut graph_changeset = - graph.batch_insert_relevant_unconfirmed(mempool_txs.new_txs.clone()); - graph_changeset.merge( - graph.batch_insert_relevant_evicted_at(mempool_txs.evicted_ats()), - ); + graph.batch_insert_relevant_unconfirmed(mempool_txs.update.clone()); + graph_changeset + .merge(graph.batch_insert_relevant_evicted_at(mempool_txs.evicted)); (local_chain::ChangeSet::default(), graph_changeset) } Emission::Tip(h) => { From 7e894f464ffc0fc21a05e485eae574292e8a63d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 10 Jul 2025 02:13:09 +0000 Subject: [PATCH 3/5] feat(bitcoind_rpc)!: Use `getrawmempool` without verbose This is the more performant method. The downside is that mempool txids are not returned to the seen-in-mempool timestamps so the caller either needs to provide this, or we need to rely on std to get the current timestamp. * `Emitter::mempool` method now requires the `std` feature. A non-std version of this is added: `Emitter::mempool_at` which takes in a `sync_time` parameter. Additional changes: * `NO_EXPECTED_MEMPOOL_TXIDS` is renamed to `NO_EXPECTED_MEMPOOL_TXS`. * Updated documentation. * Fix imports so that `bdk_bitcoind_rpc` compiles without `std`. --- crates/bitcoind_rpc/src/lib.rs | 61 ++++++++++++----------- crates/bitcoind_rpc/tests/test_emitter.rs | 16 +++--- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index d72c22c7a..ac0d190ca 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -9,14 +9,16 @@ //! separate method, [`Emitter::mempool`] can be used to emit the whole mempool. #![warn(missing_docs)] +#[allow(unused_imports)] +#[macro_use] +extern crate alloc; + +use alloc::sync::Arc; +use bdk_core::collections::{HashMap, HashSet}; use bdk_core::{BlockId, CheckPoint}; use bitcoin::{Block, BlockHash, Transaction, Txid}; use bitcoincore_rpc::{bitcoincore_rpc_json, RpcApi}; -use std::{ - collections::{HashMap, HashSet}, - ops::Deref, - sync::Arc, -}; +use core::ops::Deref; pub mod bip158; @@ -53,11 +55,11 @@ pub struct Emitter { mempool_snapshot: HashMap>, } -/// Indicates that there are no initially expected mempool transactions. +/// Indicates that there are no initially-expected mempool transactions. /// -/// Pass this to the `expected_mempool_txids` field of [`Emitter::new`] when the wallet is known +/// Use this as the `expected_mempool_txs` field of [`Emitter::new`] when the wallet is known /// to start empty (i.e. with no unconfirmed transactions). -pub const NO_EXPECTED_MEMPOOL_TXIDS: core::iter::Empty> = core::iter::empty(); +pub const NO_EXPECTED_MEMPOOL_TXS: core::iter::Empty> = core::iter::empty(); impl Emitter where @@ -74,7 +76,7 @@ where /// /// `expected_mempool_txs` is the initial set of unconfirmed transactions provided by the /// wallet. This allows the [`Emitter`] to inform the wallet about relevant mempool evictions. - /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXIDS`] can be used. + /// If it is known that the wallet is empty, [`NO_EXPECTED_MEMPOOL_TXS`] can be used. pub fn new( client: C, last_cp: CheckPoint, @@ -99,21 +101,26 @@ where /// Emit mempool transactions and any evicted [`Txid`]s. /// /// This method returns a [`MempoolEvent`] containing the full transactions (with their - /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted_txids`] which are + /// first-seen unix timestamps) that were emitted, and [`MempoolEvent::evicted`] which are /// any [`Txid`]s which were previously seen in the mempool and are now missing. Evicted txids /// are only reported once the emitter’s checkpoint matches the RPC’s best block in both height /// and hash. Until `next_block()` advances the checkpoint to tip, `mempool()` will always - /// return an empty `evicted_txids` set. + /// return an empty `evicted` set. + #[cfg(feature = "std")] + pub fn mempool(&mut self) -> Result { + let sync_time = std::time::UNIX_EPOCH + .elapsed() + .expect("must get current time") + .as_secs(); + self.mempool_at(sync_time) + } + + /// Emit mempool transactions and any evicted [`Txid`]s at the given `sync_time`. /// - /// This method emits each transaction only once, unless we cannot guarantee the transaction's - /// ancestors are already emitted. + /// `sync_time` is in unix seconds. /// - /// To understand why, consider a receiver which filters transactions based on whether it - /// alters the UTXO set of tracked script pubkeys. If an emitted mempool transaction spends a - /// tracked UTXO which is confirmed at height `h`, but the receiver has only seen up to block - /// of height `h-1`, we want to re-emit this transaction until the receiver has seen the block - /// at height `h`. - pub fn mempool(&mut self) -> Result { + /// This is the no-std version of [`mempool`](Self::mempool). + pub fn mempool_at(&mut self, sync_time: u64) -> Result { let client = &*self.client; let mut rpc_tip_height; @@ -125,8 +132,8 @@ where loop { rpc_tip_height = client.get_block_count()?; rpc_tip_hash = client.get_block_hash(rpc_tip_height)?; - rpc_mempool = client.get_raw_mempool_verbose()?; - rpc_mempool_txids = rpc_mempool.keys().copied().collect::>(); + rpc_mempool = client.get_raw_mempool()?; + rpc_mempool_txids = rpc_mempool.iter().copied().collect::>(); let is_still_at_tip = rpc_tip_hash == client.get_block_hash(rpc_tip_height)? && rpc_tip_height == client.get_block_count()?; if is_still_at_tip { @@ -135,13 +142,11 @@ where } let mut mempool_event = MempoolEvent::default(); - let update_time = &mut 0_u64; mempool_event.update = rpc_mempool .into_iter() .filter_map({ - |(txid, tx_entry)| -> Option> { - *update_time = u64::max(*update_time, tx_entry.time); + |txid| -> Option> { let tx = match self.mempool_snapshot.get(&txid) { Some(tx) => tx.clone(), None => match client.get_raw_transaction(&txid, None) { @@ -154,7 +159,7 @@ where Err(err) => return Some(Err(err)), }, }; - Some(Ok((tx, tx_entry.time))) + Some(Ok((tx, sync_time))) } }) .collect::, _>>()?; @@ -171,7 +176,7 @@ where .mempool_snapshot .keys() .filter(|&txid| !rpc_mempool_txids.contains(txid)) - .map(|&txid| (txid, *update_time)) + .map(|&txid| (txid, sync_time)) .collect(); self.mempool_snapshot = mempool_event .update @@ -396,7 +401,7 @@ impl BitcoindRpcErrorExt for bitcoincore_rpc::Error { #[cfg(test)] mod test { - use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; + use crate::{bitcoincore_rpc::RpcApi, Emitter, NO_EXPECTED_MEMPOOL_TXS}; use bdk_chain::local_chain::LocalChain; use bdk_testenv::{anyhow, TestEnv}; use bitcoin::{hashes::Hash, Address, Amount, ScriptBuf, Txid, WScriptHash}; @@ -411,7 +416,7 @@ mod test { env.rpc_client(), chain_tip.clone(), 1, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); env.mine_blocks(100, None)?; diff --git a/crates/bitcoind_rpc/tests/test_emitter.rs b/crates/bitcoind_rpc/tests/test_emitter.rs index 0ccb7fe61..9df3b404c 100644 --- a/crates/bitcoind_rpc/tests/test_emitter.rs +++ b/crates/bitcoind_rpc/tests/test_emitter.rs @@ -1,6 +1,6 @@ use std::{collections::BTreeSet, ops::Deref}; -use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXIDS}; +use bdk_bitcoind_rpc::{Emitter, NO_EXPECTED_MEMPOOL_TXS}; use bdk_chain::{ bitcoin::{Address, Amount, Txid}, local_chain::{CheckPoint, LocalChain}, @@ -26,7 +26,7 @@ pub fn test_sync_local_chain() -> anyhow::Result<()> { env.rpc_client(), local_chain.tip(), 0, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); // Mine some blocks and return the actual block hashes. @@ -161,7 +161,7 @@ fn test_into_tx_graph() -> anyhow::Result<()> { index }); - let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXIDS); + let emitter = &mut Emitter::new(env.rpc_client(), chain.tip(), 0, NO_EXPECTED_MEMPOOL_TXS); while let Some(emission) = emitter.next_block()? { let height = emission.block_height(); @@ -257,7 +257,7 @@ fn ensure_block_emitted_after_reorg_is_at_reorg_height() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), EMITTER_START_HEIGHT as _, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); env.mine_blocks(CHAIN_TIP_HEIGHT, None)?; @@ -339,7 +339,7 @@ fn tx_can_become_unconfirmed_after_reorg() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); // setup addresses @@ -430,7 +430,7 @@ fn mempool_avoids_re_emission() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); // mine blocks and sync up emitter @@ -503,7 +503,7 @@ fn no_agreement_point() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), (PREMINE_COUNT - 2) as u32, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); // mine 101 blocks @@ -675,7 +675,7 @@ fn detect_new_mempool_txs() -> anyhow::Result<()> { hash: env.rpc_client().get_block_hash(0)?, }), 0, - NO_EXPECTED_MEMPOOL_TXIDS, + NO_EXPECTED_MEMPOOL_TXS, ); while emitter.next_block()?.is_some() {} From 73ab1eb06bcfaa0368a07b84a054bdd2f4f84f06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BF=97=E5=AE=87?= Date: Thu, 10 Jul 2025 03:04:24 +0000 Subject: [PATCH 4/5] chore(bitcoind_rpc): Make clippy happy --- crates/bitcoind_rpc/src/lib.rs | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index ac0d190ca..2c2ca35a8 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -141,12 +141,10 @@ where } } - let mut mempool_event = MempoolEvent::default(); - - mempool_event.update = rpc_mempool - .into_iter() - .filter_map({ - |txid| -> Option> { + let mut mempool_event = MempoolEvent { + update: rpc_mempool + .into_iter() + .filter_map(|txid| -> Option> { let tx = match self.mempool_snapshot.get(&txid) { Some(tx) => tx.clone(), None => match client.get_raw_transaction(&txid, None) { @@ -160,9 +158,10 @@ where }, }; Some(Ok((tx, sync_time))) - } - }) - .collect::, _>>()?; + }) + .collect::, _>>()?, + ..Default::default() + }; let at_tip = rpc_tip_height == self.last_cp.height() as u64 && rpc_tip_hash == self.last_cp.hash(); From 51ee99a39fbf9a6dc0ead964e794ca3bebd43ed0 Mon Sep 17 00:00:00 2001 From: Wei Chen Date: Mon, 21 Jul 2025 07:27:22 +0000 Subject: [PATCH 5/5] docs(bitcoind_rpc): fixed typo in docs --- crates/bitcoind_rpc/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/crates/bitcoind_rpc/src/lib.rs b/crates/bitcoind_rpc/src/lib.rs index 2c2ca35a8..f516be6ca 100644 --- a/crates/bitcoind_rpc/src/lib.rs +++ b/crates/bitcoind_rpc/src/lib.rs @@ -168,7 +168,7 @@ where if at_tip { // We only emit evicted transactions when we have already emitted the RPC tip. This is - // because we cannot differenciate between transactions that are confirmed and + // because we cannot differentiate between transactions that are confirmed and // transactions that are evicted, so we rely on emitted blocks to remove // transactions from the `mempool_snapshot`. mempool_event.evicted = self