-
Notifications
You must be signed in to change notification settings - Fork 81
feat: NTX Actor support for notes that predate account creation #1324
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 12 commits
928492d
dad21ad
d0481b5
b302198
2cf78a2
969a385
6059e89
48407c6
49b115a
a957335
a7d0171
cba59ab
234b369
99b348a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -2,8 +2,11 @@ use std::collections::HashMap; | |
| use std::sync::Arc; | ||
|
|
||
| use anyhow::Context; | ||
| use indexmap::IndexMap; | ||
| use miden_node_proto::domain::account::NetworkAccountPrefix; | ||
| use miden_node_proto::domain::mempool::MempoolEvent; | ||
| use miden_node_proto::domain::note::NetworkNote; | ||
| use miden_objects::transaction::TransactionId; | ||
| use tokio::sync::mpsc::error::SendError; | ||
| use tokio::sync::{Semaphore, mpsc}; | ||
| use tokio::task::JoinSet; | ||
|
|
@@ -82,6 +85,10 @@ pub struct Coordinator { | |
| /// Each actor must acquire a permit from this semaphore before processing a transaction, | ||
| /// ensuring fair resource allocation and system stability under load. | ||
| semaphore: Arc<Semaphore>, | ||
|
|
||
| /// Cache of events received from the mempool that predate corresponding network accounts. | ||
| /// Grouped by account prefix to allow targeted event delivery to actors upon creation. | ||
| predating_events: HashMap<NetworkAccountPrefix, IndexMap<TransactionId, Arc<MempoolEvent>>>, | ||
| } | ||
|
|
||
| impl Coordinator { | ||
|
|
@@ -94,6 +101,7 @@ impl Coordinator { | |
| actor_registry: HashMap::new(), | ||
| actor_join_set: JoinSet::new(), | ||
| semaphore: Arc::new(Semaphore::new(max_inflight_transactions)), | ||
| predating_events: HashMap::new(), | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -103,7 +111,11 @@ impl Coordinator { | |
| /// and adds it to the coordinator's management system. The actor will be responsible for | ||
| /// processing transactions and managing state for accounts matching the network prefix. | ||
| #[tracing::instrument(name = "ntx.builder.spawn_actor", skip(self, origin, config))] | ||
| pub fn spawn_actor(&mut self, origin: AccountOrigin, config: &AccountActorConfig) { | ||
| pub async fn spawn_actor( | ||
| &mut self, | ||
| origin: AccountOrigin, | ||
| config: &AccountActorConfig, | ||
| ) -> Result<(), SendError<MempoolEvent>> { | ||
| let account_prefix = origin.prefix(); | ||
|
|
||
| // If an actor already exists for this account prefix, something has gone wrong. | ||
|
|
@@ -117,13 +129,21 @@ impl Coordinator { | |
| let cancel_token = tokio_util::sync::CancellationToken::new(); | ||
| let actor = AccountActor::new(origin, config, event_rx, cancel_token.clone()); | ||
| let handle = ActorHandle::new(event_tx, cancel_token); | ||
| self.actor_registry.insert(account_prefix, handle); | ||
|
|
||
| // Run the actor. | ||
| let semaphore = self.semaphore.clone(); | ||
| self.actor_join_set.spawn(Box::pin(actor.run(semaphore))); | ||
|
|
||
| // Send the new actor any events that contain notes that predate account creation. | ||
| if let Some(prefix_events) = self.predating_events.remove(&account_prefix) { | ||
| for event in prefix_events.values() { | ||
| Self::send(&handle, event.clone()).await?; | ||
| } | ||
| } | ||
|
|
||
| self.actor_registry.insert(account_prefix, handle); | ||
| tracing::info!("created actor for account prefix: {}", account_prefix); | ||
| Ok(()) | ||
| } | ||
|
|
||
| /// Broadcasts a mempool event to all active account actors. | ||
|
|
@@ -133,7 +153,7 @@ impl Coordinator { | |
| /// message channel and can process it accordingly. | ||
| /// | ||
| /// If an actor fails to receive the event, it will be canceled. | ||
| pub async fn broadcast_event(&mut self, event: &MempoolEvent) { | ||
| pub async fn broadcast(&mut self, event: Arc<MempoolEvent>) { | ||
| tracing::debug!( | ||
| actor_count = self.actor_registry.len(), | ||
| "broadcasting event to all actors" | ||
|
|
@@ -143,7 +163,7 @@ impl Coordinator { | |
|
|
||
| // Send event to all actors. | ||
| for (account_prefix, handle) in &self.actor_registry { | ||
| if let Err(err) = Self::send(handle, event).await { | ||
| if let Err(err) = Self::send(handle, event.clone()).await { | ||
| tracing::error!("failed to send event to actor {}: {}", account_prefix, err); | ||
| failed_actors.push(*account_prefix); | ||
| } | ||
|
|
@@ -195,11 +215,57 @@ impl Coordinator { | |
| } | ||
| } | ||
|
|
||
| /// Sends a mempool event to all network account actors that are found in the corresponding | ||
| /// transaction's notes. | ||
| /// | ||
| /// Caches the mempool event for each network account found in the transaction's notes that does | ||
| /// not currently have a corresponding actor. If an actor does not exist for the account, it is | ||
| /// assumed that the account has not been created on the chain yet. | ||
| /// | ||
| /// Cached events will be fed to the corresponding actor when the account creation transaction | ||
| /// is processed. | ||
| pub async fn send_targeted( | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: suffix |
||
| &mut self, | ||
| event: &Arc<MempoolEvent>, | ||
| ) -> Result<(), SendError<MempoolEvent>> { | ||
| let mut target_actors = HashMap::new(); | ||
| if let MempoolEvent::TransactionAdded { id, network_notes, .. } = event.as_ref() { | ||
| // Determine target actors for each note. | ||
| for note in network_notes { | ||
| if let NetworkNote::SingleTarget(note) = note { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am missing the
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, the network tx builder doesn't manage/handle multi target network notes. Those will one day be handled by something else since they can be consumed by.. well many things. |
||
| let prefix = note.account_prefix(); | ||
| if let Some(actor) = self.actor_registry.get(&prefix) { | ||
| // Register actor as target. | ||
| target_actors.insert(prefix, actor); | ||
| } else { | ||
| // Cache event for every note that doesn't have a corresponding actor. | ||
| self.predating_events.entry(prefix).or_default().insert(*id, event.clone()); | ||
| } | ||
| } | ||
| } | ||
| } | ||
| // Send event to target actors. | ||
| for actor in target_actors.values() { | ||
| Self::send(actor, event.clone()).await?; | ||
| } | ||
| Ok(()) | ||
| } | ||
sergerad marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| /// Removes any cached events for a given transaction ID from all account prefix caches. | ||
| pub fn drain_predating_events(&mut self, tx_id: &TransactionId) { | ||
| // Remove the transaction from all prefix caches. | ||
| self.predating_events.retain(|_, prefix_event| { | ||
| prefix_event.shift_remove(tx_id); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. JFYI: |
||
| // Remove entries for account prefixes with no more cached events. | ||
| !prefix_event.is_empty() | ||
| }); | ||
|
Comment on lines
+259
to
+263
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This would iterate over all accounts in the |
||
| } | ||
|
|
||
| /// Helper function to send an event to a single account actor. | ||
| async fn send( | ||
| handle: &ActorHandle, | ||
| event: &MempoolEvent, | ||
| event: Arc<MempoolEvent>, | ||
| ) -> Result<(), SendError<MempoolEvent>> { | ||
| handle.event_tx.send(event.clone()).await | ||
| handle.event_tx.send((*event).clone()).await | ||
sergerad marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was it a bug previously that we didn't broadcast the event for newly created actors?