@@ -61,7 +61,7 @@ use crate::{
6161 mcache:: MessageCache ,
6262 peer_score:: { PeerScore , PeerScoreParams , PeerScoreState , PeerScoreThresholds , RejectReason } ,
6363 protocol:: SIGNING_PREFIX ,
64- rpc :: Sender ,
64+ queue :: Queue ,
6565 rpc_proto:: proto,
6666 subscription_filter:: { AllowAllSubscriptionFilter , TopicSubscriptionFilter } ,
6767 time_cache:: DuplicateCache ,
@@ -751,6 +751,7 @@ where
751751 if self . send_message (
752752 * peer_id,
753753 RpcOut :: Publish {
754+ message_id : msg_id. clone ( ) ,
754755 message : raw_message. clone ( ) ,
755756 timeout : Delay :: new ( self . config . publish_queue_duration ( ) ) ,
756757 } ,
@@ -1341,6 +1342,7 @@ where
13411342 self . send_message (
13421343 * peer_id,
13431344 RpcOut :: Forward {
1345+ message_id : id. clone ( ) ,
13441346 message : msg,
13451347 timeout : Delay :: new ( self . config . forward_queue_duration ( ) ) ,
13461348 } ,
@@ -2081,9 +2083,9 @@ where
20812083 // steady-state size of the queues.
20822084 #[ cfg( feature = "metrics" ) ]
20832085 if let Some ( m) = & mut self . metrics {
2084- for sender_queue in self . connected_peers . values ( ) . map ( |v| & v. sender ) {
2085- m. observe_priority_queue_size ( sender_queue. priority_queue_len ( ) ) ;
2086- m. observe_non_priority_queue_size ( sender_queue. non_priority_queue_len ( ) ) ;
2086+ for sender_queue in self . connected_peers . values ( ) . map ( |v| & v. messages ) {
2087+ m. observe_priority_queue_size ( sender_queue. priority_len ( ) ) ;
2088+ m. observe_non_priority_queue_size ( sender_queue. non_priority_len ( ) ) ;
20872089 }
20882090 }
20892091
@@ -2499,6 +2501,11 @@ where
24992501 // Report expired messages
25002502 for ( peer_id, failed_messages) in self . failed_messages . drain ( ) {
25012503 tracing:: debug!( "Peer couldn't consume messages: {:?}" , failed_messages) ;
2504+ #[ cfg( feature = "metrics" ) ]
2505+ if let Some ( metrics) = self . metrics . as_mut ( ) {
2506+ metrics. observe_failed_priority_messages ( failed_messages. priority ) ;
2507+ metrics. observe_failed_non_priority_messages ( failed_messages. non_priority ) ;
2508+ }
25022509 self . events
25032510 . push_back ( ToSwarm :: GenerateEvent ( Event :: SlowPeer {
25042511 peer_id,
@@ -2746,6 +2753,7 @@ where
27462753 self . send_message (
27472754 * peer_id,
27482755 RpcOut :: Forward {
2756+ message_id : msg_id. clone ( ) ,
27492757 message : message. clone ( ) ,
27502758 timeout : Delay :: new ( self . config . forward_queue_duration ( ) ) ,
27512759 } ,
@@ -2874,33 +2882,20 @@ where
28742882 return false ;
28752883 }
28762884
2877- // Try sending the message to the connection handler.
2878- match peer. sender . send_message ( rpc) {
2885+ // Try sending the message to the connection handler,
2886+ // High priority messages should not fail.
2887+ match peer. messages . try_push ( rpc) {
28792888 Ok ( ( ) ) => true ,
28802889 Err ( rpc) => {
28812890 // Sending failed because the channel is full.
28822891 tracing:: warn!( peer=%peer_id, "Send Queue full. Could not send {:?}." , rpc) ;
28832892
28842893 // Update failed message counter.
28852894 let failed_messages = self . failed_messages . entry ( peer_id) . or_default ( ) ;
2886- match rpc {
2887- RpcOut :: Publish { .. } => {
2888- failed_messages. priority += 1 ;
2889- failed_messages. publish += 1 ;
2890- }
2891- RpcOut :: Forward { .. } => {
2892- failed_messages. non_priority += 1 ;
2893- failed_messages. forward += 1 ;
2894- }
2895- RpcOut :: IWant ( _) | RpcOut :: IHave ( _) | RpcOut :: IDontWant ( _) => {
2896- failed_messages. non_priority += 1 ;
2897- }
2898- RpcOut :: Graft ( _)
2899- | RpcOut :: Prune ( _)
2900- | RpcOut :: Subscribe ( _)
2901- | RpcOut :: Unsubscribe ( _) => {
2902- unreachable ! ( "Channel for highpriority control messages is unbounded and should always be open." )
2903- }
2895+ if rpc. priority ( ) {
2896+ failed_messages. priority += 1 ;
2897+ } else {
2898+ failed_messages. non_priority += 1 ;
29042899 }
29052900
29062901 // Update peer score.
@@ -3129,16 +3124,18 @@ where
31293124 kind : PeerKind :: Floodsub ,
31303125 connections : vec ! [ ] ,
31313126 outbound : false ,
3132- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3127+ messages : Queue :: new ( self . config . connection_handler_queue_len ( ) ) ,
31333128 topics : Default :: default ( ) ,
31343129 dont_send : LinkedHashMap :: new ( ) ,
31353130 } ) ;
31363131 // Add the new connection
31373132 connected_peer. connections . push ( connection_id) ;
31383133
3134+ // This clones a reference to the Queue so any new handlers reference the same underlying
3135+ // queue. No data is actually cloned here.
31393136 Ok ( Handler :: new (
31403137 self . config . protocol_config ( ) ,
3141- connected_peer. sender . new_receiver ( ) ,
3138+ connected_peer. messages . clone ( ) ,
31423139 ) )
31433140 }
31443141
@@ -3156,16 +3153,18 @@ where
31563153 // Diverging from the go implementation we only want to consider a peer as outbound peer
31573154 // if its first connection is outbound.
31583155 outbound : !self . px_peers . contains ( & peer_id) ,
3159- sender : Sender :: new ( self . config . connection_handler_queue_len ( ) ) ,
3156+ messages : Queue :: new ( self . config . connection_handler_queue_len ( ) ) ,
31603157 topics : Default :: default ( ) ,
31613158 dont_send : LinkedHashMap :: new ( ) ,
31623159 } ) ;
31633160 // Add the new connection
31643161 connected_peer. connections . push ( connection_id) ;
31653162
3163+ // This clones a reference to the Queue so any new handlers reference the same underlying
3164+ // queue. No data is actually cloned here.
31663165 Ok ( Handler :: new (
31673166 self . config . protocol_config ( ) ,
3168- connected_peer. sender . new_receiver ( ) ,
3167+ connected_peer. messages . clone ( ) ,
31693168 ) )
31703169 }
31713170
@@ -3207,6 +3206,8 @@ where
32073206 }
32083207 }
32093208 }
3209+ // rpc is only used for metrics code.
3210+ #[ allow( unused_variables) ]
32103211 HandlerEvent :: MessageDropped ( rpc) => {
32113212 // Account for this in the scoring logic
32123213 if let PeerScoreState :: Active ( peer_score) = & mut self . peer_score {
@@ -3215,32 +3216,7 @@ where
32153216
32163217 // Keep track of expired messages for the application layer.
32173218 let failed_messages = self . failed_messages . entry ( propagation_source) . or_default ( ) ;
3218- failed_messages. timeout += 1 ;
3219- match rpc {
3220- RpcOut :: Publish { .. } => {
3221- failed_messages. publish += 1 ;
3222- }
3223- RpcOut :: Forward { .. } => {
3224- failed_messages. forward += 1 ;
3225- }
3226- _ => { }
3227- }
3228-
3229- // Record metrics on the failure.
3230- #[ cfg( feature = "metrics" ) ]
3231- if let Some ( metrics) = self . metrics . as_mut ( ) {
3232- match rpc {
3233- RpcOut :: Publish { message, .. } => {
3234- metrics. publish_msg_dropped ( & message. topic ) ;
3235- metrics. timeout_msg_dropped ( & message. topic ) ;
3236- }
3237- RpcOut :: Forward { message, .. } => {
3238- metrics. forward_msg_dropped ( & message. topic ) ;
3239- metrics. timeout_msg_dropped ( & message. topic ) ;
3240- }
3241- _ => { }
3242- }
3243- }
3219+ failed_messages. non_priority += 1 ;
32443220 }
32453221 HandlerEvent :: Message {
32463222 rpc,
@@ -3339,10 +3315,17 @@ where
33393315 "Could not handle IDONTWANT, peer doesn't exist in connected peer list" ) ;
33403316 continue ;
33413317 } ;
3318+
3319+ // Remove messages from the queue.
3320+ #[ allow( unused) ]
3321+ let removed = peer. messages . remove_data_messages ( & message_ids) ;
3322+
33423323 #[ cfg( feature = "metrics" ) ]
33433324 if let Some ( metrics) = self . metrics . as_mut ( ) {
33443325 metrics. register_idontwant ( message_ids. len ( ) ) ;
3326+ metrics. register_removed_messages ( removed) ;
33453327 }
3328+
33463329 for message_id in message_ids {
33473330 peer. dont_send . insert ( message_id, Instant :: now ( ) ) ;
33483331 // Don't exceed capacity.
0 commit comments