@@ -28,10 +28,7 @@ use cumulus_relay_chain_interface::{RelayChainInterface, RelayChainResult};
2828use cumulus_relay_chain_minimal_node:: {
2929 build_minimal_relay_chain_node_light_client, build_minimal_relay_chain_node_with_rpc,
3030} ;
31- use futures:: {
32- channel:: { mpsc, oneshot} ,
33- FutureExt , StreamExt ,
34- } ;
31+ use futures:: { channel:: mpsc, StreamExt } ;
3532use polkadot_primitives:: { CollatorPair , OccupiedCoreAssumption } ;
3633use sc_client_api:: {
3734 Backend as BackendT , BlockBackend , BlockchainEvents , Finalizer , ProofProvider , UsageProvider ,
@@ -43,7 +40,7 @@ use sc_consensus::{
4340use sc_network:: { config:: SyncMode , service:: traits:: NetworkService , NetworkBackend } ;
4441use sc_network_sync:: SyncingService ;
4542use sc_network_transactions:: TransactionsHandlerController ;
46- use sc_service:: { Configuration , NetworkStarter , SpawnTaskHandle , TaskManager , WarpSyncParams } ;
43+ use sc_service:: { Configuration , NetworkStarter , SpawnTaskHandle , TaskManager , WarpSyncConfig } ;
4744use sc_telemetry:: { log, TelemetryWorkerHandle } ;
4845use sc_utils:: mpsc:: TracingUnboundedSender ;
4946use sp_api:: ProvideRuntimeApi ;
@@ -467,12 +464,19 @@ where
467464{
468465 let warp_sync_params = match parachain_config. network . sync_mode {
469466 SyncMode :: Warp => {
470- let target_block = warp_sync_get :: < Block , RCInterface > (
471- para_id,
472- relay_chain_interface. clone ( ) ,
473- spawn_handle. clone ( ) ,
474- ) ;
475- Some ( WarpSyncParams :: WaitForTarget ( target_block) )
467+ log:: debug!( target: LOG_TARGET_SYNC , "waiting for announce block..." ) ;
468+
469+ let target_block =
470+ wait_for_finalized_para_head :: < Block , _ > ( para_id, relay_chain_interface. clone ( ) )
471+ . await
472+ . inspect_err ( |e| {
473+ log:: error!(
474+ target: LOG_TARGET_SYNC ,
475+ "Unable to determine parachain target block {:?}" ,
476+ e
477+ ) ;
478+ } ) ?;
479+ Some ( WarpSyncConfig :: WithTarget ( target_block) )
476480 } ,
477481 _ => None ,
478482 } ;
@@ -500,67 +504,37 @@ where
500504 spawn_handle,
501505 import_queue,
502506 block_announce_validator_builder : Some ( Box :: new ( move |_| block_announce_validator) ) ,
503- warp_sync_params,
507+ warp_sync_config : warp_sync_params,
504508 block_relay : None ,
505509 metrics,
506510 } )
507511}
508512
509- /// Creates a new background task to wait for the relay chain to sync up and retrieve the parachain
510- /// header
511- fn warp_sync_get < B , RCInterface > (
512- para_id : ParaId ,
513- relay_chain_interface : RCInterface ,
514- spawner : SpawnTaskHandle ,
515- ) -> oneshot:: Receiver < <B as BlockT >:: Header >
516- where
517- B : BlockT + ' static ,
518- RCInterface : RelayChainInterface + ' static ,
519- {
520- let ( sender, receiver) = oneshot:: channel :: < B :: Header > ( ) ;
521- spawner. spawn (
522- "cumulus-parachain-wait-for-target-block" ,
523- None ,
524- async move {
525- log:: debug!(
526- target: LOG_TARGET_SYNC ,
527- "waiting for announce block in a background task..." ,
528- ) ;
529-
530- let _ = wait_for_finalized_para_head :: < B , _ > ( sender, para_id, relay_chain_interface)
531- . await
532- . map_err ( |e| {
533- log:: error!(
534- target: LOG_TARGET_SYNC ,
535- "Unable to determine parachain target block {:?}" ,
536- e
537- )
538- } ) ;
539- }
540- . boxed ( ) ,
541- ) ;
542-
543- receiver
544- }
545-
546513/// Waits for the relay chain to have finished syncing and then gets the parachain header that
547514/// corresponds to the last finalized relay chain block.
548515async fn wait_for_finalized_para_head < B , RCInterface > (
549- sender : oneshot:: Sender < <B as BlockT >:: Header > ,
550516 para_id : ParaId ,
551517 relay_chain_interface : RCInterface ,
552- ) -> Result < ( ) , Box < dyn std :: error:: Error + Send + Sync > >
518+ ) -> sc_service :: error:: Result < < B as BlockT > :: Header >
553519where
554520 B : BlockT + ' static ,
555521 RCInterface : RelayChainInterface + Send + ' static ,
556522{
557- let mut imported_blocks = relay_chain_interface. import_notification_stream ( ) . await ?. fuse ( ) ;
558- while imported_blocks. next ( ) . await . is_some ( ) {
559- let is_syncing = relay_chain_interface. is_major_syncing ( ) . await . map_err ( |e| {
560- Box :: < dyn std:: error:: Error + Send + Sync > :: from ( format ! (
561- "Unable to determine sync status. {e}"
523+ let mut imported_blocks = relay_chain_interface
524+ . import_notification_stream ( )
525+ . await
526+ . map_err ( |error| {
527+ sc_service:: Error :: Other ( format ! (
528+ "Relay chain import notification stream error when waiting for parachain head: \
529+ {error}"
562530 ) )
563- } ) ?;
531+ } ) ?
532+ . fuse ( ) ;
533+ while imported_blocks. next ( ) . await . is_some ( ) {
534+ let is_syncing = relay_chain_interface
535+ . is_major_syncing ( )
536+ . await
537+ . map_err ( |e| format ! ( "Unable to determine sync status: {e}" ) ) ?;
564538
565539 if !is_syncing {
566540 let relay_chain_best_hash = relay_chain_interface
@@ -586,8 +560,7 @@ where
586560 finalized_header. number( ) ,
587561 finalized_header. hash( )
588562 ) ;
589- let _ = sender. send ( finalized_header) ;
590- return Ok ( ( ) )
563+ return Ok ( finalized_header)
591564 }
592565 }
593566
0 commit comments