Skip to content

Commit e8839b2

Browse files
committed
[Ingress] ingress-client crate
- `ingress-client` implements the runtime layer that receives ingress traffic, fans it out to the correct partition, and tracks completion. It exposes: - `Ingress`, enforces inflight budgets, and resolves partition IDs before sending work downstream. - The session subsystem that batches `IngestRecords`, retries connections, and reports commit status to callers. - `ingress-core` only ingests records and notify the caller once the record is "committed" to bifrost by the PP. This makes it useful to implement kafka ingress and other external ingestion
1 parent c05c1de commit e8839b2

File tree

9 files changed

+1132
-0
lines changed

9 files changed

+1132
-0
lines changed

Cargo.lock

Lines changed: 21 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" }
8686
restate-vqueues = { path = "crates/vqueues" }
8787
restate-wal-protocol = { path = "crates/wal-protocol" }
8888
restate-worker = { path = "crates/worker" }
89+
restate-ingestion-client = { path = "crates/ingestion-client" }
8990

9091
# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
9192
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead

crates/ingestion-client/Cargo.toml

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
[package]
2+
name = "restate-ingestion-client"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
publish = false
9+
10+
[dependencies]
11+
arc-swap = { workspace = true }
12+
dashmap = { workspace = true }
13+
futures = { workspace = true }
14+
pin-project-lite = { workspace = true }
15+
thiserror = { workspace = true }
16+
tokio-stream = { workspace = true }
17+
tokio-util = { workspace = true }
18+
tokio = { workspace = true }
19+
tracing = { workspace = true }
20+
21+
restate-core = { workspace = true }
22+
restate-types = { workspace = true }
23+
restate-workspace-hack = { workspace = true }
24+
25+
[dev-dependencies]
26+
bytes = { workspace = true }
27+
googletest = { workspace = true }
28+
test-log = { workspace = true }
29+
30+
restate-core = { workspace = true, features = ["test-util"] }
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use tokio_stream::adapters::Fuse;
12+
use tokio_stream::{Stream, StreamExt};
13+
14+
use core::pin::Pin;
15+
use core::task::{Context, Poll};
16+
use pin_project_lite::pin_project;
17+
18+
pin_project! {
19+
#[must_use = "streams do nothing unless polled"]
20+
#[derive(Debug)]
21+
pub struct ChunksSize<F, S: Stream> {
22+
#[pin]
23+
stream: Fuse<S>,
24+
items: Vec<S::Item>,
25+
size: usize,
26+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
27+
size_fn: F,
28+
}
29+
}
30+
31+
impl<F, S: Stream> ChunksSize<F, S>
32+
where
33+
F: Fn(&S::Item) -> usize,
34+
{
35+
pub fn new(stream: S, max_size: usize, size_fn: F) -> Self {
36+
ChunksSize {
37+
stream: stream.fuse(),
38+
items: Vec::default(),
39+
size: 0,
40+
cap: max_size,
41+
size_fn,
42+
}
43+
}
44+
45+
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
46+
pub fn into_remainder(self) -> Vec<S::Item> {
47+
self.items
48+
}
49+
}
50+
51+
impl<F, S: Stream> Stream for ChunksSize<F, S>
52+
where
53+
F: Fn(&S::Item) -> usize,
54+
{
55+
type Item = Vec<S::Item>;
56+
57+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
58+
let mut me = self.as_mut().project();
59+
loop {
60+
match me.stream.as_mut().poll_next(cx) {
61+
Poll::Pending if me.items.is_empty() => return Poll::Pending,
62+
Poll::Pending => {
63+
*me.size = 0;
64+
return Poll::Ready(Some(std::mem::take(me.items)));
65+
}
66+
Poll::Ready(Some(item)) => {
67+
let item_size = (me.size_fn)(&item);
68+
69+
if me.items.is_empty() || *me.size + item_size <= *me.cap {
70+
me.items.push(item);
71+
} else {
72+
// not empty and adding the item will go over the cap
73+
let items = std::mem::replace(me.items, vec![item]);
74+
*me.size = item_size;
75+
return Poll::Ready(Some(items));
76+
}
77+
78+
if *me.size >= *me.cap {
79+
*me.size = 0;
80+
return Poll::Ready(Some(std::mem::take(me.items)));
81+
}
82+
}
83+
Poll::Ready(None) => {
84+
// Returning Some here is only correct because we fuse the inner stream.
85+
let last = if me.items.is_empty() {
86+
None
87+
} else {
88+
Some(std::mem::take(me.items))
89+
};
90+
91+
return Poll::Ready(last);
92+
}
93+
}
94+
}
95+
}
96+
97+
fn size_hint(&self) -> (usize, Option<usize>) {
98+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
99+
let (lower, upper) = self.stream.size_hint();
100+
let lower = (lower / self.cap).saturating_add(chunk_len);
101+
let upper = upper.and_then(|x| x.checked_add(chunk_len));
102+
(lower, upper)
103+
}
104+
}

0 commit comments

Comments
 (0)