Skip to content

Commit e2a6965

Browse files
authored
redis memory leak fix & stronger EOA healing (#48)
* add redis cleanup script
* clean up EOA transaction data for successful and failed transactions
* stronger self-healing and recovery for blocked EOA
* review fixes
* more review fixes
1 parent f649ddb commit e2a6965

File tree

22 files changed

+1224
-132
lines changed

22 files changed

+1224
-132
lines changed

Cargo.lock

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

executors/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,3 +27,4 @@ futures = "0.3.31"
2727
moka = { version = "0.12.10", features = ["future"] }
2828
prometheus = "0.13.4"
2929
lazy_static = "1.5.0"
30+
tokio-retry2 = {version = "0.6.0", features = ["jitter"]}

executors/src/eoa/error_classifier.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,9 @@ pub enum EoaExecutionError {
3838
message: String,
3939
inner_error: Option<EngineError>,
4040
},
41+
42+
/// Thirdweb support error (error message contains "we are not able to process your request at this time"); retryable in place after a short delay
43+
ThirdwebSupportError { message: String },
4144
}
4245

4346
/// Recovery strategy for an EOA execution error
@@ -115,6 +118,10 @@ impl EoaErrorMapper {
115118
EoaExecutionError::AccountError {
116119
message: message.to_string(),
117120
}
121+
} else if msg_lower.contains("we are not able to process your request at this time") {
122+
EoaExecutionError::ThirdwebSupportError {
123+
message: message.to_string(),
124+
}
118125
} else {
119126
// Not an actionable error - let engine error handle it
120127
EoaExecutionError::RpcError {
@@ -204,6 +211,14 @@ impl EoaErrorMapper {
204211
retry_delay: None,
205212
},
206213

214+
EoaExecutionError::ThirdwebSupportError { .. } => RecoveryStrategy {
215+
queue_confirmation: false,
216+
recycle_nonce: false,
217+
needs_resync: false,
218+
retryable: true,
219+
retry_delay: Some(Duration::from_secs(1)), // Short delay for retry in place
220+
},
221+
207222
EoaExecutionError::RpcError { .. } => {
208223
// This should not be used - let engine error handle it
209224
RecoveryStrategy {
@@ -231,6 +246,7 @@ impl EoaExecutionError {
231246
| EoaExecutionError::GasError { message }
232247
| EoaExecutionError::PoolLimitExceeded { message }
233248
| EoaExecutionError::AccountError { message }
249+
| EoaExecutionError::ThirdwebSupportError { message }
234250
| EoaExecutionError::RpcError { message, .. } => message,
235251
}
236252
}

executors/src/eoa/store/atomic.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -503,7 +503,8 @@ impl AtomicEoaExecutorStore {
503503
// Add new hash:id to submitted (keeping old ones)
504504
pipeline.zadd(&submitted_key, &submitted_transaction_string, nonce);
505505

506-
// Still maintain separate hash-to-ID mapping for backward compatibility
506+
// Create hash-to-ID mapping for the new gas bump hash
507+
// Keep ALL hashes until transaction is fully cleaned up for re-org protection
507508
pipeline.set(&hash_to_id_key, &submitted_transaction.transaction_id);
508509

509510
// Simply push the new attempt to the attempts list
@@ -554,6 +555,7 @@ impl AtomicEoaExecutorStore {
554555
pending_transaction: &PendingTransaction,
555556
error: EoaExecutorWorkerError,
556557
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
558+
failed_transaction_expiry_seconds: u64,
557559
) -> Result<(), TransactionStoreError> {
558560
self.with_lock_check(|pipeline| {
559561
let pending_key = self.pending_transactions_zset_name();
@@ -563,11 +565,23 @@ impl AtomicEoaExecutorStore {
563565
// Remove from pending state
564566
pipeline.zrem(&pending_key, &pending_transaction.transaction_id);
565567

566-
// Update transaction data with failure
568+
// Update transaction data with failure and set expiry
567569
pipeline.hset(&tx_data_key, "completed_at", now);
568570
pipeline.hset(&tx_data_key, "failure_reason", error.to_string());
569571
pipeline.hset(&tx_data_key, "status", "failed");
570572

573+
// Set expiry on all related keys for failed transactions
574+
let transaction_keys =
575+
self.get_all_transaction_keys(&pending_transaction.transaction_id);
576+
577+
let ttl: i64 = i64::try_from(failed_transaction_expiry_seconds)
578+
.unwrap_or(i64::MAX)
579+
.max(1);
580+
581+
for key in transaction_keys {
582+
pipeline.expire(&key, ttl);
583+
}
584+
571585
let event = EoaExecutorEvent {
572586
transaction_id: pending_transaction.transaction_id.clone(),
573587
address: pending_transaction.user_request.from,
@@ -613,12 +627,14 @@ impl AtomicEoaExecutorStore {
613627
&self,
614628
results: Vec<SubmissionResult>,
615629
webhook_queue: Arc<twmq::Queue<WebhookJobHandler>>,
630+
failed_transaction_expiry_seconds: u64,
616631
) -> Result<BorrowedProcessingReport, TransactionStoreError> {
617632
self.execute_with_watch_and_retry(&ProcessBorrowedTransactions {
618633
results,
619634
keys: &self.keys,
620635
webhook_queue,
621636
eoa_metrics: &self.eoa_metrics,
637+
failed_transaction_expiry_seconds,
622638
})
623639
.await
624640
}

executors/src/eoa/store/borrowed.rs

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::eoa::{
1212
},
1313
worker::error::EoaExecutorWorkerError,
1414
};
15-
use crate::metrics::{current_timestamp_ms, calculate_duration_seconds, EoaMetrics};
15+
use crate::metrics::{EoaMetrics, calculate_duration_seconds, current_timestamp_ms};
1616
use crate::webhook::{WebhookJobHandler, queue_webhook_envelopes};
1717

1818
#[derive(Debug, Clone)]
@@ -41,6 +41,7 @@ pub struct ProcessBorrowedTransactions<'a> {
4141
pub keys: &'a EoaExecutorStoreKeys,
4242
pub webhook_queue: Arc<Queue<WebhookJobHandler>>,
4343
pub eoa_metrics: &'a EoaMetrics,
44+
pub failed_transaction_expiry_seconds: u64,
4445
}
4546

4647
#[derive(Debug, Default)]
@@ -121,15 +122,13 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
121122
SubmissionResultType::Success => {
122123
// Record metrics: transaction queued to sent
123124
let sent_timestamp = current_timestamp_ms();
124-
let queued_to_sent_duration = calculate_duration_seconds(
125-
result.transaction.queued_at,
126-
sent_timestamp
127-
);
125+
let queued_to_sent_duration =
126+
calculate_duration_seconds(result.transaction.queued_at, sent_timestamp);
128127
// Record metrics using the clean EoaMetrics abstraction
129128
self.eoa_metrics.record_transaction_sent(
130129
self.keys.eoa,
131130
self.keys.chain_id,
132-
queued_to_sent_duration
131+
queued_to_sent_duration,
133132
);
134133

135134
// Add to submitted zset
@@ -189,7 +188,7 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
189188
// Update transaction data status
190189
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
191190
pipeline.hset(&tx_data_key, "status", "pending");
192-
191+
193192
// ask for this nonce to be recycled because we did not consume the nonce
194193
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
195194

@@ -219,12 +218,22 @@ impl SafeRedisTransaction for ProcessBorrowedTransactions<'_> {
219218
report.moved_to_pending += 1;
220219
}
221220
SubmissionResultType::Fail(err) => {
222-
// Mark as failed
221+
// Mark as failed and set expiry
223222
let tx_data_key = self.keys.transaction_data_key_name(transaction_id);
224223
pipeline.hset(&tx_data_key, "status", "failed");
225224
pipeline.hset(&tx_data_key, "completed_at", now);
226225
pipeline.hset(&tx_data_key, "failure_reason", err.to_string());
227226

227+
let ttl: i64 = i64::try_from(self.failed_transaction_expiry_seconds)
228+
.unwrap_or(i64::MAX)
229+
.max(1);
230+
231+
// Set expiry on all related keys for failed transactions
232+
let transaction_keys = self.keys.get_all_transaction_keys(transaction_id);
233+
for key in transaction_keys {
234+
pipeline.expire(&key, ttl);
235+
}
236+
228237
// ask for this nonce to be recycled because we did not consume the nonce
229238
pipeline.zadd(self.keys.recycled_nonces_zset_name(), nonce, nonce);
230239

executors/src/eoa/store/mod.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -186,6 +186,26 @@ impl EoaExecutorStoreKeys {
186186
}
187187
}
188188

189+
/// Get the Redis keys that hold a single transaction's own data, for cleanup.
///
/// Returns, in order, the transaction data key and the transaction attempts
/// list key for `transaction_id`. Hash-to-ID mapping keys are NOT included;
/// use `get_all_transaction_keys_with_hashes` when those are needed as well.
190+
pub fn get_all_transaction_keys(&self, transaction_id: &str) -> Vec<String> {
191+
vec![
192+
self.transaction_data_key_name(transaction_id),
193+
self.transaction_attempts_list_name(transaction_id),
194+
]
195+
}
196+
197+
/// Get all Redis keys related to a transaction, including hash mappings, for cleanup.
///
/// Starts from the keys returned by `get_all_transaction_keys(transaction_id)`
/// and appends one hash-to-ID mapping key per entry in `transaction_hashes`,
/// in the order the hashes are given. An empty `transaction_hashes` slice
/// yields exactly the same keys as `get_all_transaction_keys`.
198+
pub fn get_all_transaction_keys_with_hashes(&self, transaction_id: &str, transaction_hashes: &[String]) -> Vec<String> {
199+
let mut keys = self.get_all_transaction_keys(transaction_id);
200+
201+
// Add hash-to-id mappings (one key per known transaction hash)
202+
for hash in transaction_hashes {
203+
keys.push(self.transaction_hash_to_id_key_name(hash));
204+
}
205+
206+
keys
207+
}
208+
189209
/// Name of the hashmap that maps `transaction_id` -> `BorrowedTransactionData`
190210
///
191211
/// This is used for crash recovery. Before submitting a transaction, we atomically move from pending to this borrowed hashmap.

executors/src/eoa/store/submitted.rs

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -354,21 +354,19 @@ impl SafeRedisTransaction for CleanSubmittedTransactions<'_> {
354354
(id, Some(confirmed_tx)) => {
355355
// Clean up confirmed transaction from Redis
356356
self.remove_transaction_from_redis_submitted_zset(pipeline, tx);
357-
let data_key_name = self.keys.transaction_data_key_name(id);
358-
pipeline.hset(&data_key_name, "status", "confirmed");
359-
pipeline.hset(&data_key_name, "completed_at", now);
360-
pipeline.hset(
361-
&data_key_name,
362-
"receipt",
363-
confirmed_tx.receipt_serialized.clone(),
364-
);
357+
358+
// IMMEDIATE CLEANUP: Delete all transaction data since it's confirmed
359+
// Note: Hash mappings will be cleaned up periodically by maintenance script
360+
let keys_to_delete = self.keys.get_all_transaction_keys(id);
361+
for key in keys_to_delete {
362+
pipeline.del(&key);
363+
}
365364

366365
if let SubmittedTransactionHydrated::Real(tx) = tx {
367366
// Record metrics: transaction queued to mined for confirmed transactions
368367
let confirmed_timestamp = current_timestamp_ms();
369368
let queued_to_mined_duration =
370369
calculate_duration_seconds(tx.queued_at, confirmed_timestamp);
371-
// Record metrics using the clean EoaMetrics abstraction
372370
self.eoa_metrics.record_transaction_confirmed(
373371
self.keys.eoa,
374372
self.keys.chain_id,

executors/src/eoa/worker/confirm.rs

Lines changed: 49 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use alloy::{primitives::B256, providers::Provider};
22
use engine_core::{chain::Chain, error::AlloyRpcErrorToEngineError};
33
use serde::{Deserialize, Serialize};
4+
use tracing::Instrument;
45

56
use crate::{
67
FlashblocksTransactionCount, TransactionCounts,
@@ -108,8 +109,38 @@ impl<C: Chain> EoaExecutorWorker<C> {
108109
{
109110
tracing::warn!(
110111
error = ?e,
111-
"Failed to attempt gas bump for stalled nonce"
112+
"Failed to attempt gas bump for stalled nonce, trying fallback"
112113
);
114+
115+
// Fallback: try to send a no-op transaction
116+
tracing::info!(
117+
nonce = transaction_counts.preconfirmed,
118+
"Gas bump failed, attempting no-op transaction as fallback"
119+
);
120+
if let Ok(noop_tx) = self.send_noop_transaction(transaction_counts.preconfirmed).await {
121+
if let Err(e) = self.store.process_noop_transactions(&[noop_tx]).await {
122+
tracing::error!(
123+
error = ?e,
124+
"Failed to process fallback no-op transaction for stalled nonce"
125+
);
126+
}
127+
} else {
128+
tracing::error!("Failed to send fallback no-op transaction for stalled nonce");
129+
130+
// Ultimate fallback: check if we should trigger auto-reset
131+
let time_since_movement = now.saturating_sub(current_health.last_nonce_movement_at);
132+
if time_since_movement > 5 * 60 * 1000 && submitted_count > 0 { // 5 minutes
133+
tracing::warn!(
134+
nonce = transaction_counts.preconfirmed,
135+
time_since_movement = time_since_movement,
136+
"EOA appears permanently stuck, scheduling auto-reset"
137+
);
138+
139+
if let Err(e) = self.store.schedule_manual_reset().await {
140+
tracing::error!(error = ?e, "Failed to schedule auto-reset");
141+
}
142+
}
143+
}
113144
}
114145
}
115146
}
@@ -356,7 +387,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
356387
error = ?e,
357388
"Failed to build typed transaction for gas bump"
358389
);
359-
return Ok(false);
390+
return Err(e);
360391
}
361392
};
362393
let bumped_typed_tx = self.apply_gas_bump_to_typed_transaction(typed_tx, 120); // 20% increase
@@ -375,7 +406,7 @@ impl<C: Chain> EoaExecutorWorker<C> {
375406
error = ?e,
376407
"Failed to sign transaction for gas bump"
377408
);
378-
return Ok(false);
409+
return Err(e);
379410
}
380411
};
381412

@@ -393,9 +424,15 @@ impl<C: Chain> EoaExecutorWorker<C> {
393424
)
394425
.await?;
395426

396-
// Send the bumped transaction
397-
let tx_envelope = bumped_tx.into();
398-
match self.chain.provider().send_tx_envelope(tx_envelope).await {
427+
// Send the bumped transaction with retry logic
428+
match self.send_tx_envelope_with_retry(bumped_tx.into(), crate::eoa::worker::error::SendContext::InitialBroadcast)
429+
.instrument(tracing::info_span!(
430+
"send_tx_envelope_with_retry",
431+
transaction_id = %newest_transaction_data.transaction_id,
432+
nonce = expected_nonce,
433+
context = "gas_bump"
434+
))
435+
.await {
399436
Ok(_) => {
400437
tracing::info!(
401438
transaction_id = ?newest_transaction_data.transaction_id,
@@ -409,10 +446,13 @@ impl<C: Chain> EoaExecutorWorker<C> {
409446
transaction_id = ?newest_transaction_data.transaction_id,
410447
nonce = expected_nonce,
411448
error = ?e,
412-
"Failed to send gas bumped transaction"
449+
"Failed to send gas bumped transaction after retries"
413450
);
414-
// Don't fail the worker, just log the error
415-
Ok(false)
451+
// Convert RPC error to worker error and bubble up
452+
Err(EoaExecutorWorkerError::TransactionSendError {
453+
message: format!("Failed to send gas bumped transaction: {e:?}"),
454+
inner_error: e.to_engine_error(&self.chain),
455+
})
416456
}
417457
}
418458
} else {

0 commit comments

Comments
 (0)