Skip to content

Commit 3aa9219

Browse files
committed
[Ingress-Kafka] Refactor to use ingress-client
Summary: Refactor ingress-kafka to leverage on `ingress-client` implementation. This replaces the previous direct write to bifrost which allows: - Batching, which increases throughput - PP becomes the sole writer of its logs (WIP #3965)
1 parent 953b39a commit 3aa9219

File tree

15 files changed

+707
-575
lines changed

15 files changed

+707
-575
lines changed

Cargo.lock

Lines changed: 6 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ingress-kafka/Cargo.toml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ options_schema = ["dep:schemars"]
1414
[dependencies]
1515
restate-workspace-hack = { workspace = true }
1616

17-
restate-bifrost = { workspace = true }
1817
restate-core = { workspace = true }
18+
restate-ingress-client = { workspace = true }
1919
restate-serde-util = { workspace = true }
2020
restate-storage-api = { workspace = true }
2121
restate-timer-queue = { workspace = true }
@@ -27,6 +27,7 @@ anyhow = { workspace = true }
2727
base64 = { workspace = true }
2828
bytes = { workspace = true }
2929
derive_more = { workspace = true }
30+
futures = { workspace = true }
3031
metrics = { workspace = true }
3132
opentelemetry = { workspace = true }
3233
opentelemetry_sdk = { workspace = true }
@@ -38,6 +39,7 @@ thiserror = { workspace = true }
3839
tokio = { workspace = true, features = ["sync", "rt"] }
3940
tracing = { workspace = true }
4041
tracing-opentelemetry = { workspace = true }
42+
xxhash-rust = { workspace = true, features = ["xxh3", "std"] }
4143

4244
[dev-dependencies]
4345
restate-types = { workspace = true, features = ["test-util"] }
Lines changed: 319 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,319 @@
1+
// Copyright (c) 2023 - 2025 Restate Software, Inc., Restate GmbH.
2+
// All rights reserved.
3+
//
4+
// Use of this software is governed by the Business Source License
5+
// included in the LICENSE file.
6+
//
7+
// As of the Change Date specified in that file, in accordance with
8+
// the Business Source License, use of this software will be governed
9+
// by the Apache License, Version 2.0.
10+
11+
use std::borrow::Borrow;
12+
13+
use anyhow::bail;
14+
use base64::Engine;
15+
use bytes::Bytes;
16+
use opentelemetry::propagation::{Extractor, TextMapPropagator};
17+
use opentelemetry::trace::{Span, SpanContext, TraceContextExt};
18+
use opentelemetry_sdk::propagation::TraceContextPropagator;
19+
use rdkafka::Message;
20+
use rdkafka::message::BorrowedMessage;
21+
use tracing::{info_span, trace};
22+
23+
use restate_storage_api::deduplication_table::DedupInformation;
24+
use restate_types::identifiers::{InvocationId, InvocationUuid, WithPartitionKey, partitioner};
25+
use restate_types::invocation::{Header, InvocationTarget, ServiceInvocation, SpanRelation};
26+
use restate_types::live::{self, Live};
27+
use restate_types::schema::Schema;
28+
use restate_types::schema::invocation_target::{DeploymentStatus, InvocationTargetResolver};
29+
use restate_types::schema::subscriptions::{EventInvocationTargetTemplate, Sink, Subscription};
30+
use restate_wal_protocol::{Command, Destination, Envelope, Source};
31+
32+
use crate::Error;
33+
34+
#[derive(Clone)]
35+
pub struct EnvelopeBuilder {
36+
subscription: Subscription,
37+
schema: Live<Schema>,
38+
subscription_id: String,
39+
}
40+
41+
impl EnvelopeBuilder {
42+
pub fn new(subscription: Subscription, schema: Live<Schema>) -> Self {
43+
Self {
44+
subscription_id: subscription.id().to_string(),
45+
subscription,
46+
schema,
47+
}
48+
}
49+
50+
pub fn subscription(&self) -> &Subscription {
51+
&self.subscription
52+
}
53+
54+
pub fn build(
55+
&mut self,
56+
producer_id: u128,
57+
consumer_group_id: &str,
58+
msg: BorrowedMessage<'_>,
59+
) -> Result<Envelope, Error> {
60+
// Prepare ingress span
61+
let ingress_span = info_span!(
62+
"kafka_ingress_consume",
63+
otel.name = "kafka_ingress_consume",
64+
messaging.system = "kafka",
65+
messaging.operation = "receive",
66+
messaging.source.name = msg.topic(),
67+
messaging.destination.name = %self.subscription.sink(),
68+
restate.subscription.id = %self.subscription.id(),
69+
messaging.consumer.group.name = consumer_group_id
70+
);
71+
72+
trace!(parent: &ingress_span, "Building Kafka ingress request");
73+
74+
let key = if let Some(k) = msg.key() {
75+
Bytes::copy_from_slice(k)
76+
} else {
77+
Bytes::default()
78+
};
79+
let payload = if let Some(p) = msg.payload() {
80+
Bytes::copy_from_slice(p)
81+
} else {
82+
Bytes::default()
83+
};
84+
85+
let headers = Self::generate_events_attributes(&msg, &self.subscription_id);
86+
let dedup = DedupInformation::producer(producer_id, msg.offset() as u64);
87+
88+
let invocation = InvocationBuilder::create(
89+
&self.subscription,
90+
producer_id,
91+
self.schema.pinned(),
92+
key,
93+
payload,
94+
headers,
95+
consumer_group_id,
96+
msg.topic(),
97+
msg.partition(),
98+
msg.offset(),
99+
)
100+
.map_err(|cause| Error::Event {
101+
topic: msg.topic().to_string(),
102+
partition: msg.partition(),
103+
offset: msg.offset(),
104+
cause,
105+
})?;
106+
107+
Ok(self.wrap_service_invocation_in_envelope(invocation, dedup))
108+
}
109+
110+
fn wrap_service_invocation_in_envelope(
111+
&self,
112+
service_invocation: Box<ServiceInvocation>,
113+
dedup_information: DedupInformation,
114+
) -> Envelope {
115+
let header = restate_wal_protocol::Header {
116+
source: Source::Ingress {},
117+
dest: Destination::Processor {
118+
partition_key: service_invocation.partition_key(),
119+
dedup: Some(dedup_information),
120+
},
121+
};
122+
123+
Envelope::new(header, Command::Invoke(service_invocation))
124+
}
125+
126+
fn generate_events_attributes(msg: &impl Message, subscription_id: &str) -> Vec<Header> {
127+
let mut headers = Vec::with_capacity(6);
128+
headers.push(Header::new("kafka.offset", msg.offset().to_string()));
129+
headers.push(Header::new("kafka.topic", msg.topic()));
130+
headers.push(Header::new("kafka.partition", msg.partition().to_string()));
131+
if let Some(timestamp) = msg.timestamp().to_millis() {
132+
headers.push(Header::new("kafka.timestamp", timestamp.to_string()));
133+
}
134+
headers.push(Header::new(
135+
"restate.subscription.id".to_string(),
136+
subscription_id,
137+
));
138+
139+
if let Some(key) = msg.key() {
140+
headers.push(Header::new(
141+
"kafka.key",
142+
&*base64::prelude::BASE64_URL_SAFE.encode(key),
143+
));
144+
}
145+
146+
headers
147+
}
148+
}
149+
150+
#[derive(Debug)]
151+
pub struct InvocationBuilder;
152+
153+
impl InvocationBuilder {
154+
#[allow(clippy::too_many_arguments)]
155+
pub fn create(
156+
subscription: &Subscription,
157+
producer_id: u128,
158+
schema: live::Pinned<Schema>,
159+
key: Bytes,
160+
payload: Bytes,
161+
headers: Vec<restate_types::invocation::Header>,
162+
consumer_group_id: &str,
163+
topic: &str,
164+
partition: i32,
165+
offset: i64,
166+
) -> Result<Box<ServiceInvocation>, anyhow::Error> {
167+
let Sink::Invocation {
168+
event_invocation_target_template,
169+
} = subscription.sink();
170+
171+
let invocation_target = match event_invocation_target_template {
172+
EventInvocationTargetTemplate::Service { name, handler } => {
173+
InvocationTarget::service(name.clone(), handler.clone())
174+
}
175+
EventInvocationTargetTemplate::VirtualObject {
176+
name,
177+
handler,
178+
handler_ty,
179+
} => InvocationTarget::virtual_object(
180+
name.clone(),
181+
std::str::from_utf8(&key)
182+
.map_err(|e| anyhow::anyhow!("The Kafka record key must be valid UTF-8: {e}"))?
183+
.to_owned(),
184+
handler.clone(),
185+
*handler_ty,
186+
),
187+
EventInvocationTargetTemplate::Workflow {
188+
name,
189+
handler,
190+
handler_ty,
191+
} => InvocationTarget::workflow(
192+
name.clone(),
193+
std::str::from_utf8(&key)
194+
.map_err(|e| anyhow::anyhow!("The Kafka record key must be valid UTF-8: {e}"))?
195+
.to_owned(),
196+
handler.clone(),
197+
*handler_ty,
198+
),
199+
};
200+
201+
// Compute the retention values
202+
let target = schema
203+
.resolve_latest_invocation_target(
204+
invocation_target.service_name(),
205+
invocation_target.handler_name(),
206+
)
207+
.ok_or_else(|| anyhow::anyhow!("Service and handler are not registered"))?;
208+
209+
if let DeploymentStatus::Deprecated(dp_id) = target.deployment_status {
210+
bail!(
211+
"the service {} is exposed by the deprecated deployment {dp_id}, please upgrade the SDK.",
212+
invocation_target.service_name()
213+
)
214+
}
215+
216+
let invocation_retention = target.compute_retention(false);
217+
218+
let dedup_ref = DedupRef {
219+
producer: &producer_id,
220+
offset: &offset,
221+
};
222+
223+
// partition key is computed normally from the target *key*. If the
224+
// key is None (in case of a service) we instead use the hash of the
225+
// dedup information to make sure this service invocation lands
226+
// on the same partition everytime
227+
let partition_key = invocation_target
228+
.key()
229+
.map(partitioner::HashPartitioner::compute_partition_key)
230+
.unwrap_or_else(|| partitioner::HashPartitioner::compute_partition_key(dedup_ref));
231+
232+
let invocation_id = InvocationId::from_parts(
233+
partition_key,
234+
InvocationUuid::generate(&invocation_target, None),
235+
);
236+
237+
// Figure out tracing span
238+
let ingress_span_context = prepare_tracing_span(
239+
&invocation_id,
240+
&invocation_target,
241+
&headers,
242+
consumer_group_id,
243+
topic,
244+
partition as i64,
245+
offset,
246+
);
247+
248+
// Finally generate service invocation
249+
let mut service_invocation = Box::new(ServiceInvocation::initialize(
250+
invocation_id,
251+
invocation_target,
252+
restate_types::invocation::Source::Subscription(subscription.id()),
253+
));
254+
service_invocation.with_related_span(SpanRelation::parent(ingress_span_context));
255+
service_invocation.argument = payload;
256+
service_invocation.headers = headers;
257+
service_invocation.with_retention(invocation_retention);
258+
259+
Ok(service_invocation)
260+
}
261+
}
262+
263+
#[derive(Hash)]
264+
struct DedupRef<'a> {
265+
producer: &'a u128,
266+
offset: &'a i64,
267+
}
268+
269+
#[allow(clippy::too_many_arguments)]
270+
pub(crate) fn prepare_tracing_span(
271+
invocation_id: &InvocationId,
272+
invocation_target: &InvocationTarget,
273+
headers: &[restate_types::invocation::Header],
274+
consumer_group_name: &str,
275+
topic: &str,
276+
partition: i64,
277+
offset: i64,
278+
) -> SpanContext {
279+
let tracing_context = TraceContextPropagator::new().extract(&HeaderExtractor(headers));
280+
let inbound_span = tracing_context.span();
281+
282+
let relation = if inbound_span.span_context().is_valid() {
283+
SpanRelation::parent(inbound_span.span_context())
284+
} else {
285+
SpanRelation::None
286+
};
287+
288+
let span = restate_tracing_instrumentation::info_invocation_span!(
289+
relation = relation,
290+
prefix = "ingress_kafka",
291+
id = invocation_id,
292+
target = invocation_target,
293+
tags = (
294+
messaging.system = "kafka",
295+
messaging.consumer.group.name = consumer_group_name.to_owned(),
296+
messaging.operation.type = "process",
297+
messaging.kafka.offset = offset,
298+
messaging.source.partition.id = partition,
299+
messaging.source.name = topic.to_owned()
300+
)
301+
);
302+
303+
span.span_context().clone()
304+
}
305+
306+
struct HeaderExtractor<'a>(pub &'a [restate_types::invocation::Header]);
307+
308+
impl Extractor for HeaderExtractor<'_> {
309+
fn get(&self, key: &str) -> Option<&str> {
310+
self.0
311+
.iter()
312+
.find(|h| h.name.eq_ignore_ascii_case(key))
313+
.map(|value| value.value.borrow())
314+
}
315+
316+
fn keys(&self) -> Vec<&str> {
317+
self.0.iter().map(|h| h.name.borrow()).collect::<Vec<_>>()
318+
}
319+
}

0 commit comments

Comments
 (0)