Skip to content

Commit 82be99e

Browse files
committed
introduce new type
1 parent a1bf3ce commit 82be99e

File tree

8 files changed

+178
-23
lines changed

8 files changed

+178
-23
lines changed

Cargo.lock

Lines changed: 10 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/corro-agent/src/agent/handlers.rs

Lines changed: 30 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ use corro_types::{
2424
actor::{Actor, ActorId},
2525
agent::{Agent, Bookie, SplitPool},
2626
base::CrsqlSeq,
27-
broadcast::{BroadcastInput, BroadcastV1, ChangeSource, ChangeV1, FocaInput},
27+
broadcast::{BroadcastInput, BroadcastV1, BroadcastV2, ChangeSource, ChangeV1, FocaInput},
2828
channel::CorroReceiver,
2929
members::MemberAddedResult,
3030
sync::generate_sync,
@@ -690,7 +690,7 @@ pub async fn handle_changes(
690690
continue;
691691
}
692692

693-
let src_str: &'static str = src.into();
693+
let src_str: &'static str = (&src).into();
694694
let recv_lag = change.ts().and_then(|ts| {
695695
let mut our_ts = Timestamp::from(agent.clock().new_timestamp());
696696
if ts > our_ts {
@@ -705,7 +705,10 @@ pub async fn handle_changes(
705705
Some((our_ts.0 - ts.0).to_duration())
706706
});
707707

708-
if matches!(src, ChangeSource::Broadcast) {
708+
if matches!(
709+
src,
710+
ChangeSource::Broadcast | ChangeSource::BroadcastV2(_, _)
711+
) {
709712
counter!("corro.broadcast.recv.count", "kind" => "change").increment(1);
710713
}
711714

@@ -766,20 +769,32 @@ pub async fn handle_changes(
766769
}
767770
}
768771

769-
assert_sometimes!(
770-
matches!(src, ChangeSource::Sync),
771-
"Corrosion receives changes through sync"
772-
);
773-
if matches!(src, ChangeSource::Broadcast) && !change.is_empty() {
774-
assert_sometimes!(true, "Corrosion rebroadcasts changes");
775-
if let Err(_e) =
776-
agent
777-
.tx_bcast()
778-
.try_send(BroadcastInput::Rebroadcast(BroadcastV1::Change(
772+
if !change.is_empty() {
773+
let bcast = match src.clone() {
774+
ChangeSource::Broadcast => {
775+
assert_sometimes!(true, "Corrosion rebroadcasts changes");
776+
Some(BroadcastInput::Rebroadcast(BroadcastV1::Change(
779777
change.clone(),
780778
)))
781-
{
782-
debug!("broadcasts are full or done!");
779+
}
780+
ChangeSource::BroadcastV2(set, num_broadcasts) => {
781+
assert_sometimes!(true, "Corrosion rebroadcasts changes");
782+
Some(BroadcastInput::RebroadcastV2(BroadcastV2 {
783+
change: BroadcastV1::Change(change.clone()),
784+
set,
785+
num_broadcasts,
786+
}))
787+
}
788+
ChangeSource::Sync => {
789+
assert_sometimes!(true, "Corrosion receives changes through sync");
790+
None
791+
}
792+
};
793+
794+
if let Some(bcast) = bcast {
795+
if let Err(_e) = agent.tx_bcast().try_send(bcast) {
796+
debug!("broadcasts are full or done!");
797+
}
783798
}
784799
}
785800

crates/corro-agent/src/agent/uni.rs

Lines changed: 28 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use corro_types::{
22
actor::ClusterId,
3-
broadcast::{BroadcastV1, ChangeSource, ChangeV1, UniPayload, UniPayloadV1},
3+
broadcast::{BroadcastV1, BroadcastV2, ChangeSource, ChangeV1, UniPayload, UniPayloadV1},
44
channel::CorroSender,
55
};
66
use metrics::counter;
@@ -66,16 +66,38 @@ pub fn spawn_unipayload_handler(
6666

6767
match payload {
6868
UniPayload::V1 {
69-
data:
70-
UniPayloadV1::Broadcast(BroadcastV1::Change(
71-
change,
72-
)),
69+
data: payload_data,
7370
cluster_id: payload_cluster_id,
7471
} => {
7572
if cluster_id != payload_cluster_id {
7673
continue;
7774
}
78-
changes.push((change, ChangeSource::Broadcast));
75+
76+
match payload_data {
77+
UniPayloadV1::Broadcast(
78+
BroadcastV1::Change(change),
79+
) => {
80+
changes.push((
81+
change,
82+
ChangeSource::Broadcast,
83+
));
84+
}
85+
UniPayloadV1::BroadcastV2(
86+
BroadcastV2 {
87+
change: BroadcastV1::Change(change),
88+
set,
89+
num_broadcasts,
90+
},
91+
) => {
92+
changes.push((
93+
change,
94+
ChangeSource::BroadcastV2(
95+
set,
96+
num_broadcasts,
97+
),
98+
));
99+
}
100+
}
79101
}
80102
}
81103
}

crates/corro-agent/src/broadcast/mod.rs

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,10 @@ use tripwire::Tripwire;
3838
use corro_types::{
3939
actor::{Actor, ActorId},
4040
agent::Agent,
41-
broadcast::{BroadcastInput, DispatchRuntime, FocaCmd, FocaInput, UniPayload, UniPayloadV1},
41+
broadcast::{
42+
BroadcastChangeV1, BroadcastInput, BroadcastV1, BroadcastV2, DispatchRuntime, FocaCmd,
43+
FocaInput, UniPayload, UniPayloadV1,
44+
},
4245
channel::{bounded, CorroReceiver, CorroSender},
4346
};
4447

@@ -523,6 +526,10 @@ async fn handle_broadcasts(
523526
let (bcast, is_local) = match input {
524527
BroadcastInput::Rebroadcast(bcast) => (bcast, false),
525528
BroadcastInput::AddBroadcast(bcast) => (bcast, true),
529+
BroadcastInput::RebroadcastV2(bcast) => {
530+
let BroadcastV2::Change(BroadcastChangeV1 { change, .. }) = bcast;
531+
(BroadcastV1::Change(change), false)
532+
}
526533
};
527534
trace!("adding broadcast: {bcast:?}, local? {is_local}");
528535

crates/corro-types/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ config = { workspace = true }
1919
consul-client = { version = "0.1.0-alpha.0", path = "../consul-client" }
2020
corro-api-types = { version = "0.1.0-alpha.1", path = "../corro-api-types" }
2121
corro-base-types = { version = "0.1.0-alpha.1", path = "../corro-base-types" }
22+
probabilistic-set = { version = "0.1.0-alpha.1", path = "../probabilistic-set" }
2223
deadpool = { workspace = true }
2324
enquote = { workspace = true }
2425
fallible-iterator = { workspace = true }

crates/corro-types/src/broadcast.rs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use corro_base_types::{CrsqlDbVersionRange, CrsqlSeqRange};
1010
use foca::{Identity, Member, Notification, Runtime, Timer};
1111
use indexmap::{map::Entry, IndexMap};
1212
use metrics::counter;
13+
use probabilistic_set::ProbabilisticSet;
1314
use rusqlite::{
1415
types::{FromSql, FromSqlError},
1516
ToSql,
@@ -48,6 +49,7 @@ pub enum UniPayload {
4849
#[derive(Debug, Clone, Readable, Writable)]
4950
pub enum UniPayloadV1 {
5051
Broadcast(BroadcastV1),
52+
BroadcastV2(BroadcastV2),
5153
}
5254

5355
#[derive(Debug, Clone, Readable, Writable)]
@@ -94,6 +96,13 @@ pub enum BroadcastV1 {
9496
Change(ChangeV1),
9597
}
9698

99+
#[derive(Clone, Debug, Readable, Writable)]
100+
pub struct BroadcastV2 {
101+
pub change: BroadcastV1,
102+
pub set: ProbSet,
103+
pub num_broadcasts: u8,
104+
}
105+
97106
#[derive(Debug, Clone, PartialEq, Readable, Writable)]
98107
pub struct ColumnChange {
99108
pub cid: ColumnName,
@@ -103,11 +112,12 @@ pub struct ColumnChange {
103112
pub cl: i64,
104113
}
105114

106-
#[derive(Debug, Clone, Copy, strum::IntoStaticStr)]
115+
#[derive(Debug, Clone, strum::IntoStaticStr)]
107116
#[strum(serialize_all = "snake_case")]
108117
pub enum ChangeSource {
109118
Broadcast,
110119
Sync,
120+
BroadcastV2(ProbabilisticSet, u64),
111121
}
112122

113123
#[derive(Debug, Clone, PartialEq, Readable, Writable)]
@@ -586,6 +596,7 @@ pub enum BroadcastDecodeError {
586596
pub enum BroadcastInput {
587597
Rebroadcast(BroadcastV1),
588598
AddBroadcast(BroadcastV1),
599+
RebroadcastV2(BroadcastV2),
589600
}
590601

591602
pub struct DispatchRuntime<T> {
Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
[package]
2+
name = "probabilistic-set"
3+
version = "0.1.0"
4+
edition = "2024"
5+
6+
[dependencies]
7+
rand = { workspace = true }
8+
serde = { workspace = true }
9+
speedy = { workspace = true }
10+
11+
[lints]
12+
workspace = true
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
use rand::Rng;
2+
use serde::{Deserialize, Serialize};
3+
use speedy::{Readable, Writable};
4+
use std::collections::hash_map::DefaultHasher;
5+
use std::hash::{Hash, Hasher};
6+
7+
#[derive(Serialize, Debug, Deserialize, Clone, Readable, Writable)]
8+
pub struct ProbabilisticSet {
9+
// todo: consider making bits and array?
10+
bits: Vec<u8>,
11+
size_bits: usize,
12+
seed: u64, // Store the random seed
13+
}
14+
15+
impl ProbabilisticSet {
16+
pub fn new(expected_items: usize, bits_per_item: usize) -> Self {
17+
let size_bits = expected_items * bits_per_item;
18+
let seed = rand::thread_rng().r#gen(); // Generate random seed
19+
20+
ProbabilisticSet {
21+
bits: vec![0; (size_bits + 7) / 8],
22+
size_bits,
23+
seed,
24+
}
25+
}
26+
27+
// Create with a specific seed (useful for testing or controlled randomness)
28+
pub fn with_seed(expected_items: usize, bits_per_item: usize, seed: u64) -> Self {
29+
let size_bits: usize = expected_items * bits_per_item;
30+
ProbabilisticSet {
31+
bits: vec![0; (size_bits + 7) / 8],
32+
size_bits,
33+
seed,
34+
}
35+
}
36+
37+
pub fn insert(&mut self, item: u128) {
38+
let idx = self.hash(item) % self.size_bits;
39+
let byte_idx = idx / 8;
40+
let bit_idx = idx % 8;
41+
self.bits[byte_idx] |= 1 << bit_idx;
42+
}
43+
44+
pub fn contains(&self, item: u128) -> bool {
45+
let idx = self.hash(item) % self.size_bits;
46+
let byte_idx = idx / 8;
47+
let bit_idx = idx % 8;
48+
self.bits[byte_idx] & (1 << bit_idx) != 0
49+
}
50+
51+
fn hash(&self, item: u128) -> usize {
52+
let mut hasher = DefaultHasher::new();
53+
// Hash both the seed and the item
54+
self.seed.hash(&mut hasher);
55+
item.hash(&mut hasher);
56+
hasher.finish() as usize
57+
}
58+
59+
// Get current seed (useful for debugging)
60+
pub fn seed(&self) -> u64 {
61+
self.seed
62+
}
63+
64+
// Change the seed (and clear the bits since the hash function effectively changes)
65+
pub fn reseed(&mut self) {
66+
self.seed = rand::thread_rng().r#gen();
67+
self.bits.fill(0);
68+
}
69+
70+
pub fn size_bytes(&self) -> usize {
71+
// Include size of seed in total
72+
self.bits.len() + std::mem::size_of::<u64>()
73+
}
74+
}
75+
76+
#[cfg(test)]
77+
mod tests {}

0 commit comments

Comments
 (0)