diff --git a/daemon/test_daemon.go b/daemon/test_daemon.go index 4f889d7bf..1f3af5c18 100644 --- a/daemon/test_daemon.go +++ b/daemon/test_daemon.go @@ -47,6 +47,7 @@ import ( "github.com/bsv-blockchain/teranode/test/utils/wait" "github.com/bsv-blockchain/teranode/ulogger" "github.com/bsv-blockchain/teranode/util" + libp2pPeer "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" tc "github.com/testcontainers/testcontainers-go/modules/compose" @@ -460,8 +461,13 @@ func NewTestDaemon(t *testing.T, opts TestOptions) *TestDaemon { blockAssembler, ok := blockAssemblyService.(*blockassembly.BlockAssembly) require.True(t, ok) + assetURL := fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort) + if appSettings.Asset.APIPrefix != "" { + assetURL += appSettings.Asset.APIPrefix + } + return &TestDaemon{ - AssetURL: fmt.Sprintf("http://127.0.0.1:%d", appSettings.Asset.HTTPPort), + AssetURL: assetURL, BlockAssembler: blockAssembler.GetBlockAssembler(), BlockAssemblyClient: blockAssemblyClient, BlockValidationClient: blockValidationClient, @@ -1294,6 +1300,25 @@ func (td *TestDaemon) WaitForBlockStateChange(t *testing.T, expectedBlock *model } } +func (td *TestDaemon) WaitForBlockhash(t *testing.T, blockHash *chainhash.Hash, timeout time.Duration) { + ctx, cancel := context.WithTimeout(td.Ctx, timeout) + defer cancel() + + for { + select { + case <-ctx.Done(): + t.Errorf("Timeout waiting for block with hash %s", blockHash.String()) + return + default: + _, err := td.BlockchainClient.GetBlock(ctx, blockHash) + if err == nil { + return + } + time.Sleep(100 * time.Millisecond) + } + } +} + func (td *TestDaemon) WaitForBlock(t *testing.T, expectedBlock *model.Block, timeout time.Duration, skipVerifyChain ...bool) { ctx, cancel := context.WithTimeout(td.Ctx, timeout) defer cancel() @@ -1865,6 +1890,25 @@ func (td *TestDaemon) DisconnectFromPeer(t *testing.T, peer *TestDaemon) { require.NoError(t, err, "Failed to disconnect from peer") } +func (td *TestDaemon) InjectPeer(t *testing.T, peer *TestDaemon) { + peerID, err := libp2pPeer.Decode(peer.Settings.P2P.PeerID) + require.NoError(t, err, "Failed to decode peer ID") + + p2pService, err := td.d.ServiceManager.GetService("P2P") + require.NoError(t, err, "Failed to get P2P service") + + p2pServer, ok := p2pService.(*p2p.Server) + require.True(t, ok, "Failed to cast P2P service to Server") + + // Inject my peer info to other peer... + header, meta, err := peer.BlockchainClient.GetBestBlockHeader(td.Ctx) + require.NoError(t, err, "Failed to get best block header") + + p2pServer.InjectPeerForTesting(peerID, peer.Settings.Context, peer.AssetURL, meta.Height, header.Hash().String()) + + t.Logf("Injected peer %s into %s's registry (PeerID: %s)", peer.Settings.Context, td.Settings.Context, peerID) +} + func peerAddress(peer *TestDaemon) string { return fmt.Sprintf("/dns/127.0.0.1/tcp/%d/p2p/%s", peer.Settings.P2P.Port, peer.Settings.P2P.PeerID) } diff --git a/services/p2p/server_helpers.go b/services/p2p/server_helpers.go index 5fa4d043a..6f6c0515c 100644 --- a/services/p2p/server_helpers.go +++ b/services/p2p/server_helpers.go @@ -480,6 +480,23 @@ func (s *Server) addConnectedPeer(peerID peer.ID, clientName string, height uint } } +// InjectPeerForTesting directly injects a peer into the registry for testing purposes. +// This method allows deterministic peer setup without requiring actual P2P network connections. +func (s *Server) InjectPeerForTesting(peerID peer.ID, clientName, dataHubURL string, height uint32, blockHash string) { + s.addConnectedPeer(peerID, clientName) + s.updateDataHubURL(peerID, dataHubURL) + s.updateBlockHash(peerID, blockHash) + s.updatePeerHeight(peerID, int32(height)) + + if s.peerRegistry != nil { + s.peerRegistry.UpdateStorage(peerID, "full") + } + + if s.syncCoordinator != nil { + s.syncCoordinator.UpdatePeerInfo(peerID, int32(height), blockHash, dataHubURL) + } +} + func (s *Server) removePeer(peerID peer.ID) { if s.peerRegistry != nil { // Mark as disconnected before removing diff --git a/services/p2p/sync_coordinator.go b/services/p2p/sync_coordinator.go index 1aa37eefe..c77110402 100644 --- a/services/p2p/sync_coordinator.go +++ b/services/p2p/sync_coordinator.go @@ -503,7 +503,7 @@ func (sc *SyncCoordinator) logCandidateList(candidates []*PeerInfo) { func (sc *SyncCoordinator) periodicEvaluation(ctx context.Context) { defer sc.wg.Done() - ticker := time.NewTicker(30 * time.Second) + ticker := time.NewTicker(sc.settings.P2P.SyncCoordinatorPeriodicEvaluationInterval) defer ticker.Stop() for { diff --git a/settings/interface.go b/settings/interface.go index 1c6dc18b8..afce24a90 100644 --- a/settings/interface.go +++ b/settings/interface.go @@ -451,6 +451,9 @@ type P2PSettings struct { // Node mode configuration (full vs pruned) AllowPrunedNodeFallback bool // If true, fall back to pruned nodes when no full nodes available (default: true). Selects youngest pruned node (smallest height) to minimize UTXO pruning risk. + + // This is the time we trigger a periodic evaluation in the sync coordinator + SyncCoordinatorPeriodicEvaluationInterval time.Duration } type CoinbaseSettings struct { diff --git a/settings/settings.go b/settings/settings.go index 40f1e6020..d7df011eb 100644 --- a/settings/settings.go +++ b/settings/settings.go @@ -400,6 +400,7 @@ func NewSettings(alternativeContext ...string) *Settings { AllowPrivateIPs: getBool("p2p_allow_private_ips", false, alternativeContext...), // Default false for production safety // Full/pruned node selection configuration AllowPrunedNodeFallback: getBool("p2p_allow_pruned_node_fallback", true, alternativeContext...), + SyncCoordinatorPeriodicEvaluationInterval: getDuration("p2p_sync_coordinator_periodic_evaluation_interval", 30*time.Second, alternativeContext...), }, Coinbase: CoinbaseSettings{ DB: getString("coinbaseDB", "", alternativeContext...), diff --git a/test/e2e/daemon/ready/multi_node_inject_test.go b/test/e2e/daemon/ready/multi_node_inject_test.go new file mode 100644 index 000000000..c1ab38387 --- /dev/null +++ b/test/e2e/daemon/ready/multi_node_inject_test.go @@ -0,0 +1,124 @@ +package smoke + +import ( + "fmt" + "net/url" + "testing" + "time" + + "github.com/bsv-blockchain/teranode/daemon" + "github.com/bsv-blockchain/teranode/services/blockchain" + "github.com/bsv-blockchain/teranode/settings" + "github.com/bsv-blockchain/teranode/test/utils/aerospike" + "github.com/stretchr/testify/require" +) + +func getAerospikeInstance(t *testing.T) *url.URL { + urlStr, teardownFn, err := aerospike.InitAerospikeContainer() + require.NoError(t, err, "Failed to setup Aerospike container") + + url, err := url.Parse(urlStr) + require.NoError(t, err, "Failed to parse UTXO store URL") + + t.Cleanup(func() { + _ = teardownFn() + }) + + return url +} + +func getTestDaemon(t *testing.T, settingsContext string, aerospikeURL *url.URL) *daemon.TestDaemon { + d := daemon.NewTestDaemon(t, daemon.TestOptions{ + EnableRPC: true, + EnableP2P: true, + EnableValidator: true, + SettingsContext: settingsContext, + SettingsOverrideFunc: func(s *settings.Settings) { + s.P2P.PeerCacheDir = t.TempDir() + s.UtxoStore.UtxoStore = aerospikeURL + s.ChainCfgParams.CoinbaseMaturity = 2 + s.P2P.SyncCoordinatorPeriodicEvaluationInterval = 1 * time.Second + }, + FSMState: blockchain.FSMStateRUNNING, + // EnableFullLogging: true, + }) + + t.Cleanup(func() { + d.Stop(t) + }) + + return d +} + +func printPeerRegistry(t *testing.T, td *daemon.TestDaemon) { + registry, err := td.P2PClient.GetPeerRegistry(t.Context()) + require.NoError(t, err) + + fmt.Printf("\nPeer %s (%s) registry:\n", td.Settings.ClientName, td.Settings.P2P.PeerID) + + for _, peerInfo := range registry { + fmt.Printf("\tName: %s (%s): Height=%d, BlockHash=%s, DataHubURL=%s", peerInfo.ClientName, peerInfo.ID, peerInfo.Height, peerInfo.BlockHash, peerInfo.DataHubURL) + } + + fmt.Println() + fmt.Println() +} + +// This test creates 2 nodes, and nodeA mines 3 blocks. Then we inject nodeA into nodeB, and nodeB should sync up to nodeA's height. +func Test_NodeB_Inject_After_NodeA_Mined(t *testing.T) { + SharedTestLock.Lock() + defer SharedTestLock.Unlock() + + sharedAerospike := getAerospikeInstance(t) + nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike) + nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike) + + t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]") + coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx) + t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String()) + + // nodeA.InjectPeer(t, nodeB) + nodeB.InjectPeer(t, nodeA) + + printPeerRegistry(t, nodeB) + + nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx) + require.NoError(t, err) + + nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 10*time.Second) + +} + +// This test creates 2 nodes, and nodeB injects nodeA before nodeA mines any blocks. Then we mine 3 blocks on nodeA, and nodeB should sync up to nodeA's height. +func Test_NodeB_Inject_Before_NodeA_Mined(t *testing.T) { + SharedTestLock.Lock() + defer SharedTestLock.Unlock() + + sharedAerospike := getAerospikeInstance(t) + nodeA := getTestDaemon(t, "docker.host.teranode1.daemon", sharedAerospike) + nodeB := getTestDaemon(t, "docker.host.teranode2.daemon", sharedAerospike) + + // nodeA.InjectPeer(t, nodeB) + nodeB.InjectPeer(t, nodeA) + + printPeerRegistry(t, nodeB) + + // go func() { + // for { + // time.Sleep(5 * time.Second) + // printPeerRegistry(t, nodeB) + // } + // }() + + t.Log(" Creating initial blockchain: [Genesis] -> [Block1] -> [Block2] -> [Block3]") + coinbaseTx := nodeA.MineToMaturityAndGetSpendableCoinbaseTx(t, nodeA.Ctx) + t.Logf(" Coinbase transaction available for spending: %s", coinbaseTx.TxIDChainHash().String()) + + printPeerRegistry(t, nodeB) + + nodeABestBlockHeader, _, err := nodeA.BlockchainClient.GetBestBlockHeader(nodeA.Ctx) + require.NoError(t, err) + + nodeB.WaitForBlockhash(t, nodeABestBlockHeader.Hash(), 26*time.Second) + +}