Skip to content

Commit 484bced

Browse files
committed
[Ingress] ingress-client crate
- `ingress-client` implements the runtime layer that receives ingress traffic, fans it out to the correct partition, and tracks completion. It exposes: - `Ingress`, enforces inflight budgets, and resolves partition IDs before sending work downstream. - The session subsystem that batches `IngestRecords`, retries connections, and reports commit status to callers. - `ingress-core` only ingests records and notify the caller once the record is "committed" to bifrost by the PP. This makes it useful to implement kafka ingress and other external ingestion
1 parent 3369991 commit 484bced

File tree

9 files changed

+880
-0
lines changed

9 files changed

+880
-0
lines changed

Cargo.lock

Lines changed: 18 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ restate-types = { path = "crates/types" }
8585
restate-utoipa = { path = "crates/utoipa" }
8686
restate-wal-protocol = { path = "crates/wal-protocol" }
8787
restate-worker = { path = "crates/worker" }
88+
restate-ingress-client = { path = "crates/ingress-client" }
8889

8990
# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
9091
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead

crates/ingress-client/Cargo.toml

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
[package]
2+
name = "restate-ingress-client"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
publish = false
9+
10+
[dependencies]
11+
arc-swap = { workspace = true }
12+
dashmap = { workspace = true }
13+
futures = { workspace = true }
14+
pin-project-lite = { workspace = true }
15+
thiserror = { workspace = true }
16+
tokio = { workspace = true }
17+
tokio-stream = { workspace = true }
18+
tokio-util = { workspace = true }
19+
tracing = { workspace = true }
20+
21+
restate-workspace-hack = { workspace = true }
22+
restate-core = { workspace = true }
23+
restate-types = { workspace = true }
Lines changed: 93 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,93 @@
1+
use tokio::time::{Sleep, sleep};
2+
use tokio_stream::adapters::Fuse;
3+
use tokio_stream::{Stream, StreamExt};
4+
5+
use core::future::Future;
6+
use core::pin::Pin;
7+
use core::task::{Context, Poll, ready};
8+
use pin_project_lite::pin_project;
9+
use std::time::Duration;
10+
11+
// This file is a copy from `tokio_stream` until PR https://github.com/tokio-rs/tokio/pull/7715 is released
12+
13+
pin_project! {
14+
/// Stream returned by the [`chunks_timeout`](super::StreamExt::chunks_timeout) method.
15+
#[must_use = "streams do nothing unless polled"]
16+
#[derive(Debug)]
17+
pub struct ChunksTimeout<S: Stream> {
18+
#[pin]
19+
stream: Fuse<S>,
20+
#[pin]
21+
deadline: Option<Sleep>,
22+
duration: Duration,
23+
items: Vec<S::Item>,
24+
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
25+
}
26+
}
27+
28+
impl<S: Stream> ChunksTimeout<S> {
29+
pub fn new(stream: S, max_size: usize, duration: Duration) -> Self {
30+
ChunksTimeout {
31+
stream: stream.fuse(),
32+
deadline: None,
33+
duration,
34+
items: Vec::with_capacity(max_size),
35+
cap: max_size,
36+
}
37+
}
38+
/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
39+
pub fn into_remainder(mut self: Pin<&mut Self>) -> Vec<S::Item> {
40+
let me = self.as_mut().project();
41+
std::mem::take(me.items)
42+
}
43+
}
44+
45+
impl<S: Stream> Stream for ChunksTimeout<S> {
46+
type Item = Vec<S::Item>;
47+
48+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
49+
let mut me = self.as_mut().project();
50+
loop {
51+
match me.stream.as_mut().poll_next(cx) {
52+
Poll::Pending => break,
53+
Poll::Ready(Some(item)) => {
54+
if me.items.is_empty() {
55+
me.deadline.set(Some(sleep(*me.duration)));
56+
me.items.reserve_exact(*me.cap);
57+
}
58+
me.items.push(item);
59+
if me.items.len() >= *me.cap {
60+
return Poll::Ready(Some(std::mem::take(me.items)));
61+
}
62+
}
63+
Poll::Ready(None) => {
64+
// Returning Some here is only correct because we fuse the inner stream.
65+
let last = if me.items.is_empty() {
66+
None
67+
} else {
68+
Some(std::mem::take(me.items))
69+
};
70+
71+
return Poll::Ready(last);
72+
}
73+
}
74+
}
75+
76+
if !me.items.is_empty() {
77+
if let Some(deadline) = me.deadline.as_pin_mut() {
78+
ready!(deadline.poll(cx));
79+
}
80+
return Poll::Ready(Some(std::mem::take(me.items)));
81+
}
82+
83+
Poll::Pending
84+
}
85+
86+
fn size_hint(&self) -> (usize, Option<usize>) {
87+
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
88+
let (lower, upper) = self.stream.size_hint();
89+
let lower = (lower / self.cap).saturating_add(chunk_len);
90+
let upper = upper.and_then(|x| x.checked_add(chunk_len));
91+
(lower, upper)
92+
}
93+
}
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::sync::Arc;
12+
13+
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
14+
15+
use restate_core::{
16+
network::{Networking, TransportConnect},
17+
partitions::PartitionRouting,
18+
};
19+
use restate_types::{
20+
identifiers::PartitionKey,
21+
live::Live,
22+
net::ingress::IngestRecord,
23+
partitions::{FindPartition, PartitionTable, PartitionTableError},
24+
};
25+
26+
use crate::{RecordCommit, SessionOptions, session::SessionManager};
27+
28+
/// Errors that can be observed when interacting with the ingress facade.
29+
#[derive(Debug, thiserror::Error)]
30+
pub enum IngestionError {
31+
#[error("Ingress closed")]
32+
Closed,
33+
#[error(transparent)]
34+
PartitionTableError(#[from] PartitionTableError),
35+
}
36+
37+
/// High-level ingress entry point that allocates permits and hands out session handles per partition.
38+
/// IngressClient can be cloned and shared across different routines. All users will share the same budget
39+
/// and underlying partition sessions.
40+
#[derive(Clone)]
41+
pub struct IngressClient<T> {
42+
manager: SessionManager<T>,
43+
partition_table: Live<PartitionTable>,
44+
// budget for inflight invocations.
45+
// this should be a memory budget but it's
46+
// not possible atm to compute the serialization
47+
// size of an invocation.
48+
permits: Arc<Semaphore>,
49+
}
50+
51+
impl<T> IngressClient<T> {
52+
/// Builds a new ingress facade with the provided networking stack, partition metadata, and
53+
/// budget for inflight records.
54+
pub fn new(
55+
networking: Networking<T>,
56+
partition_table: Live<PartitionTable>,
57+
partition_routing: PartitionRouting,
58+
budget: usize,
59+
opts: Option<SessionOptions>,
60+
) -> Self {
61+
Self {
62+
manager: SessionManager::new(networking, partition_routing, opts),
63+
partition_table,
64+
permits: Arc::new(Semaphore::new(budget)),
65+
}
66+
}
67+
}
68+
69+
impl<T> IngressClient<T>
70+
where
71+
T: TransportConnect,
72+
{
73+
/// Reserves capacity to send exactly one record.
74+
pub async fn reserve(&self) -> Result<IngressPermit<'_, T>, IngestionError> {
75+
let permit = self
76+
.permits
77+
.clone()
78+
.acquire_owned()
79+
.await
80+
.map_err(|_| IngestionError::Closed)?;
81+
82+
Ok(IngressPermit {
83+
permit,
84+
ingress: self,
85+
})
86+
}
87+
88+
/// Once closed, calls to ingest will return [`IngestionError::Closed`].
89+
/// Inflight records might still get committed.
90+
pub fn close(&self) {
91+
self.permits.close();
92+
self.manager.close();
93+
}
94+
}
95+
96+
/// Permit that owns capacity for a single record ingest against an [`Ingress`] instance.
97+
pub struct IngressPermit<'a, T> {
98+
permit: OwnedSemaphorePermit,
99+
ingress: &'a IngressClient<T>,
100+
}
101+
102+
impl<'a, T> IngressPermit<'a, T>
103+
where
104+
T: TransportConnect,
105+
{
106+
/// Sends a record to the partition derived from the supplied [`PartitionKey`], consuming the permit.
107+
pub fn ingest(
108+
self,
109+
partition_key: PartitionKey,
110+
record: impl Into<IngestRecord>,
111+
) -> Result<RecordCommit, IngestionError> {
112+
let partition_id = self
113+
.ingress
114+
.partition_table
115+
.pinned()
116+
.find_partition_id(partition_key)?;
117+
118+
let handle = self.ingress.manager.get(partition_id);
119+
120+
handle
121+
.ingest(self.permit, record.into())
122+
.map_err(|_| IngestionError::Closed)
123+
}
124+
}

crates/ingress-client/src/lib.rs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
mod chunks_timeout;
12+
mod ingress;
13+
mod session;
14+
15+
pub use ingress::{IngestionError, IngressClient, IngressPermit};
16+
pub use session::{CommitError, RecordCommit, SessionOptions};

0 commit comments

Comments
 (0)