Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ restate-utoipa = { path = "crates/utoipa" }
restate-vqueues = { path = "crates/vqueues" }
restate-wal-protocol = { path = "crates/wal-protocol" }
restate-worker = { path = "crates/worker" }
restate-ingestion-client = { path = "crates/ingestion-client" }

# this workspace-hack package is overridden by a patch below to use workspace-hack subdir when building in this repo
# outside this repo, the crates.io restate-workspace-hack (an empty package) will be used instead
Expand Down
30 changes: 30 additions & 0 deletions crates/ingestion-client/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
[package]
name = "restate-ingestion-client"
version.workspace = true
authors.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
publish = false

[dependencies]
arc-swap = { workspace = true }
dashmap = { workspace = true }
futures = { workspace = true }
pin-project-lite = { workspace = true }
thiserror = { workspace = true }
tokio-stream = { workspace = true }
tokio-util = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }

restate-core = { workspace = true }
restate-types = { workspace = true }
restate-workspace-hack = { workspace = true }

[dev-dependencies]
bytes = { workspace = true }
googletest = { workspace = true }
test-log = { workspace = true }

restate-core = { workspace = true, features = ["test-util"] }
104 changes: 104 additions & 0 deletions crates/ingestion-client/src/chunks_size.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
// All rights reserved.
//
// Use of this software is governed by the Business Source License
// included in the LICENSE file.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use tokio_stream::adapters::Fuse;
use tokio_stream::{Stream, StreamExt};

use core::pin::Pin;
use core::task::{Context, Poll};
use pin_project_lite::pin_project;

pin_project! {
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct ChunksSize<F, S: Stream> {
#[pin]
stream: Fuse<S>,
items: Vec<S::Item>,
size: usize,
cap: usize, // https://github.com/rust-lang/futures-rs/issues/1475
size_fn: F,
}
}

impl<F, S: Stream> ChunksSize<F, S>
where
F: Fn(&S::Item) -> usize,
{
pub fn new(stream: S, max_size: usize, size_fn: F) -> Self {
ChunksSize {
stream: stream.fuse(),
items: Vec::default(),
size: 0,
cap: max_size,
size_fn,
}
}

/// Drains the buffered items, returning them without waiting for the timeout or capacity limit.
pub fn into_remainder(self) -> Vec<S::Item> {
self.items
}
}

impl<F, S: Stream> Stream for ChunksSize<F, S>
where
F: Fn(&S::Item) -> usize,
{
type Item = Vec<S::Item>;

fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.as_mut().project();
loop {
match me.stream.as_mut().poll_next(cx) {
Poll::Pending if me.items.is_empty() => return Poll::Pending,
Poll::Pending => {
*me.size = 0;
return Poll::Ready(Some(std::mem::take(me.items)));
}
Poll::Ready(Some(item)) => {
let item_size = (me.size_fn)(&item);

if me.items.is_empty() || *me.size + item_size <= *me.cap {
me.items.push(item);
} else {
// not empty and adding the item will go over the cap
let items = std::mem::replace(me.items, vec![item]);
*me.size = item_size;
return Poll::Ready(Some(items));
}

if *me.size >= *me.cap {
*me.size = 0;
return Poll::Ready(Some(std::mem::take(me.items)));
}
}
Poll::Ready(None) => {
// Returning Some here is only correct because we fuse the inner stream.
let last = if me.items.is_empty() {
None
} else {
Some(std::mem::take(me.items))
};

return Poll::Ready(last);
}
}
}
}

fn size_hint(&self) -> (usize, Option<usize>) {
let chunk_len = if self.items.is_empty() { 0 } else { 1 };
let (lower, upper) = self.stream.size_hint();
let lower = (lower / self.cap).saturating_add(chunk_len);
let upper = upper.and_then(|x| x.checked_add(chunk_len));
(lower, upper)
}
}
Loading
Loading