@@ -267,11 +267,23 @@ async fn setup_database_and_rpc(
267267
268268fn spawn_router_service ( should_terminate : Arc < AtomicBool > ) -> tokio:: task:: JoinHandle < Result < ( ) > > {
269269 tokio:: spawn ( async move {
270- if let Err ( e) = router:: initialize_router ( should_terminate. clone ( ) ) . await {
271- error ! ( "[router] unexpected error {}" , e) ;
270+ info ! ( "[router] Starting router service" ) ;
271+ match router:: initialize_router ( should_terminate. clone ( ) ) . await {
272+ Ok ( ( ) ) => {
273+ info ! ( "[router] Router service completed normally" ) ;
274+ Ok ( ( ) )
275+ }
276+ Err ( e) => {
277+ error ! (
278+ "[router] CRITICAL: Router service failed with error: {:?}" ,
279+ e
280+ ) ;
281+ error ! ( "[router] This router failure may have caused the indexer to become unreachable" ) ;
282+ Err ( BlockchainError :: internal ( format ! (
283+ "Router service failed: {e}"
284+ ) ) )
285+ }
272286 }
273- info ! ( "[router] shutting down" ) ;
274- Ok ( ( ) )
275287 } )
276288}
277289
@@ -292,11 +304,23 @@ fn spawn_quick_indexer_service(
292304 let quick_indexer = QuickIndexer :: new ( quick_config, db, rpc_client, should_terminate) ;
293305
294306 Ok ( tokio:: spawn ( async move {
295- info ! ( "Starting quick indexer" ) ;
296- if let Err ( e) = quick_indexer. index ( ) . await {
297- error ! ( "[quick_index] unexpected error {}" , e) ;
307+ info ! ( "[quick_index] Starting quick indexer service" ) ;
308+ match quick_indexer. index ( ) . await {
309+ Ok ( ( ) ) => {
310+ info ! ( "[quick_index] Quick indexer completed normally" ) ;
311+ Ok ( ( ) )
312+ }
313+ Err ( e) => {
314+ error ! (
315+ "[quick_index] CRITICAL: Quick indexer failed with error: {:?}" ,
316+ e
317+ ) ;
318+ error ! ( "[quick_index] Quick indexer handles real-time block indexing - this failure stops new block processing" ) ;
319+ Err ( BlockchainError :: internal ( format ! (
320+ "Quick indexer failed: {e}"
321+ ) ) )
322+ }
298323 }
299- Ok ( ( ) )
300324 } ) )
301325}
302326
@@ -319,11 +343,23 @@ fn spawn_batch_indexer_service(
319343 let batch_indexer = BatchIndexer :: new ( batch_config, db, rpc_client, should_terminate) ;
320344
321345 Ok ( tokio:: spawn ( async move {
322- info ! ( "Starting batch indexer" ) ;
323- if let Err ( e) = batch_indexer. index ( ) . await {
324- error ! ( "[batch_index] unexpected error {}" , e) ;
346+ info ! ( "[batch_index] Starting batch indexer service" ) ;
347+ match batch_indexer. index ( ) . await {
348+ Ok ( ( ) ) => {
349+ info ! ( "[batch_index] Batch indexer completed normally" ) ;
350+ Ok ( ( ) )
351+ }
352+ Err ( e) => {
353+ error ! (
354+ "[batch_index] CRITICAL: Batch indexer failed with error: {:?}" ,
355+ e
356+ ) ;
357+ error ! ( "[batch_index] Batch indexer handles historical block indexing - this failure stops backfilling" ) ;
358+ Err ( BlockchainError :: internal ( format ! (
359+ "Batch indexer failed: {e}"
360+ ) ) )
361+ }
325362 }
326- Ok ( ( ) )
327363 } ) )
328364}
329365
@@ -389,20 +425,69 @@ async fn initialize_index_metadata(
389425 Err ( BlockchainError :: internal ( "Failed to get indexer metadata" ) )
390426}
391427
428+ #[ allow( clippy:: cognitive_complexity) ]
392429async fn wait_for_thread_completion ( handles : Vec < JoinHandle < Result < ( ) > > > ) -> Result < ( ) > {
393- for handle in handles {
430+ let mut has_errors = false ;
431+
432+ for ( index, handle) in handles. into_iter ( ) . enumerate ( ) {
433+ let service_name = match index {
434+ 0 => "router" ,
435+ 1 => "quick_indexer" ,
436+ 2 => "batch_indexer" ,
437+ _ => "unknown_service" ,
438+ } ;
439+
394440 match handle. await {
395441 Ok ( Ok ( ( ) ) ) => {
396- info ! ( "Thread completed successfully" ) ;
442+ info ! ( "[{}] Thread completed successfully" , service_name ) ;
397443 }
398444 Ok ( Err ( e) ) => {
399- error ! ( "Thread completed with an error: {:?}" , e) ;
445+ error ! (
446+ "[{}] CRITICAL: Thread completed with an error: {:?}" ,
447+ service_name, e
448+ ) ;
449+ error ! (
450+ "[{}] Error details: {}" ,
451+ service_name,
452+ format_error_details( & e)
453+ ) ;
454+ has_errors = true ;
400455 }
401456 Err ( e) => {
402- error ! ( "Thread panicked: {:?}" , e) ;
457+ error ! ( "[{}] CRITICAL: Thread panicked: {:?}" , service_name, e) ;
458+ if e. is_panic ( ) {
459+ error ! ( "[{}] This was a panic - check for unwrap(), expect(), or other panic sources" , service_name) ;
460+ }
461+ if e. is_cancelled ( ) {
462+ error ! ( "[{}] Task was cancelled" , service_name) ;
463+ }
464+ has_errors = true ;
403465 }
404466 }
405467 }
406468
469+ if has_errors {
470+ error ! (
471+ "INDEXER SHUTDOWN: One or more services failed - this explains why the indexer stopped"
472+ ) ;
473+ return Err ( BlockchainError :: internal (
474+ "One or more indexing services failed" ,
475+ ) ) ;
476+ }
477+
478+ info ! ( "All indexing services completed successfully" ) ;
407479 Ok ( ( ) )
408480}
481+
482+ fn format_error_details ( error : & BlockchainError ) -> String {
483+ let mut details = Vec :: new ( ) ;
484+ details. push ( format ! ( "Error: {error}" ) ) ;
485+
486+ let mut current_error: & dyn std:: error:: Error = error;
487+ while let Some ( source) = current_error. source ( ) {
488+ details. push ( format ! ( "Caused by: {source}" ) ) ;
489+ current_error = source;
490+ }
491+
492+ details. join ( "\n " )
493+ }
0 commit comments