Skip to content

Commit 73a4a45

Browse files
committed
Use prob set when sending broadcast
Signed-off-by: Somtochi Onyekwere <[email protected]>
1 parent 82be99e commit 73a4a45

File tree

8 files changed

+477
-137
lines changed

8 files changed

+477
-137
lines changed

Cargo.lock

Lines changed: 2 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/corro-agent/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ itertools = { workspace = true }
2828
metrics = { workspace = true }
2929
opentelemetry = { workspace = true }
3030
parking_lot = { workspace = true }
31+
probabilistic-set = { path = "../probabilistic-set" }
3132
quinn = { workspace = true }
3233
quinn-proto = { workspace = true }
3334
quinn-plaintext = { workspace = true }

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 327 additions & 124 deletions
Large diffs are not rendered by default.

crates/corro-types/src/actor.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,10 @@ impl ActorId {
3737
pub fn from_bytes(bytes: [u8; 16]) -> Self {
3838
Self(Uuid::from_bytes(bytes))
3939
}
40+
41+
pub fn to_u128(&self) -> u128 {
42+
self.0.as_u128()
43+
}
4044
}
4145

4246
impl TryFrom<ActorId> for uhlc::ID {

crates/corro-types/src/broadcast.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use corro_base_types::{CrsqlDbVersionRange, CrsqlSeqRange};
1010
use foca::{Identity, Member, Notification, Runtime, Timer};
1111
use indexmap::{map::Entry, IndexMap};
1212
use metrics::counter;
13-
use probabilistic_set::ProbabilisticSet;
13+
use probabilistic_set::ProbSet;
1414
use rusqlite::{
1515
types::{FromSql, FromSqlError},
1616
ToSql,
@@ -117,7 +117,7 @@ pub struct ColumnChange {
117117
pub enum ChangeSource {
118118
Broadcast,
119119
Sync,
120-
BroadcastV2(ProbabilisticSet, u64),
120+
BroadcastV2(ProbSet, u8),
121121
}
122122

123123
#[derive(Debug, Clone, PartialEq, Readable, Writable)]
@@ -596,6 +596,7 @@ pub enum BroadcastDecodeError {
596596
pub enum BroadcastInput {
597597
Rebroadcast(BroadcastV1),
598598
AddBroadcast(BroadcastV1),
599+
AddBroadcastV2(BroadcastV2),
599600
RebroadcastV2(BroadcastV2),
600601
}
601602

crates/corro-types/src/members.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -170,10 +170,10 @@ impl Members {
170170

171171
/// Get member addresses where the ring index is `0` (meaning a
172172
/// very small RTT)
173-
pub fn ring0(&self, cluster_id: ClusterId) -> impl Iterator<Item = SocketAddr> + '_ {
174-
self.states.values().filter_map(move |v| {
173+
pub fn ring0(&self, cluster_id: ClusterId) -> impl Iterator<Item = (ActorId, SocketAddr)> + '_ {
174+
self.states.iter().filter_map(move |(id, v)| {
175175
v.ring
176-
.and_then(|ring| (v.cluster_id == cluster_id && ring == 0).then_some(v.addr))
176+
.and_then(|ring| (v.cluster_id == cluster_id && ring == 0).then_some((*id, v.addr)))
177177
})
178178
}
179179
}

crates/probabilistic-set/Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ edition = "2024"
55

66
[dependencies]
77
rand = { workspace = true }
8-
serde = { workspace = true }
8+
serde = { workspace = true, features = ["derive"] }
99
speedy = { workspace = true }
1010

11+
[dev-dependencies]
12+
uuid = { version = "1.0", features = ["v4"] }
13+
1114
[lints]
1215
workspace = true

crates/probabilistic-set/src/lib.rs

Lines changed: 133 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,24 @@ use std::collections::hash_map::DefaultHasher;
55
use std::hash::{Hash, Hasher};
66

77
#[derive(Serialize, Debug, Deserialize, Clone, Readable, Writable)]
8-
pub struct ProbabilisticSet {
8+
pub struct ProbSet {
99
// todo: consider making bits and array?
1010
bits: Vec<u8>,
1111
size_bits: usize,
1212
seed: u64, // Store the random seed
1313
}
1414

15-
impl ProbabilisticSet {
15+
impl ProbSet {
1616
pub fn new(expected_items: usize, bits_per_item: usize) -> Self {
17+
if expected_items == 0 || bits_per_item == 0 {
18+
panic!("expected_items and bits_per_item must be greater than 0");
19+
}
20+
1721
let size_bits = expected_items * bits_per_item;
1822
let seed = rand::thread_rng().r#gen(); // Generate random seed
1923

20-
ProbabilisticSet {
21-
bits: vec![0; (size_bits + 7) / 8],
24+
ProbSet {
25+
bits: vec![0; size_bits.div_ceil(8)],
2226
size_bits,
2327
seed,
2428
}
@@ -27,8 +31,8 @@ impl ProbabilisticSet {
2731
// Create with a specific seed (useful for testing or controlled randomness)
2832
pub fn with_seed(expected_items: usize, bits_per_item: usize, seed: u64) -> Self {
2933
let size_bits: usize = expected_items * bits_per_item;
30-
ProbabilisticSet {
31-
bits: vec![0; (size_bits + 7) / 8],
34+
ProbSet {
35+
bits: vec![0; size_bits.div_ceil(8)],
3236
size_bits,
3337
seed,
3438
}
@@ -74,4 +78,126 @@ impl ProbabilisticSet {
7478
}
7579

7680
#[cfg(test)]
77-
mod tests {}
81+
mod tests {
82+
use super::*;
83+
use uuid::Uuid;
84+
85+
#[test]
86+
fn test_uuid_collision_rates() {
87+
// Test different configurations with UUIDs
88+
let configs = vec![(1000, 2), (1000, 4)];
89+
90+
for (expected_items, bits_per_item) in configs {
91+
let collision_rate = measure_uuid_collision_rate(expected_items, bits_per_item, 1000);
92+
println!(
93+
"Config: {} items, {} bits/item -> Collision rate: {:.2}%",
94+
expected_items,
95+
bits_per_item,
96+
collision_rate * 100.0
97+
);
98+
}
99+
}
100+
101+
#[test]
102+
fn test_uuid_false_positive_rate() {
103+
let mut set = ProbSet::new(1000, 4);
104+
let mut inserted_uuids = Vec::new();
105+
106+
for _ in 0..500 {
107+
let uuid = Uuid::new_v4();
108+
let uuid_u128 = uuid.as_u128();
109+
set.insert(uuid_u128);
110+
inserted_uuids.push(uuid_u128);
111+
}
112+
113+
for &uuid in &inserted_uuids {
114+
assert!(set.contains(uuid), "False negative detected!");
115+
}
116+
117+
// Test false positive rate with 10,000 random UUIDs
118+
let test_size = 10000;
119+
let mut false_positives = 0;
120+
121+
for _ in 0..test_size {
122+
let test_uuid = Uuid::new_v4().as_u128();
123+
124+
if inserted_uuids.contains(&test_uuid) {
125+
continue;
126+
}
127+
128+
if set.contains(test_uuid) {
129+
false_positives += 1;
130+
}
131+
}
132+
133+
let false_positive_rate = false_positives as f64 / test_size as f64;
134+
println!("False positive rate: {:.2}%", false_positive_rate * 100.0);
135+
136+
assert!(
137+
false_positive_rate < 0.20,
138+
"False positive rate too high: {:.2}%",
139+
false_positive_rate * 100.0
140+
);
141+
}
142+
143+
#[test]
144+
fn test_capacity_vs_collision_rate() {
145+
let bits_per_item = 16;
146+
let max_capacity = 1000;
147+
148+
// Test collision rates at different capacity utilizations
149+
for utilization in [0.25, 0.5, 0.75, 1.0, 1.25, 1.5] {
150+
let num_items = (max_capacity as f64 * utilization) as usize;
151+
let collision_rate =
152+
measure_uuid_collision_rate(max_capacity, bits_per_item, num_items);
153+
154+
println!(
155+
"Utilization: {:.0}% -> Collision rate: {:.2}%",
156+
utilization * 100.0,
157+
collision_rate * 100.0
158+
);
159+
}
160+
}
161+
162+
#[test]
163+
fn test_deterministic_behavior() {
164+
// Test that same seed produces same results
165+
let seed = 12345;
166+
let mut set1 = ProbSet::with_seed(1000, 16, seed);
167+
let mut set2 = ProbSet::with_seed(1000, 16, seed);
168+
169+
let test_uuids: Vec<u128> = (0..100).map(|_| Uuid::new_v4().as_u128()).collect();
170+
171+
for &uuid in &test_uuids {
172+
set1.insert(uuid);
173+
set2.insert(uuid);
174+
}
175+
176+
for _ in 0..1000 {
177+
let test_uuid = Uuid::new_v4().as_u128();
178+
assert_eq!(
179+
set1.contains(test_uuid),
180+
set2.contains(test_uuid),
181+
"Sets with same seed should behave identically"
182+
);
183+
}
184+
}
185+
// Helper function to measure collision rate
186+
fn measure_uuid_collision_rate(
187+
expected_items: usize,
188+
bits_per_item: usize,
189+
num_test_items: usize,
190+
) -> f64 {
191+
let set = ProbSet::new(expected_items, bits_per_item);
192+
let mut unique_positions = std::collections::HashMap::new();
193+
194+
for _ in 0..num_test_items {
195+
let uuid = Uuid::new_v4().as_u128();
196+
let position = set.hash(uuid) % set.size_bits;
197+
*unique_positions.entry(position).or_insert(0) += 1;
198+
}
199+
200+
let collisions = num_test_items - unique_positions.len();
201+
collisions as f64 / num_test_items as f64
202+
}
203+
}

0 commit comments

Comments
 (0)