diff --git a/crates/flashblocks-rpc/src/subscription.rs b/crates/flashblocks-rpc/src/subscription.rs index 750c9b0..b29ea8e 100644 --- a/crates/flashblocks-rpc/src/subscription.rs +++ b/crates/flashblocks-rpc/src/subscription.rs @@ -1,3 +1,4 @@ +use std::time::{SystemTime, UNIX_EPOCH}; use std::{io::Read, sync::Arc, time::Duration}; use alloy_primitives::map::foldhash::HashMap; @@ -12,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::mpsc; use tokio::time::interval; use tokio_tungstenite::{connect_async, tungstenite::protocol::Message}; -use tracing::{error, info, trace, warn}; +use tracing::{debug, error, info, trace, warn}; use url::Url; use crate::metrics::Metrics; @@ -122,7 +123,18 @@ where ?data, "Received pong from upstream" ); - awaiting_pong_resp = false + awaiting_pong_resp = false; + if let Some(rtt) = rtt_from_pong(data.as_ref()) { + debug!( + message= "Received pong from upstream flashblocks", + rtt = ?rtt, + ); + }else { + debug!( + message = "Received UNEXPECTED pong from upstream flashblocks", + data=?data + ); + } } Err(e) => { metrics.upstream_errors.increment(1); @@ -152,7 +164,7 @@ where "Sending ping to upstream" ); - if let Err(error) = write.send(Message::Ping(Default::default())).await { + if let Err(error) = write.send(ping_with_timestamp()).await { warn!( target: "flashblocks_rpc::subscription", ?backoff, @@ -242,3 +254,25 @@ fn try_parse_message(bytes: &[u8]) -> eyre::Result { let text = String::from_utf8(decompressed)?; Ok(text) } + +fn ping_with_timestamp() -> Message { + let now_us: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_micros() as u64; + + Message::Ping(now_us.to_be_bytes().to_vec().into()) +} + +fn rtt_from_pong(data: &[u8]) -> Option { + let arr: [u8; 8] = data.try_into().ok()?; + let sent_us = u64::from_be_bytes(arr); + + let now_us: u64 = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap_or(Duration::ZERO) + .as_micros() as u64; + + let delta_us = now_us.saturating_sub(sent_us); + Some(Duration::from_micros(delta_us)) +}