Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,540 changes: 1,200 additions & 340 deletions Cargo.lock

Large diffs are not rendered by default.

9 changes: 8 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ members = [
"crates/metering",
"crates/node",
"crates/test-utils",
"crates/transaction-status",
"crates/transaction-tracing",
]

Expand Down Expand Up @@ -44,10 +45,12 @@ base-reth-metering = { path = "crates/metering" }
base-reth-node = { path = "crates/node" }
base-reth-test-utils = { path = "crates/test-utils" }
base-reth-transaction-tracing = { path = "crates/transaction-tracing" }
base-reth-transaction-status = { path = "crates/transaction-status" }

# base/tips
# Note: default-features = false avoids version conflicts with reth's alloy/op-alloy dependencies
tips-core = { git = "https://github.com/base/tips", rev = "a21ee492dede17f31eea108c12c669a8190f31aa", default-features = false }
tips-audit = { git = "https://github.com/base/tips", rev = "a21ee492dede17f31eea108c12c669a8190f31aa", default-features = false }

# reth
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.9.3" }
Expand Down Expand Up @@ -121,6 +124,10 @@ reqwest = { version = "0.12", features = ["json", "stream"] }
jsonrpsee = "0.26.0"
jsonrpsee-types = "0.26.0"

# transaction status
aws-config = "1.1.7"
aws-sdk-s3 = "1.106.0"

# misc
clap = { version = "4.4.3" }
tracing = { version = "0.1.41" }
Expand All @@ -131,7 +138,7 @@ metrics = "0.24.1"
metrics-derive = "0.1"
itertools = "0.14"
eyre = { version = "0.6.12" }
uuid = { version = "1.6.1", features = ["serde", "v5", "v4"] }
uuid = { version = "1.18.1", features = ["serde", "v5", "v4"] }
time = { version = "0.3.36", features = ["macros", "formatting", "parsing"] }
chrono = "0.4.41"
brotli = "8.0.1"
Expand Down
3 changes: 3 additions & 0 deletions crates/node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ workspace = true
base-reth-flashblocks-rpc.workspace = true
base-reth-metering.workspace = true
base-reth-transaction-tracing.workspace = true
base-reth-transaction-status.workspace = true

# reth
reth.workspace = true
Expand Down Expand Up @@ -86,6 +87,8 @@ uuid.workspace = true
time.workspace = true
chrono.workspace = true
once_cell.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true

[features]
default = []
Expand Down
46 changes: 46 additions & 0 deletions crates/node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,16 @@
use std::sync::Arc;

use aws_config::BehaviorVersion;
use aws_sdk_s3::Client as S3Client;
use base_reth_flashblocks_rpc::{
rpc::{EthApiExt, EthApiOverrideServer},
state::FlashblocksState,
subscription::FlashblocksSubscriber,
};
use base_reth_metering::{MeteringApiImpl, MeteringApiServer};
use base_reth_transaction_status::{
TransactionStatusApiImpl, TransactionStatusApiServer, TransactionStatusProxyImpl,
};
use base_reth_transaction_tracing::transaction_tracing_exex;
use clap::Parser;
use futures_util::TryStreamExt;
Expand Down Expand Up @@ -56,6 +61,20 @@ struct Args {
/// Enable metering RPC for transaction bundle simulation
#[arg(long = "enable-metering", value_name = "ENABLE_METERING")]
pub enable_metering: bool,

/// Enable transaction status RPC for transaction status lookup
#[arg(long = "enable-transaction-status", value_name = "ENABLE_TRANSACTION_STATUS")]
pub enable_transaction_status: bool,

/// S3 bucket for transaction status lookup
#[arg(long = "transaction-status-bucket", value_name = "TRANSACTION_STATUS_BUCKET")]
pub transaction_status_bucket: String,

/// Enable transaction status proxying to an external endpoint
/// Mainnet: https://mainnet.base.org
/// Sepolia: https://sepolia.base.org
#[arg(long = "transaction-status-proxy-url", value_name = "TRANSACTION_STATUS_PROXY_URL")]
pub transaction_status_proxy_url: Option<String>,
}

impl Args {
Expand Down Expand Up @@ -96,6 +115,10 @@ fn main() {

let fb_cell: Arc<OnceCell<Arc<FlashblocksState<_>>>> = Arc::new(OnceCell::new());

let transaction_status_enabled = args.enable_transaction_status;
let config = aws_config::load_defaults(BehaviorVersion::latest()).await;
let s3_client = S3Client::new(&config);

let NodeHandle { node: _node, node_exit_future } = builder
.with_types_and_provider::<OpNode, BlockchainProvider<_>>()
.with_components(op_node.components())
Expand Down Expand Up @@ -141,6 +164,29 @@ fn main() {
ctx.modules.merge_configured(metering_api.into_rpc())?;
}

if transaction_status_enabled {
info!(message = "Starting Transaction Status RPC");

// this is for external node users who will need to proxy requests to Base managed
// rpc nodes to get transaction status
if args.transaction_status_proxy_url.is_some() {
info!(message = "Transaction status proxying enabled");

let proxy_api = TransactionStatusProxyImpl::new(
args.transaction_status_proxy_url.clone().unwrap(),
)
.expect("Failed to create transaction status proxy");

ctx.modules.merge_configured(proxy_api.into_rpc())?;
} else {
let transaction_status_api = TransactionStatusApiImpl::new(
s3_client,
args.transaction_status_bucket.clone(),
);
ctx.modules.merge_configured(transaction_status_api.into_rpc())?;
}
}

if flashblocks_enabled {
info!(message = "Starting Flashblocks");

Expand Down
24 changes: 24 additions & 0 deletions crates/transaction-status/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
[package]
name = "base-reth-transaction-status"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
tips-audit.workspace = true
jsonrpsee = { workspace = true, features = ["macros", "server", "http-client", "client"] }
tracing.workspace = true
aws-config.workspace = true
aws-sdk-s3.workspace = true
alloy-primitives.workspace = true
serde.workspace = true

[dev-dependencies]
tokio = { workspace = true, features = ["test-util"] }
uuid.workspace = true

[lints]
workspace = true
17 changes: 17 additions & 0 deletions crates/transaction-status/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
mod proxy;
mod rpc;

pub use proxy::TransactionStatusProxyImpl;
pub use rpc::{TransactionStatusApiImpl, TransactionStatusApiServer};
use serde::{Deserialize, Serialize};

#[derive(Clone, Serialize, Deserialize)]
pub enum Status {
Unknown,
Pending,
Queued,
BlockIncluded,
BuilderIncluded,
Cancelled,
Dropped,
}
100 changes: 100 additions & 0 deletions crates/transaction-status/src/proxy.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
use alloy_primitives::TxHash;
use jsonrpsee::{
core::{RpcResult, async_trait, client::ClientT},
http_client::{HttpClient, HttpClientBuilder},
rpc_params,
types::{ErrorCode, ErrorObjectOwned},
};
use tracing::{error, info};

use crate::{Status, rpc::TransactionStatusApiServer};

/// Proxy that forwards transaction status requests to an external endpoint
pub struct TransactionStatusProxyImpl {
proxy_client: HttpClient,
proxy_url: String,
}

impl TransactionStatusProxyImpl {
pub fn new(proxy_url: String) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
let proxy_client = HttpClientBuilder::default().build(&proxy_url)?;
info!(message = "initializing transaction status proxy client", url = %proxy_url);

Ok(Self { proxy_client, proxy_url })
}
}

#[async_trait]
impl TransactionStatusApiServer for TransactionStatusProxyImpl {
async fn transaction_status(&self, tx_hash: TxHash) -> RpcResult<Status> {
info!(message = "forwarding transaction status request to proxy", tx_hash = %tx_hash, proxy_url = %self.proxy_url);

match self
.proxy_client
.request::<Status, _>("base_transactionStatus", rpc_params![tx_hash])
.await
{
Ok(result) => {
info!(message = "successfully received response from proxy", tx_hash = %tx_hash);
Ok(result)
}
Err(e) => {
error!(message = "proxy request failed", tx_hash = %tx_hash, error = %e);
Err(ErrorObjectOwned::owned(
ErrorCode::InternalError.code(),
format!("Proxy request failed: {e}"),
None::<()>,
))
}
}
}
}

#[cfg(test)]
mod tests {
use std::time::Duration;

use alloy_primitives::TxHash;
use tokio::time::timeout;

use crate::{proxy::TransactionStatusProxyImpl, rpc::TransactionStatusApiServer};

#[tokio::test]
async fn test_proxy_creation_valid_url() {
let result = TransactionStatusProxyImpl::new("https://mainnet.base.org".to_string());
assert!(result.is_ok());
}

#[tokio::test]
async fn test_proxy_creation_invalid_url() {
let result = TransactionStatusProxyImpl::new("invalid-url".to_string());
assert!(result.is_err());
}

#[tokio::test]
async fn test_proxy_connection_failure() {
// Use a non-existent URL that's valid format but unreachable
let proxy = TransactionStatusProxyImpl::new("http://127.0.0.1:9999".to_string())
.expect("Failed to create proxy");

let tx_hash = TxHash::from([1u8; 32]);

// This should fail due to connection error
let result = proxy.transaction_status(tx_hash).await;
assert!(result.is_err());
}

#[tokio::test]
async fn test_proxy_timeout() {
let proxy = TransactionStatusProxyImpl::new("http://127.0.0.1:9999".to_string())
.expect("Failed to create proxy");

let tx_hash = TxHash::from([1u8; 32]);

// Test with timeout - should fail quickly
let result = timeout(Duration::from_millis(100), proxy.transaction_status(tx_hash)).await;

// Should timeout or return connection error
assert!(result.is_err() || result.unwrap().is_err());
}
}
90 changes: 90 additions & 0 deletions crates/transaction-status/src/rpc.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use alloy_primitives::TxHash;
use aws_sdk_s3::Client as S3Client;
use jsonrpsee::{
core::{RpcResult, async_trait},
proc_macros::rpc,
types::{ErrorCode, ErrorObjectOwned},
};
use tips_audit::{
BundleEventS3Reader, BundleHistory, BundleHistoryEvent, S3EventReaderWriter,
TransactionMetadata,
};
use tracing::info;

use crate::Status;

/// RPC API for transaction status
#[rpc(server, namespace = "base")]
pub trait TransactionStatusApi {
/// Gets the status of a transaction
#[method(name = "transactionStatus")]
async fn transaction_status(&self, tx_hash: TxHash) -> RpcResult<Status>;
}

/// Implementation of the metering RPC API
pub struct TransactionStatusApiImpl {
s3: S3EventReaderWriter,
}

impl TransactionStatusApiImpl {
/// Creates a new instance of TransactionStatusApi
pub fn new(client: S3Client, bucket: String) -> Self {
info!(message = "using aws s3 client");
let s3 = S3EventReaderWriter::new(client, bucket);
Self { s3 }
}
}

#[async_trait]
impl TransactionStatusApiServer for TransactionStatusApiImpl {
async fn transaction_status(&self, tx_hash: TxHash) -> RpcResult<Status> {
info!(message = "getting bundle history", tx_hash = %tx_hash);

let metadata: Option<TransactionMetadata> =
self.s3.get_transaction_metadata(tx_hash).await.map_err(|e| {
ErrorObjectOwned::owned(
ErrorCode::InternalError.code(),
format!("Failed to get transaction metadata: {}", e),
None::<()>,
)
})?;

match metadata {
Some(metadata) => {
// TODO: a transaction can be in multiple bundles, but for now we'll only get the latest one
let bundle_id = metadata.bundle_ids[metadata.bundle_ids.len() - 1];
let history: Option<BundleHistory> =
self.s3.get_bundle_history(bundle_id).await.map_err(|e| {
ErrorObjectOwned::owned(
ErrorCode::InternalError.code(),
format!("Failed to get bundle history: {}", e),
None::<()>,
)
})?;

match history {
Some(history) => {
if history.history.is_empty() {
Ok(Status::Unknown)
} else {
let last_event = history.history.last().unwrap();
match last_event {
BundleHistoryEvent::Received { .. } => Ok(Status::Pending),
BundleHistoryEvent::Cancelled { .. } => Ok(Status::Cancelled),
BundleHistoryEvent::BuilderIncluded { .. } => {
Ok(Status::BuilderIncluded)
}
BundleHistoryEvent::BlockIncluded { .. } => {
Ok(Status::BlockIncluded)
}
BundleHistoryEvent::Dropped { .. } => Ok(Status::Dropped),
}
}
}
None => Ok(Status::Unknown),
}
}
None => Ok(Status::Unknown),
}
}
}