Skip to content

Commit f5cd36f

Browse files
committed
openTelemetry: propagate baggage
1 parent 8f61af3 commit f5cd36f

File tree

7 files changed

+289
-50
lines changed

7 files changed

+289
-50
lines changed

opentelemetry/build.gradle

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,15 @@ dependencies {
1515
libraries.auto.value.annotations
1616

1717
testImplementation project(':grpc-testing'),
18+
project(':grpc-testing-proto'),
1819
project(':grpc-inprocess'),
20+
project(':grpc-stub'),
21+
project(':grpc-protobuf'),
1922
testFixtures(project(':grpc-core')),
2023
testFixtures(project(':grpc-api')),
2124
libraries.opentelemetry.sdk.testing,
2225
libraries.assertj.core // opentelemetry.sdk.testing uses compileOnly for assertj
26+
libraries.protobuf.java
2327

2428
annotationProcessor libraries.auto.value
2529

opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -179,12 +179,14 @@ public void configureChannelBuilder(ManagedChannelBuilder<?> builder) {
179179
* @param serverBuilder the server builder to configure
180180
*/
181181
public void configureServerBuilder(ServerBuilder<?> serverBuilder) {
182-
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
182+
/* To ensure baggage propagation to metrics, we need the tracing
183+
tracers to be initialised before metrics */
183184
if (ENABLE_OTEL_TRACING) {
184185
serverBuilder.addStreamTracerFactory(
185186
openTelemetryTracingModule.getServerTracerFactory());
186187
serverBuilder.intercept(openTelemetryTracingModule.getServerSpanPropagationInterceptor());
187188
}
189+
serverBuilder.addStreamTracerFactory(openTelemetryMetricsModule.getServerTracerFactory());
188190
}
189191

190192
@VisibleForTesting

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java

Lines changed: 31 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BACKEND_SERVICE_KEY;
21+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
2122
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY;
2223
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY;
2324
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY;
@@ -44,7 +45,9 @@
4445
import io.grpc.Status;
4546
import io.grpc.Status.Code;
4647
import io.grpc.StreamTracer;
48+
import io.opentelemetry.api.baggage.Baggage;
4749
import io.opentelemetry.api.common.AttributesBuilder;
50+
import io.opentelemetry.context.Context;
4851
import java.util.ArrayList;
4952
import java.util.Collection;
5053
import java.util.Collections;
@@ -94,8 +97,8 @@ final class OpenTelemetryMetricsModule {
9497
private final ImmutableList<OpenTelemetryPlugin> plugins;
9598

9699
OpenTelemetryMetricsModule(Supplier<Stopwatch> stopwatchSupplier,
97-
OpenTelemetryMetricsResource resource, Collection<String> optionalLabels,
98-
List<OpenTelemetryPlugin> plugins) {
100+
OpenTelemetryMetricsResource resource,
101+
Collection<String> optionalLabels, List<OpenTelemetryPlugin> plugins) {
99102
this.resource = checkNotNull(resource, "resource");
100103
this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier");
101104
this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey());
@@ -128,6 +131,14 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
128131
return isGeneratedMethod ? fullMethodName : "other";
129132
}
130133

134+
private static Context otelContextWithBaggage() {
135+
Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
136+
if (baggage == null) {
137+
return Context.current();
138+
}
139+
return Context.current().with(baggage);
140+
}
141+
131142
private static final class ClientTracer extends ClientStreamTracer {
132143
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
133144
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
@@ -243,6 +254,7 @@ public void streamClosed(Status status) {
243254
}
244255

245256
void recordFinishedAttempt() {
257+
Context otelContext = otelContextWithBaggage();
246258
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
247259
.put(METHOD_KEY, fullMethodName)
248260
.put(TARGET_KEY, target)
@@ -268,15 +280,15 @@ void recordFinishedAttempt() {
268280

269281
if (module.resource.clientAttemptDurationCounter() != null ) {
270282
module.resource.clientAttemptDurationCounter()
271-
.record(attemptNanos * SECONDS_PER_NANO, attribute);
283+
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
272284
}
273285
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
274286
module.resource.clientTotalSentCompressedMessageSizeCounter()
275-
.record(outboundWireSize, attribute);
287+
.record(outboundWireSize, attribute, otelContext);
276288
}
277289
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
278290
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
279-
.record(inboundWireSize, attribute);
291+
.record(inboundWireSize, attribute, otelContext);
280292
}
281293
}
282294
}
@@ -408,6 +420,7 @@ void callEnded(Status status) {
408420
}
409421

410422
void recordFinishedCall() {
423+
Context otelContext = otelContextWithBaggage();
411424
if (attemptsPerCall.get() == 0) {
412425
ClientTracer tracer = newClientTracer(null);
413426
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
@@ -429,15 +442,17 @@ void recordFinishedCall() {
429442
callLatencyNanos * SECONDS_PER_NANO,
430443
baseAttributes.toBuilder()
431444
.put(STATUS_KEY, status.getCode().toString())
432-
.build()
445+
.build(),
446+
otelContext
433447
);
434448
}
435449

436450
// Retry counts
437451
if (module.resource.clientCallRetriesCounter() != null) {
438452
long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
439453
if (retriesPerCall > 0) {
440-
module.resource.clientCallRetriesCounter().record(retriesPerCall, baseAttributes);
454+
module.resource.clientCallRetriesCounter()
455+
.record(retriesPerCall, baseAttributes, otelContext);
441456
}
442457
}
443458

@@ -446,24 +461,25 @@ void recordFinishedCall() {
446461
long hedges = hedgedAttemptsPerCall.get();
447462
if (hedges > 0) {
448463
module.resource.clientCallHedgesCounter()
449-
.record(hedges, baseAttributes);
464+
.record(hedges, baseAttributes, otelContext);
450465
}
451466
}
452467

453468
// Transparent Retry counts
454469
if (module.resource.clientCallTransparentRetriesCounter() != null) {
455470
long transparentRetries = transparentRetriesPerCall.get();
456471
if (transparentRetries > 0) {
457-
module.resource.clientCallTransparentRetriesCounter().record(
458-
transparentRetries, baseAttributes);
472+
module.resource.clientCallTransparentRetriesCounter()
473+
.record(transparentRetries, baseAttributes, otelContext);
459474
}
460475
}
461476

462477
// Retry delay
463478
if (module.resource.clientCallRetryDelayCounter() != null) {
464479
module.resource.clientCallRetryDelayCounter().record(
465480
retryDelayNanos * SECONDS_PER_NANO,
466-
baseAttributes
481+
baseAttributes,
482+
otelContext
467483
);
468484
}
469485
}
@@ -562,6 +578,7 @@ public void inboundWireSize(long bytes) {
562578
*/
563579
@Override
564580
public void streamClosed(Status status) {
581+
Context otelContext = otelContextWithBaggage();
565582
if (streamClosedUpdater != null) {
566583
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
567584
return;
@@ -584,15 +601,15 @@ public void streamClosed(Status status) {
584601

585602
if (module.resource.serverCallDurationCounter() != null) {
586603
module.resource.serverCallDurationCounter()
587-
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
604+
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
588605
}
589606
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
590607
module.resource.serverTotalSentCompressedMessageSizeCounter()
591-
.record(outboundWireSize, attributes);
608+
.record(outboundWireSize, attributes, otelContext);
592609
}
593610
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
594611
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
595-
.record(inboundWireSize, attributes);
612+
.record(inboundWireSize, attributes, otelContext);
596613
}
597614
}
598615
}

opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryTracingModule.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.base.Preconditions.checkNotNull;
2020
import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED;
2121
import static io.grpc.internal.GrpcUtil.IMPLEMENTATION_VERSION;
22+
import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY;
2223

2324
import com.google.common.annotations.VisibleForTesting;
2425
import io.grpc.Attributes;
@@ -60,10 +61,6 @@ final class OpenTelemetryTracingModule {
6061
@VisibleForTesting
6162
final io.grpc.Context.Key<Span> otelSpan = io.grpc.Context.key("opentelemetry-span-key");
6263

63-
@VisibleForTesting
64-
final io.grpc.Context.Key<Baggage> baggageKey =
65-
io.grpc.Context.key("opentelemetry-baggage-key");
66-
6764
@Nullable
6865
private static final AtomicIntegerFieldUpdater<CallAttemptsTracerFactory> callEndedUpdater;
6966
@Nullable
@@ -248,13 +245,15 @@ private final class ServerTracer extends ServerStreamTracer {
248245
private final Span span;
249246
volatile int streamClosed;
250247
private int seqNo;
248+
private Baggage baggage;
251249

252-
ServerTracer(String fullMethodName, @Nullable Span remoteSpan) {
250+
ServerTracer(String fullMethodName, @Nullable Span remoteSpan, Baggage baggage) {
253251
checkNotNull(fullMethodName, "fullMethodName");
254252
this.span =
255253
otelTracer.spanBuilder(generateTraceSpanName(true, fullMethodName))
256254
.setParent(remoteSpan == null ? null : Context.current().with(remoteSpan))
257255
.startSpan();
256+
this.baggage = baggage;
258257
}
259258

260259
/**
@@ -280,7 +279,9 @@ public void streamClosed(io.grpc.Status status) {
280279

281280
@Override
282281
public io.grpc.Context filterContext(io.grpc.Context context) {
283-
return context.withValue(otelSpan, span);
282+
return context
283+
.withValue(otelSpan, span)
284+
.withValue(BAGGAGE_KEY, baggage);
284285
}
285286

286287
@Override
@@ -320,7 +321,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
320321
if (remoteSpan == Span.getInvalid()) {
321322
remoteSpan = null;
322323
}
323-
return new ServerTracer(fullMethodName, remoteSpan);
324+
Baggage baggage = Baggage.fromContext(context);
325+
return new ServerTracer(fullMethodName, remoteSpan, baggage);
324326
}
325327
}
326328

@@ -335,9 +337,12 @@ public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, Re
335337
+ "tracing must be set.");
336338
return next.startCall(call, headers);
337339
}
338-
Context serverCallContext = Context.current().with(span);
339-
Baggage baggage = baggageKey.get(io.grpc.Context.current());
340-
serverCallContext = serverCallContext.with(baggage);
340+
Context serverCallContext = Context.current();
341+
serverCallContext = serverCallContext.with(span);
342+
Baggage baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
343+
if (baggage != null) {
344+
serverCallContext = serverCallContext.with(baggage);
345+
}
341346
try (Scope scope = serverCallContext.makeCurrent()) {
342347
return new ContextServerCallListener<>(next.startCall(call, headers), serverCallContext);
343348
}

opentelemetry/src/main/java/io/grpc/opentelemetry/internal/OpenTelemetryConstants.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,9 @@
1616

1717
package io.grpc.opentelemetry.internal;
1818

19+
import com.google.common.annotations.VisibleForTesting;
1920
import com.google.common.collect.ImmutableList;
21+
import io.opentelemetry.api.baggage.Baggage;
2022
import io.opentelemetry.api.common.AttributeKey;
2123
import java.util.List;
2224

@@ -42,6 +44,10 @@ public final class OpenTelemetryConstants {
4244
public static final AttributeKey<String> SECURITY_LEVEL_KEY =
4345
AttributeKey.stringKey("grpc.security_level");
4446

47+
@VisibleForTesting
48+
public static final io.grpc.Context.Key<Baggage> BAGGAGE_KEY =
49+
io.grpc.Context.key("opentelemetry-baggage-key");
50+
4551
public static final List<Double> LATENCY_BUCKETS =
4652
ImmutableList.of(
4753
0d, 0.00001d, 0.00005d, 0.0001d, 0.0003d, 0.0006d, 0.0008d, 0.001d, 0.002d,

0 commit comments

Comments
 (0)