Skip to content

Commit 8f56e21

Browse files
committed
fix(node/bft): ensure gateways are shut down during prod tests
1 parent d703522 commit 8f56e21

File tree

10 files changed

+156
-109
lines changed

10 files changed

+156
-109
lines changed

node/bft/src/bft.rs

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -899,7 +899,10 @@ mod tests {
899899
use aleo_std::StorageMode;
900900
use anyhow::Result;
901901
use indexmap::{IndexMap, IndexSet};
902-
use std::sync::Arc;
902+
use std::{
903+
net::{Ipv4Addr, SocketAddr, SocketAddrV4},
904+
sync::Arc,
905+
};
903906

904907
type CurrentNetwork = snarkvm::console::network::MainnetV0;
905908

@@ -934,13 +937,17 @@ mod tests {
934937
) -> anyhow::Result<BFT<CurrentNetwork>> {
935938
// Create the block synchronization logic.
936939
let block_sync = Arc::new(BlockSync::new(ledger.clone()));
940+
941+
// Pick a random port so we can run tests concurrently.
942+
let any_addr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::LOCALHOST, 0));
943+
937944
// Initialize the BFT.
938945
BFT::new(
939946
account.clone(),
940947
storage.clone(),
941948
ledger.clone(),
942949
block_sync,
943-
None,
950+
Some(any_addr),
944951
&[],
945952
StorageMode::new_test(None),
946953
None,

node/bft/src/gateway.rs

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,29 +20,31 @@ use crate::{
2020
MAX_BATCH_DELAY_IN_MS,
2121
MEMORY_POOL_PORT,
2222
Worker,
23-
events::{BatchPropose, BatchSignature, EventCodec, PrimaryPing},
23+
events::{
24+
BatchPropose,
25+
BatchSignature,
26+
BlockRequest,
27+
BlockResponse,
28+
CertificateRequest,
29+
CertificateResponse,
30+
ChallengeRequest,
31+
ChallengeResponse,
32+
DataBlocks,
33+
DisconnectReason,
34+
Event,
35+
EventCodec,
36+
EventTrait,
37+
PrimaryPing,
38+
TransmissionRequest,
39+
TransmissionResponse,
40+
ValidatorsRequest,
41+
ValidatorsResponse,
42+
},
2443
helpers::{Cache, CallbackHandle, Resolver, Storage, WorkerSender, assign_to_worker},
25-
spawn_blocking,
44+
ledger_service::LedgerService,
2645
};
27-
use aleo_std::StorageMode;
46+
2847
use snarkos_account::Account;
29-
use snarkos_node_bft_events::{
30-
BlockRequest,
31-
BlockResponse,
32-
CertificateRequest,
33-
CertificateResponse,
34-
ChallengeRequest,
35-
ChallengeResponse,
36-
DataBlocks,
37-
DisconnectReason,
38-
Event,
39-
EventTrait,
40-
TransmissionRequest,
41-
TransmissionResponse,
42-
ValidatorsRequest,
43-
ValidatorsResponse,
44-
};
45-
use snarkos_node_bft_ledger_service::LedgerService;
4648
use snarkos_node_router::{NodeType, Peer, PeerPoolHandling};
4749
use snarkos_node_sync::{MAX_BLOCKS_BEHIND, communication_service::CommunicationService, locators::BlockLocators};
4850
use snarkos_node_tcp::{
@@ -53,6 +55,8 @@ use snarkos_node_tcp::{
5355
Tcp,
5456
protocols::{Disconnect, Handshake, OnConnect, Reading, Writing},
5557
};
58+
59+
use aleo_std::StorageMode;
5660
use snarkvm::{
5761
console::prelude::*,
5862
ledger::{
@@ -61,6 +65,7 @@ use snarkvm::{
6165
narwhal::{BatchCertificate, BatchHeader, Data},
6266
},
6367
prelude::{Address, Field},
68+
utilities::task::{self, JoinHandle},
6469
};
6570

6671
use colored::Colorize;
@@ -82,7 +87,6 @@ use std::{
8287
use tokio::{
8388
net::TcpStream,
8489
sync::{OnceCell, oneshot},
85-
task::{self, JoinHandle},
8690
};
8791
use tokio_stream::StreamExt;
8892
use tokio_util::codec::Framed;
@@ -215,8 +219,14 @@ impl<N: Network> Gateway<N> {
215219
(None, None) => SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, MEMORY_POOL_PORT)),
216220
(Some(ip), _) => ip,
217221
};
222+
223+
// Allow at most as many connections as the maximum committe size.
224+
// and fail if the chosen port is not available.
225+
let mut tcp_config = Config::new(ip, Committee::<N>::max_committee_size()?);
226+
tcp_config.allow_random_port = false;
227+
218228
// Initialize the TCP stack.
219-
let tcp = Tcp::new(Config::new(ip, Committee::<N>::max_committee_size()?));
229+
let tcp = Tcp::new(tcp_config);
220230

221231
// Prepare the collection of the initial peers.
222232
let mut initial_peers = HashMap::new();
@@ -609,19 +619,14 @@ impl<N: Network> Gateway<N> {
609619
}
610620

611621
let self_ = self.clone();
612-
let blocks = match task::spawn_blocking(move || {
622+
let blocks = task::spawn_blocking(move || {
613623
// Retrieve the blocks within the requested range.
614624
match self_.ledger.get_blocks(start_height..end_height) {
615625
Ok(blocks) => Ok(Data::Object(DataBlocks(blocks))),
616626
Err(error) => bail!("Missing blocks {start_height} to {end_height} from ledger - {error}"),
617627
}
618628
})
619-
.await
620-
{
621-
Ok(Ok(blocks)) => blocks,
622-
Ok(Err(error)) => return Err(error),
623-
Err(error) => return Err(anyhow!("[BlockRequest] {error}")),
624-
};
629+
.await?;
625630

626631
let self_ = self.clone();
627632
tokio::spawn(async move {
@@ -870,7 +875,7 @@ impl<N: Network> Gateway<N> {
870875
/// Spawns a task with the given future; it should only be used for long-running tasks.
871876
#[allow(dead_code)]
872877
fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
873-
self.handles.lock().push(tokio::spawn(future));
878+
self.handles.lock().push(task::spawn(future));
874879
}
875880

876881
/// Shuts down the gateway.
@@ -1510,7 +1515,7 @@ impl<N: Network> Gateway<N> {
15101515
return Some(DisconnectReason::InvalidChallengeResponse);
15111516
}
15121517
// Perform the deferred non-blocking deserialization of the signature.
1513-
let Ok(signature) = spawn_blocking!(signature.deserialize_blocking()) else {
1518+
let Ok(signature) = task::spawn_blocking(|| signature.deserialize_blocking()).await else {
15141519
warn!("{CONTEXT} Gateway handshake with '{peer_addr}' failed (cannot deserialize the signature)");
15151520
return Some(DisconnectReason::InvalidChallengeResponse);
15161521
};
@@ -1621,17 +1626,11 @@ mod prop_tests {
16211626

16221627
impl GatewayAddress {
16231628
fn ip(&self) -> Option<SocketAddr> {
1624-
if let GatewayAddress::Prod(ip) = self {
1625-
return *ip;
1626-
}
1627-
None
1629+
if let GatewayAddress::Prod(ip) = self { *ip } else { None }
16281630
}
16291631

16301632
fn port(&self) -> Option<u16> {
1631-
if let GatewayAddress::Dev(port) = self {
1632-
return Some(*port as u16);
1633-
}
1634-
None
1633+
if let GatewayAddress::Dev(port) = self { Some(*port as u16) } else { None }
16351634
}
16361635
}
16371636

@@ -1689,8 +1688,8 @@ mod prop_tests {
16891688
.boxed()
16901689
}
16911690

1692-
#[proptest]
1693-
fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
1691+
#[proptest(async = "tokio")]
1692+
async fn gateway_dev_initialization(#[strategy(any_valid_dev_gateway())] input: GatewayInput) {
16941693
let (storage, _, private_key, dev) = input;
16951694
let account = Account::try_from(private_key).unwrap();
16961695

@@ -1711,10 +1710,13 @@ mod prop_tests {
17111710
let tcp_config = gateway.tcp().config();
17121711
assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
17131712
assert_eq!(gateway.account().address(), account.address());
1713+
1714+
// Ensure the gateway shuts down and unbinds the TCP port.
1715+
gateway.shut_down().await;
17141716
}
17151717

1716-
#[proptest]
1717-
fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
1718+
#[proptest(async = "tokio")]
1719+
async fn gateway_prod_initialization(#[strategy(any_valid_prod_gateway())] input: GatewayInput) {
17181720
let (storage, _, private_key, dev) = input;
17191721
let account = Account::try_from(private_key).unwrap();
17201722

@@ -1740,6 +1742,9 @@ mod prop_tests {
17401742
let tcp_config = gateway.tcp().config();
17411743
assert_eq!(tcp_config.max_connections, Committee::<CurrentNetwork>::max_committee_size().unwrap());
17421744
assert_eq!(gateway.account().address(), account.address());
1745+
1746+
// Ensure the gateway shuts down and unbinds the TCP port.
1747+
gateway.shut_down().await;
17431748
}
17441749

17451750
#[proptest(async = "tokio")]
@@ -1793,6 +1798,9 @@ mod prop_tests {
17931798
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), MEMORY_POOL_PORT + dev.port().unwrap())
17941799
);
17951800
assert_eq!(gateway.num_workers(), workers.len() as u8);
1801+
1802+
// Ensure the gateway shuts down and unbinds the TCP port.
1803+
gateway.shut_down().await;
17961804
}
17971805

17981806
#[proptest]

node/bft/src/lib.rs

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -69,14 +69,3 @@ pub const MAX_WORKERS: u8 = 1; // worker(s)
6969
pub const PRIMARY_PING_IN_MS: u64 = 2 * MAX_BATCH_DELAY_IN_MS; // ms
7070
/// The interval at which each worker broadcasts a ping to every other node.
7171
pub const WORKER_PING_IN_MS: u64 = 4 * MAX_BATCH_DELAY_IN_MS; // ms
72-
73-
/// A helper macro to spawn a blocking task.
74-
#[macro_export]
75-
macro_rules! spawn_blocking {
76-
($expr:expr) => {
77-
match tokio::task::spawn_blocking(move || $expr).await {
78-
Ok(value) => value,
79-
Err(error) => Err(anyhow::anyhow!("[tokio::spawn_blocking] {error}")),
80-
}
81-
};
82-
}

0 commit comments

Comments
 (0)