Skip to content
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/ntx-builder/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
[dependencies]
anyhow = { workspace = true }
futures = { workspace = true }
indexmap = { workspace = true }
lru = { workspace = true }
miden-node-proto = { workspace = true }
miden-node-utils = { workspace = true }
Expand Down
28 changes: 15 additions & 13 deletions crates/ntx-builder/src/actor/account_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ impl NetworkAccountState {

/// Updates state with the mempool event.
#[instrument(target = COMPONENT, name = "ntx.state.mempool_update", skip_all)]
pub fn mempool_update(&mut self, update: MempoolEvent) -> Option<ActorShutdownReason> {
pub fn mempool_update(&mut self, update: &MempoolEvent) -> Option<ActorShutdownReason> {
let span = tracing::Span::current();
span.set_attribute("mempool_event.kind", update.kind());

Expand All @@ -165,21 +165,23 @@ impl NetworkAccountState {
account_delta,
} => {
// Filter network notes relevant to this account.
let network_notes =
filter_by_prefix_and_map_to_single_target(self.account_prefix, network_notes);
self.add_transaction(id, nullifiers, network_notes, account_delta);
let network_notes = filter_by_prefix_and_map_to_single_target(
self.account_prefix,
network_notes.clone(),
);
self.add_transaction(*id, nullifiers, &network_notes, account_delta.as_ref());
},
MempoolEvent::TransactionsReverted(txs) => {
for tx in txs {
let shutdown_reason = self.revert_transaction(tx);
let shutdown_reason = self.revert_transaction(*tx);
if shutdown_reason.is_some() {
return shutdown_reason;
}
}
},
MempoolEvent::BlockCommitted { txs, .. } => {
for tx in txs {
self.commit_transaction(tx);
self.commit_transaction(*tx);
}
},
}
Expand All @@ -193,9 +195,9 @@ impl NetworkAccountState {
fn add_transaction(
&mut self,
id: TransactionId,
nullifiers: Vec<Nullifier>,
network_notes: Vec<SingleTargetNetworkNote>,
account_delta: Option<AccountUpdateDetails>,
nullifiers: &[Nullifier],
network_notes: &[SingleTargetNetworkNote],
account_delta: Option<&AccountUpdateDetails>,
) {
// Skip transactions we already know about.
//
Expand Down Expand Up @@ -226,16 +228,16 @@ impl NetworkAccountState {
);
tx_impact.notes.insert(note.nullifier());
self.nullifier_idx.insert(note.nullifier());
self.account.add_note(note);
self.account.add_note(note.clone());
}
for nullifier in nullifiers {
// Ignore nullifiers that aren't network note nullifiers.
if !self.nullifier_idx.contains(&nullifier) {
if !self.nullifier_idx.contains(nullifier) {
continue;
}
tx_impact.nullifiers.insert(nullifier);
tx_impact.nullifiers.insert(*nullifier);
// We don't use the entry wrapper here because the account must already exist.
let _ = self.account.add_nullifier(nullifier);
let _ = self.account.add_nullifier(*nullifier);
}

if !tx_impact.is_empty() {
Expand Down
10 changes: 5 additions & 5 deletions crates/ntx-builder/src/actor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ pub struct AccountActor {
origin: AccountOrigin,
store: StoreClient,
mode: ActorMode,
event_rx: mpsc::Receiver<MempoolEvent>,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
cancel_token: CancellationToken,
block_producer: BlockProducerClient,
prover: Option<RemoteTransactionProver>,
Expand All @@ -158,7 +158,7 @@ impl AccountActor {
pub fn new(
origin: AccountOrigin,
config: &AccountActorConfig,
event_rx: mpsc::Receiver<MempoolEvent>,
event_rx: mpsc::Receiver<Arc<MempoolEvent>>,
cancel_token: CancellationToken,
) -> Self {
let block_producer = BlockProducerClient::new(config.block_producer_url.clone());
Expand Down Expand Up @@ -217,8 +217,8 @@ impl AccountActor {
};
// Re-enable transaction execution if the transaction being waited on has been
// added to the mempool.
if let ActorMode::TransactionInflight(ref awaited_id) = self.mode {
if let MempoolEvent::TransactionAdded { id, .. } = &event {
if let ActorMode::TransactionInflight(awaited_id) = self.mode {
if let MempoolEvent::TransactionAdded { id, .. } = *event {
if id == awaited_id {
self.mode = ActorMode::NotesAvailable;
}
Expand All @@ -227,7 +227,7 @@ impl AccountActor {
self.mode = ActorMode::NotesAvailable;
}
// Update state.
if let Some(shutdown_reason) = state.mempool_update(event) {
if let Some(shutdown_reason) = state.mempool_update(event.as_ref()) {
return shutdown_reason;
}
},
Expand Down
6 changes: 3 additions & 3 deletions crates/ntx-builder/src/actor/note_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,16 +197,16 @@ pub enum NetworkAccountEffect {
}

impl NetworkAccountEffect {
pub fn from_protocol(update: AccountUpdateDetails) -> Option<Self> {
pub fn from_protocol(update: &AccountUpdateDetails) -> Option<Self> {
let update = match update {
AccountUpdateDetails::Private => return None,
AccountUpdateDetails::Delta(update) if update.is_full_state() => {
NetworkAccountEffect::Created(
Account::try_from(&update)
Account::try_from(update)
.expect("Account should be derivable by full state AccountDelta"),
)
},
AccountUpdateDetails::Delta(update) => NetworkAccountEffect::Updated(update),
AccountUpdateDetails::Delta(update) => NetworkAccountEffect::Updated(update.clone()),
};

update.account_id().is_network().then_some(update)
Expand Down
48 changes: 32 additions & 16 deletions crates/ntx-builder/src/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,9 @@ impl NetworkTransactionBuilder {
let account_ids = store.get_network_account_ids().await?;
for account_id in account_ids {
if let Ok(account_prefix) = NetworkAccountPrefix::try_from(account_id) {
self.coordinator.spawn_actor(AccountOrigin::store(account_prefix), &config);
self.coordinator
.spawn_actor(AccountOrigin::store(account_prefix), &config)
.await?;
}
}

Expand All @@ -157,10 +159,10 @@ impl NetworkTransactionBuilder {
.context("mempool event stream failed")?;

self.handle_mempool_event(
event,
event.into(),
&config,
chain_state.clone(),
).await;
).await?;
},
}
}
Expand All @@ -174,35 +176,49 @@ impl NetworkTransactionBuilder {
)]
async fn handle_mempool_event(
&mut self,
event: MempoolEvent,
event: Arc<MempoolEvent>,
account_actor_config: &AccountActorConfig,
chain_state: Arc<RwLock<ChainState>>,
) {
match &event {
) -> Result<(), anyhow::Error> {
match event.as_ref() {
MempoolEvent::TransactionAdded { account_delta, .. } => {
// Handle account deltas in case an account is being created.
if let Some(AccountUpdateDetails::Delta(delta)) = account_delta {
// Handle network accounts only.
// Handle account deltas for network accounts only.
if let Some(network_account) = AccountOrigin::transaction(delta) {
// Spawn new actors if a transaction creates a new network account
let is_creating_account = delta.is_full_state();
if is_creating_account {
self.coordinator.spawn_actor(network_account, account_actor_config);
} else {
self.coordinator.broadcast_event(&event).await;
self.coordinator
.spawn_actor(network_account, account_actor_config)
.await?;
}
}
} else {
self.coordinator.broadcast_event(&event).await;
}
self.coordinator.send_targeted(&event).await?;
Ok(())
},
// Update chain state and broadcast.
MempoolEvent::BlockCommitted { header, .. } => {
MempoolEvent::BlockCommitted { header, txs } => {
self.update_chain_tip(header.clone(), chain_state).await;
self.coordinator.broadcast_event(&event).await;
self.coordinator.broadcast(event.clone()).await;

// All transactions pertaining to predating events should now be available through
// the store. So we can now drain them.
for tx_id in txs {
self.coordinator.drain_predating_events(tx_id);
}
Ok(())
},
// Broadcast to all actors.
MempoolEvent::TransactionsReverted(_) => {
self.coordinator.broadcast_event(&event).await;
MempoolEvent::TransactionsReverted(txs) => {
self.coordinator.broadcast(event.clone()).await;

// Reverted predating transactions need not be processed.
for tx_id in txs {
self.coordinator.drain_predating_events(tx_id);
}
Ok(())
},
}
}
Expand Down
84 changes: 75 additions & 9 deletions crates/ntx-builder/src/coordinator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ use std::collections::HashMap;
use std::sync::Arc;

use anyhow::Context;
use indexmap::IndexMap;
use miden_node_proto::domain::account::NetworkAccountPrefix;
use miden_node_proto::domain::mempool::MempoolEvent;
use miden_node_proto::domain::note::NetworkNote;
use miden_objects::transaction::TransactionId;
use tokio::sync::mpsc::error::SendError;
use tokio::sync::{Semaphore, mpsc};
use tokio::task::JoinSet;
Expand All @@ -17,12 +20,12 @@ use crate::actor::{AccountActor, AccountActorConfig, AccountOrigin, ActorShutdow
/// Handle to account actors that are spawned by the coordinator.
#[derive(Clone)]
struct ActorHandle {
event_tx: mpsc::Sender<MempoolEvent>, // TODO: consider Arc<MempoolEvent>.
event_tx: mpsc::Sender<Arc<MempoolEvent>>,
cancel_token: CancellationToken,
}

impl ActorHandle {
fn new(event_tx: mpsc::Sender<MempoolEvent>, cancel_token: CancellationToken) -> Self {
fn new(event_tx: mpsc::Sender<Arc<MempoolEvent>>, cancel_token: CancellationToken) -> Self {
Self { event_tx, cancel_token }
}
}
Expand Down Expand Up @@ -82,6 +85,10 @@ pub struct Coordinator {
/// Each actor must acquire a permit from this semaphore before processing a transaction,
/// ensuring fair resource allocation and system stability under load.
semaphore: Arc<Semaphore>,

/// Cache of events received from the mempool that predate corresponding network accounts.
/// Grouped by account prefix to allow targeted event delivery to actors upon creation.
predating_events: HashMap<NetworkAccountPrefix, IndexMap<TransactionId, Arc<MempoolEvent>>>,
}

impl Coordinator {
Expand All @@ -94,6 +101,7 @@ impl Coordinator {
actor_registry: HashMap::new(),
actor_join_set: JoinSet::new(),
semaphore: Arc::new(Semaphore::new(max_inflight_transactions)),
predating_events: HashMap::new(),
}
}

Expand All @@ -103,7 +111,11 @@ impl Coordinator {
/// and adds it to the coordinator's management system. The actor will be responsible for
/// processing transactions and managing state for accounts matching the network prefix.
#[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, origin, config))]
pub fn spawn_actor(&mut self, origin: AccountOrigin, config: &AccountActorConfig) {
pub async fn spawn_actor(
&mut self,
origin: AccountOrigin,
config: &AccountActorConfig,
) -> Result<(), SendError<Arc<MempoolEvent>>> {
let account_prefix = origin.prefix();

// If an actor already exists for this account prefix, something has gone wrong.
Expand All @@ -117,13 +129,21 @@ impl Coordinator {
let cancel_token = tokio_util::sync::CancellationToken::new();
let actor = AccountActor::new(origin, config, event_rx, cancel_token.clone());
let handle = ActorHandle::new(event_tx, cancel_token);
self.actor_registry.insert(account_prefix, handle);

// Run the actor.
let semaphore = self.semaphore.clone();
self.actor_join_set.spawn(Box::pin(actor.run(semaphore)));

// Send the new actor any events that contain notes that predate account creation.
if let Some(prefix_events) = self.predating_events.remove(&account_prefix) {
for event in prefix_events.values() {
Self::send(&handle, event.clone()).await?;
}
}

self.actor_registry.insert(account_prefix, handle);
tracing::info!("created actor for account prefix: {}", account_prefix);
Ok(())
}

/// Broadcasts a mempool event to all active account actors.
Expand All @@ -133,7 +153,7 @@ impl Coordinator {
/// message channel and can process it accordingly.
///
/// If an actor fails to receive the event, it will be canceled.
pub async fn broadcast_event(&mut self, event: &MempoolEvent) {
pub async fn broadcast(&mut self, event: Arc<MempoolEvent>) {
tracing::debug!(
actor_count = self.actor_registry.len(),
"broadcasting event to all actors"
Expand All @@ -143,7 +163,7 @@ impl Coordinator {

// Send event to all actors.
for (account_prefix, handle) in &self.actor_registry {
if let Err(err) = Self::send(handle, event).await {
if let Err(err) = Self::send(handle, event.clone()).await {
tracing::error!("failed to send event to actor {}: {}", account_prefix, err);
failed_actors.push(*account_prefix);
}
Expand Down Expand Up @@ -195,11 +215,57 @@ impl Coordinator {
}
}

/// Sends a mempool event to all network account actors that are found in the corresponding
/// transaction's notes.
///
/// Caches the mempool event for each network account found in the transaction's notes that does
/// not currently have a corresponding actor. If an actor does not exist for the account, it is
/// assumed that the account has not been created on the chain yet.
///
/// Cached events will be fed to the corresponding actor when the account creation transaction
/// is processed.
pub async fn send_targeted(
&mut self,
event: &Arc<MempoolEvent>,
) -> Result<(), SendError<Arc<MempoolEvent>>> {
let mut target_actors = HashMap::new();
if let MempoolEvent::TransactionAdded { id, network_notes, .. } = event.as_ref() {
// Determine target actors for each note.
for note in network_notes {
if let NetworkNote::SingleTarget(note) = note {
let prefix = note.account_prefix();
if let Some(actor) = self.actor_registry.get(&prefix) {
// Register actor as target.
target_actors.insert(prefix, actor);
} else {
// Cache event for every note that doesn't have a corresponding actor.
self.predating_events.entry(prefix).or_default().insert(*id, event.clone());
}
}
}
}
// Send event to target actors.
for actor in target_actors.values() {
Self::send(actor, event.clone()).await?;
}
Ok(())
}

/// Removes any cached events for a given transaction ID from all account prefix caches.
pub fn drain_predating_events(&mut self, tx_id: &TransactionId) {
// Remove the transaction from all prefix caches.
self.predating_events.retain(|_, prefix_event| {
prefix_event.shift_remove(tx_id);
// Remove entries for account prefixes with no more cached events.
!prefix_event.is_empty()
});
}

/// Helper function to send an event to a single account actor.
async fn send(
handle: &ActorHandle,
event: &MempoolEvent,
) -> Result<(), SendError<MempoolEvent>> {
handle.event_tx.send(event.clone()).await
event: Arc<MempoolEvent>,
) -> Result<(), SendError<Arc<MempoolEvent>>> {
handle.event_tx.send(event).await
}
}
Loading