Skip to content

Commit f80ee2a

Browse files
committed
Add prefix_mapping to Discovery
1 parent 115416e commit f80ee2a

File tree

2 files changed

+87
-7
lines changed

2 files changed

+87
-7
lines changed

beacon_node/lighthouse_network/src/discovery/mod.rs

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@ mod prefix_mapping;
5252
mod subnet_predicate;
5353

5454
use crate::discovery::enr::{NEXT_FORK_DIGEST_ENR_KEY, PEERDAS_CUSTODY_GROUP_COUNT_ENR_KEY};
55+
use crate::discovery::prefix_mapping::PrefixMapping;
5556
pub use subnet_predicate::subnet_predicate;
5657
use types::non_zero_usize::new_non_zero_usize;
5758

@@ -103,6 +104,19 @@ struct SubnetQuery {
103104
subnet: Subnet,
104105
min_ttl: Option<Instant>,
105106
retries: usize,
107+
target: QueryTarget,
108+
}
109+
110+
/// Target for a peer discovery query.
111+
///
112+
/// Specifies which peers to search for during a discovery query.
113+
#[derive(Debug, Clone, PartialEq)]
114+
pub enum QueryTarget {
115+
/// Query for random peers in the network.
116+
Random,
117+
/// Query for peers with specific node ID prefixes.
118+
/// Used for deterministic subnet peer discovery to find peers in specific DHT keyspace regions.
119+
Prefix(Vec<NodeId>),
106120
}
107121

108122
impl SubnetQuery {
@@ -127,6 +141,7 @@ impl std::fmt::Debug for SubnetQuery {
127141
.field("subnet", &self.subnet)
128142
.field("min_ttl_secs", &min_ttl_secs)
129143
.field("retries", &self.retries)
144+
.field("target", &self.target)
130145
.finish()
131146
}
132147
}
@@ -197,6 +212,11 @@ pub struct Discovery<E: EthSpec> {
197212
update_ports: UpdatePorts,
198213

199214
spec: Arc<ChainSpec>,
215+
216+
/// Mapping from attestation subnet IDs to DHT key prefixes.
217+
/// Used for deterministic subnet peer discovery to target specific regions of the DHT keyspace
218+
/// when searching for subnet peers.
219+
prefix_mapping: PrefixMapping,
200220
}
201221

202222
impl<E: EthSpec> Discovery<E> {
@@ -330,6 +350,7 @@ impl<E: EthSpec> Discovery<E> {
330350
update_ports,
331351
enr_dir,
332352
spec: Arc::new(spec.clone()),
353+
prefix_mapping: PrefixMapping::new(spec.clone()),
333354
})
334355
}
335356

@@ -375,7 +396,33 @@ impl<E: EthSpec> Discovery<E> {
375396
);
376397

377398
for subnet in subnets_to_discover {
378-
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0);
399+
let query_target = match subnet.subnet {
400+
Subnet::Attestation(subnet_id) => {
401+
match self.prefix_mapping.get_prefixed_node_ids(&subnet_id) {
402+
Ok(mut node_ids) => {
403+
if node_ids.is_empty() {
404+
warn!(
405+
?subnet_id,
406+
"No NodeIds given for prefix search, falling back to random search."
407+
);
408+
QueryTarget::Random
409+
} else {
410+
QueryTarget::Prefix(node_ids)
411+
}
412+
}
413+
Err(error) => {
414+
warn!(
415+
?error,
416+
?subnet_id,
417+
"Failed to get NodeIds for prefix search, falling back to random search."
418+
);
419+
QueryTarget::Random
420+
}
421+
}
422+
}
423+
Subnet::SyncCommittee(_) | Subnet::DataColumn(_) => QueryTarget::Random,
424+
};
425+
self.add_subnet_query(subnet.subnet, subnet.min_ttl, 0, query_target);
379426
}
380427
}
381428

@@ -660,7 +707,13 @@ impl<E: EthSpec> Discovery<E> {
660707

661708
/// Adds a subnet query if one doesn't exist. If a subnet query already exists, this
662709
/// updates the min_ttl field.
663-
fn add_subnet_query(&mut self, subnet: Subnet, min_ttl: Option<Instant>, retries: usize) {
710+
fn add_subnet_query(
711+
&mut self,
712+
subnet: Subnet,
713+
min_ttl: Option<Instant>,
714+
retries: usize,
715+
target: QueryTarget,
716+
) {
664717
// remove the entry and complete the query if greater than the maximum search count
665718
if retries > MAX_DISCOVERY_RETRY {
666719
debug!("Subnet peer discovery did not find sufficient peers. Reached max retry limit");
@@ -689,6 +742,7 @@ impl<E: EthSpec> Discovery<E> {
689742
subnet,
690743
min_ttl,
691744
retries,
745+
target,
692746
});
693747
metrics::set_gauge(
694748
&discovery_metrics::DISCOVERY_QUEUE,
@@ -884,7 +938,12 @@ impl<E: EthSpec> Discovery<E> {
884938
"Grouped subnet discovery query yielded no results."
885939
);
886940
queries.iter().for_each(|query| {
887-
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
941+
self.add_subnet_query(
942+
query.subnet,
943+
query.min_ttl,
944+
query.retries + 1,
945+
query.target.clone(),
946+
);
888947
})
889948
}
890949
Ok(r) => {
@@ -916,7 +975,12 @@ impl<E: EthSpec> Discovery<E> {
916975
v.inc();
917976
}
918977
// A subnet query has completed. Add back to the queue, incrementing retries.
919-
self.add_subnet_query(query.subnet, query.min_ttl, query.retries + 1);
978+
self.add_subnet_query(
979+
query.subnet,
980+
query.min_ttl,
981+
query.retries + 1,
982+
query.target.clone(),
983+
);
920984

921985
// Check the specific subnet against the enr
922986
let subnet_predicate =
@@ -1283,17 +1347,24 @@ mod tests {
12831347
subnet: Subnet::Attestation(SubnetId::new(1)),
12841348
min_ttl: Some(now),
12851349
retries: 0,
1350+
target: QueryTarget::Random,
12861351
};
12871352
discovery.add_subnet_query(
12881353
subnet_query.subnet,
12891354
subnet_query.min_ttl,
12901355
subnet_query.retries,
1356+
subnet_query.target.clone(),
12911357
);
12921358
assert_eq!(discovery.queued_queries.back(), Some(&subnet_query));
12931359

12941360
// New query should replace old query
12951361
subnet_query.min_ttl = Some(now + Duration::from_secs(1));
1296-
discovery.add_subnet_query(subnet_query.subnet, subnet_query.min_ttl, 1);
1362+
discovery.add_subnet_query(
1363+
subnet_query.subnet,
1364+
subnet_query.min_ttl,
1365+
1,
1366+
subnet_query.target.clone(),
1367+
);
12971368

12981369
subnet_query.retries += 1;
12991370

@@ -1309,6 +1380,7 @@ mod tests {
13091380
subnet_query.subnet,
13101381
subnet_query.min_ttl,
13111382
MAX_DISCOVERY_RETRY + 1,
1383+
subnet_query.target,
13121384
);
13131385

13141386
assert_eq!(discovery.queued_queries.len(), 0);
@@ -1344,11 +1416,13 @@ mod tests {
13441416
subnet: Subnet::Attestation(SubnetId::new(1)),
13451417
min_ttl: instant1,
13461418
retries: 0,
1419+
target: QueryTarget::Random,
13471420
},
13481421
SubnetQuery {
13491422
subnet: Subnet::Attestation(SubnetId::new(2)),
13501423
min_ttl: instant2,
13511424
retries: 0,
1425+
target: QueryTarget::Random,
13521426
},
13531427
]);
13541428

beacon_node/lighthouse_network/src/discovery/prefix_mapping.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use alloy_primitives::U256;
22
use discv5::enr::NodeId;
3+
use rand::prelude::SliceRandom;
34
use std::collections::HashMap;
45
use types::{ChainSpec, SubnetId};
56

@@ -29,7 +30,7 @@ impl PrefixMapping {
2930
// prefix bits.
3031
let mask = U256::from(2_i32.pow(prefix_bits) - 1) << (256 - prefix_bits);
3132

32-
Ok(self
33+
let mut node_ids = self
3334
.mapping
3435
.get(subnet_id)
3536
.ok_or("No prefix mapping for subnet_id")?
@@ -46,7 +47,12 @@ impl PrefixMapping {
4647

4748
NodeId::from(raw_node_id)
4849
})
49-
.collect::<Vec<_>>())
50+
.collect::<Vec<_>>();
51+
// Shuffle the order of `NodeId`s to avoid always querying the same prefixes first and to
52+
// distribute discovery queries more evenly across the keyspace.
53+
node_ids.shuffle(&mut rand::rng());
54+
55+
Ok(node_ids)
5056
}
5157
}
5258

0 commit comments

Comments
 (0)