Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions node/bft/src/gateway.rs
Original file line number Diff line number Diff line change
Expand Up @@ -748,7 +748,7 @@ impl<N: Network> Gateway<N> {
(self.account.address() != aleo_addr
&& !self.is_connected_address(aleo_addr)
&& self.is_authorized_validator_address(aleo_addr))
.then_some(listener_addr)
.then_some((listener_addr, None))
})
.collect::<Vec<_>>();
if !valid_addrs.is_empty() {
Expand Down Expand Up @@ -931,10 +931,10 @@ impl<N: Network> Gateway<N> {
// The trusted ones are already handled by `handle_trusted_validators`.
let trusted_validators = self.trusted_peers();
if self.number_of_connected_peers() < N::LATEST_MAX_CERTIFICATES().unwrap() as usize {
for candidate_addr in self.candidate_peers() {
if !trusted_validators.contains(&candidate_addr) {
for peer in self.get_candidate_peers() {
if !trusted_validators.contains(&peer.listener_addr) {
// Attempt to connect to unconnected validators.
self.connect(candidate_addr);
self.connect(peer.listener_addr);
}
}

Expand Down
72 changes: 63 additions & 9 deletions node/router/messages/src/peer_response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::borrow::Cow;

#[derive(Clone, Debug, PartialEq, Eq)]
pub struct PeerResponse {
pub peers: Vec<SocketAddr>,
pub peers: Vec<(SocketAddr, Option<u32>)>,
}

impl MessageTrait for PeerResponse {
Expand All @@ -39,20 +39,59 @@ impl ToBytes for PeerResponse {
return Err(io::Error::new(io::ErrorKind::InvalidInput, format!("Too many peers: {}", self.peers.len())));
}

// A version indicator; we don't expect empty peer responses, so a zero value can serve
// as an indicator that this message is to be processed differently. The version value
// can be changed to a 2 in the future, once everyone expects it there.
0u8.write_le(&mut writer)?;

(self.peers.len() as u8).write_le(&mut writer)?;
for peer in self.peers.iter() {
peer.write_le(&mut writer)?;
for (addr, height) in self.peers.iter() {
addr.write_le(&mut writer)?;
if let Some(h) = height {
1u8.write_le(&mut writer)?;
h.write_le(&mut writer)?;
} else {
0u8.write_le(&mut writer)?;
}
}
Ok(())
}
}

impl FromBytes for PeerResponse {
fn read_le<R: io::Read>(mut reader: R) -> io::Result<Self> {
let count = u8::read_le(&mut reader)?;
// Read the peer count if their heights aren't present; otherwise, interpret this value
// as the message version. It is a workaround for a currently missing version value.
// The worst-case scenario is if a node hasn't updated, and it gets a `PeerRequest` from
// its only peer who has; this would cause it to return a message that appears as if it
// contains heights (due to a leading `0`), but it would end up failing to deserialize.
// TODO: after a release or two, we should always be expecting the version to be present,
// simplifying the deserialization; also, remove the `empty_old_peerlist_handling` test.
let mut contains_heights = false;
let count_or_version = u8::read_le(&mut reader)?;
let count = if count_or_version == 0 {
// Version indicator found; this message will contain optional heights.
contains_heights = true;
// If the first value is a zero, the next u8 is the peer count.
u8::read_le(&mut reader)?
} else {
// A non-zero value indicates that this is the "old" PeerResponse without heights.
count_or_version
};

let mut peers = Vec::with_capacity(count as usize);
for _ in 0..count {
peers.push(SocketAddr::read_le(&mut reader)?);
let addr = SocketAddr::read_le(&mut reader)?;
let height = if contains_heights {
match u8::read_le(&mut reader)? {
1 => Some(u32::read_le(&mut reader)?),
0 => None,
_ => return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid peer height".to_string())),
}
} else {
None
};
peers.push((addr, height));
}

Ok(Self { peers })
Expand All @@ -69,14 +108,19 @@ pub mod prop_tests {
collection::vec,
prelude::{BoxedStrategy, Strategy, any},
};
use std::net::{IpAddr, SocketAddr};
use std::{
io,
net::{IpAddr, SocketAddr},
};
use test_strategy::proptest;

pub fn any_valid_socket_addr() -> BoxedStrategy<SocketAddr> {
any::<(IpAddr, u16)>().prop_map(|(ip_addr, port)| SocketAddr::new(ip_addr, port)).boxed()
pub fn any_valid_socket_addr() -> BoxedStrategy<(SocketAddr, Option<u32>)> {
any::<(IpAddr, u16, Option<u32>)>()
.prop_map(|(ip_addr, port, height)| (SocketAddr::new(ip_addr, port), height))
.boxed()
}

pub fn any_vec() -> BoxedStrategy<Vec<SocketAddr>> {
pub fn any_vec() -> BoxedStrategy<Vec<(SocketAddr, Option<u32>)>> {
vec(any_valid_socket_addr(), 0..50).prop_map(|v| v).boxed()
}

Expand All @@ -91,4 +135,14 @@ pub mod prop_tests {
let decoded = PeerResponse::read_le(&mut bytes.into_inner().reader()).unwrap();
assert_eq!(decoded, peer_response);
}

// The following test will be obsolete once all the nodes handle heights in the `PeerResponse`.
#[test]
fn empty_old_peerlist_handling() {
// An empty `PeerResponse` without heights contains a single 0u8.
let serialized = &[0u8];
let deserialized = PeerResponse::read_le(&serialized[..]).unwrap_err();
// Check for the expected error.
assert_eq!(deserialized.kind(), io::ErrorKind::UnexpectedEof);
}
}
18 changes: 15 additions & 3 deletions node/router/src/heartbeat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,9 +239,21 @@ pub trait Heartbeat<N: Network>: Outbound<N> {
// Initialize an RNG.
let rng = &mut OsRng;

// Attempt to connect to more peers.
for peer_ip in self.router().candidate_peers().into_iter().choose_multiple(rng, num_deficient) {
self.router().connect(peer_ip);
// Attempt to connect to more peers, separately choosing from those at a greater block
// height, and those whose height is lower or unknown to us.
let own_height = self.router().ledger.latest_block_height();
let (higher_peers, other_peers): (Vec<_>, Vec<_>) = self
.router()
.get_candidate_peers()
.into_iter()
.partition(|peer| peer.last_height_seen.map(|h| h > own_height).unwrap_or(false));
// We may not know of half of `num_deficient` candidates; account for it using `min`.
let num_higher_peers = num_deficient.div_ceil(2).min(higher_peers.len());
for peer in higher_peers.into_iter().choose_multiple(rng, num_higher_peers) {
self.router().connect(peer.listener_addr);
}
for peer in other_peers.into_iter().choose_multiple(rng, num_deficient - num_higher_peers) {
self.router().connect(peer.listener_addr);
}

if self.router().allow_external_peers() {
Expand Down
10 changes: 8 additions & 2 deletions node/router/src/helpers/peer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ pub struct CandidatePeer {
pub listener_addr: SocketAddr,
/// Indicates whether the peer is considered trusted.
pub trusted: bool,
/// The latest block height known to be associated with the peer.
pub last_height_seen: Option<u32>,
}

/// A fully connected peer.
Expand Down Expand Up @@ -73,7 +75,7 @@ pub struct ConnectedPeer<N: Network> {
impl<N: Network> Peer<N> {
/// Create a candidate peer.
pub const fn new_candidate(listener_addr: SocketAddr, trusted: bool) -> Self {
Self::Candidate(CandidatePeer { listener_addr, trusted })
Self::Candidate(CandidatePeer { listener_addr, trusted, last_height_seen: None })
}

/// Create a connecting peer.
Expand Down Expand Up @@ -114,7 +116,11 @@ impl<N: Network> Peer<N> {

/// Demote a peer to candidate status, marking it as disconnected.
pub fn downgrade_to_candidate(&mut self, listener_addr: SocketAddr) {
*self = Self::new_candidate(listener_addr, self.is_trusted());
*self = Self::Candidate(CandidatePeer {
listener_addr,
trusted: self.is_trusted(),
last_height_seen: self.last_height_seen(),
});
}

/// Returns the type of the node (only applicable to connected peers).
Expand Down
4 changes: 2 additions & 2 deletions node/router/src/inbound.rs
Original file line number Diff line number Diff line change
Expand Up @@ -341,15 +341,15 @@ pub trait Inbound<N: Network>: Reading + Outbound<N> {

// Truncate and convert to socket addrs.
peers.truncate(MAX_PEERS_TO_SEND);
let peers = peers.into_iter().map(|peer| peer.listener_addr).collect();
let peers = peers.into_iter().map(|peer| (peer.listener_addr, peer.last_height_seen)).collect();

// Send a `PeerResponse` message to the peer.
self.router().send(peer_ip, Message::PeerResponse(PeerResponse { peers }));
true
}

/// Handles a `PeerResponse` message.
fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<SocketAddr>) -> bool {
fn peer_response(&self, _peer_ip: SocketAddr, peers: Vec<(SocketAddr, Option<u32>)>) -> bool {
// Check if the number of peers received is less than MAX_PEERS_TO_SEND.
if peers.len() > MAX_PEERS_TO_SEND {
return false;
Expand Down
50 changes: 34 additions & 16 deletions node/router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -212,17 +212,27 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Adds new candidate peers to the peer pool, ensuring their validity and following the
/// limit on the number of peers in the pool.
fn insert_candidate_peers(&self, mut listener_addrs: Vec<SocketAddr>) {
/// limit on the number of peers in the pool. The listener addresses may be paired with
/// the last known block height of the associated peer.
fn insert_candidate_peers(&self, mut listener_addrs: Vec<(SocketAddr, Option<u32>)>) {
// Hold a write guard from now on, so as not to accidentally slash multiple times
// based on multiple batches of candidate peers, and to not overwrite any entries.
let mut peer_pool = self.peer_pool().write();

// Perform filtering to ensure candidate validity.
listener_addrs.retain(|&addr| {
!peer_pool.contains_key(&addr)
&& !self.is_ip_banned(addr.ip())
// Perform filtering to ensure candidate validity. Also count how many entries are updates.
let mut num_updates: usize = 0;
listener_addrs.retain(|&(addr, height)| {
!self.is_ip_banned(addr.ip())
&& if self.is_dev() { !is_bogon_ip(addr.ip()) } else { self.is_valid_peer_ip(addr) }
&& peer_pool
.get(&addr)
.map(|peer| peer.is_candidate() && height.is_some())
.inspect(|is_valid_update| {
if *is_valid_update {
num_updates += 1
}
})
.unwrap_or(true)
});

// If we've managed to filter out every entry, there's nothing to do.
Expand All @@ -231,7 +241,9 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

// If we're about to exceed the peer pool size limit, apply candidate slashing.
if self.number_of_peers() + listener_addrs.len() >= Self::MAXIMUM_POOL_SIZE && Self::PEER_SLASHING_COUNT != 0 {
if self.number_of_peers() + listener_addrs.len() - num_updates >= Self::MAXIMUM_POOL_SIZE
&& Self::PEER_SLASHING_COUNT != 0
{
// Collect the addresses of prospect peers.
let mut peers_to_slash = peer_pool
.iter()
Expand Down Expand Up @@ -266,9 +278,18 @@ pub trait PeerPoolHandling<N: Network>: P2P {
return;
}

// Insert new candidate peers.
for addr in listener_addrs {
peer_pool.insert(addr, Peer::new_candidate(addr, false));
// Insert or update the applicable candidate peers.
for (addr, height) in listener_addrs {
match peer_pool.entry(addr) {
Entry::Vacant(entry) => {
entry.insert(Peer::new_candidate(addr, false));
}
Entry::Occupied(mut entry) => {
if let Peer::Candidate(peer) = entry.get_mut() {
peer.last_height_seen = height;
}
}
}
}
}

Expand Down Expand Up @@ -393,14 +414,11 @@ pub trait PeerPoolHandling<N: Network>: P2P {
}

/// Returns the list of candidate peers.
fn candidate_peers(&self) -> Vec<SocketAddr> {
let banned_ips = self.tcp().banned_peers().get_banned_ips();
fn get_candidate_peers(&self) -> Vec<CandidatePeer> {
self.peer_pool()
.read()
.iter()
.filter_map(|(addr, peer)| {
(matches!(peer, Peer::Candidate(_)) && !banned_ips.contains(&addr.ip())).then_some(*addr)
})
.values()
.filter_map(|peer| if let Peer::Candidate(peer) = peer { Some(peer.clone()) } else { None })
.collect()
}

Expand Down
10 changes: 7 additions & 3 deletions node/tests/peering.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,10 @@ macro_rules! test_reject_unsolicited_peer_response {
// Check the candidate peers.
assert_eq!(node.router().number_of_candidate_peers(), 0);

let peers = vec!["1.1.1.1:1111".parse().unwrap(), "2.2.2.2:2222".parse().unwrap()];
let peers = vec![
("1.1.1.1:1111".parse().unwrap(), None),
("2.2.2.2:2222".parse().unwrap(), None),
];

// Send a `PeerResponse` to the node.
assert!(
Expand All @@ -71,8 +74,9 @@ macro_rules! test_reject_unsolicited_peer_response {
deadline!(Duration::from_secs(5), move || node_clone.router().number_of_connected_peers() == 0);

// Make sure the sent addresses weren't inserted in the candidate peers.
for peer in peers {
assert!(!node.router().candidate_peers().contains(&peer));
let candidate_peer_addrs = node.router().get_candidate_peers().into_iter().map(|peer| peer.listener_addr).collect::<Vec<_>>();
for (peer, _) in peers {
assert!(!candidate_peer_addrs.contains(&peer));
}
}
}
Expand Down