Skip to content

Commit

Permalink
Propagate Context instead of SpanContext (#4806)
Browse files Browse the repository at this point in the history
* Propagate Context instead of SpanContext

* Update spring-kafka

* Fix nesting

* Comment
  • Loading branch information
trask committed Dec 6, 2021
1 parent 7b26133 commit 31fddb7
Show file tree
Hide file tree
Showing 12 changed files with 68 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,17 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.api.instrumenter.SpanKey;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import java.util.Iterator;
Expand Down Expand Up @@ -63,10 +65,13 @@ public static class IterableAdvice {
public static <K, V> void wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) Iterable<ConsumerRecord<K, V>> iterable) {
if (iterable != null) {
SpanContext receiveSpanContext =
VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
iterable = TracingIterable.wrap(iterable, receiveSpanContext);
// typically, span key suppression should happen inside the Instrumenter, but receiveContext
// is being used as the parent context for the span instead of the current context
if (iterable != null
&& SpanKey.CONSUMER_PROCESS.fromContextOrNull(currentContext()) == null) {
Context receiveContext =
VirtualField.find(ConsumerRecords.class, Context.class).get(records);
iterable = TracingIterable.wrap(iterable, receiveContext);
}
}
}
Expand All @@ -79,9 +84,9 @@ public static <K, V> void wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) List<ConsumerRecord<K, V>> list) {
if (list != null) {
SpanContext receiveSpanContext =
VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
list = TracingList.wrap(list, receiveSpanContext);
Context receiveContext =
VirtualField.find(ConsumerRecords.class, Context.class).get(records);
list = TracingList.wrap(list, receiveContext);
}
}
}
Expand All @@ -93,10 +98,13 @@ public static class IteratorAdvice {
public static <K, V> void wrap(
@Advice.This ConsumerRecords<?, ?> records,
@Advice.Return(readOnly = false) Iterator<ConsumerRecord<K, V>> iterator) {
if (iterator != null) {
SpanContext receiveSpanContext =
VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
iterator = TracingIterator.wrap(iterator, receiveSpanContext);
// typically, span key suppression should happen inside the Instrumenter, but receiveContext
// is being used as the parent context for the span instead of the current context
if (iterator != null
&& SpanKey.CONSUMER_PROCESS.fromContextOrNull(currentContext()) == null) {
Context receiveContext =
VirtualField.find(ConsumerRecords.class, Context.class).get(records);
iterator = TracingIterator.wrap(iterator, receiveContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.spanFromContext;
import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerReceiveInstrumenter;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.instrumentation.kafka.internal.ReceivedRecords;
Expand Down Expand Up @@ -73,9 +71,9 @@ public static void onExit(
// context even though the span has ended
// this is the suggested behavior according to the spec batch receive scenario:
// https://github.com/open-telemetry/opentelemetry-specification/blob/main/specification/trace/semantic_conventions/messaging.md#batch-receiving
VirtualField<ConsumerRecords, SpanContext> consumerRecordsSpan =
VirtualField.find(ConsumerRecords.class, SpanContext.class);
consumerRecordsSpan.set(records, spanFromContext(context).getSpanContext());
VirtualField<ConsumerRecords, Context> consumerRecordsContext =
VirtualField.find(ConsumerRecords.class, Context.class);
consumerRecordsContext.set(records, context);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessWrapper;
import java.util.Iterator;
Expand All @@ -16,19 +16,19 @@ public class TracingIterable<K, V>
implements Iterable<ConsumerRecord<K, V>>,
KafkaClientsConsumerProcessWrapper<Iterable<ConsumerRecord<K, V>>> {
private final Iterable<ConsumerRecord<K, V>> delegate;
@Nullable private final SpanContext receiveSpanContext;
@Nullable private final Context receiveContext;
private boolean firstIterator = true;

protected TracingIterable(
Iterable<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
Iterable<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
this.delegate = delegate;
this.receiveSpanContext = receiveSpanContext;
this.receiveContext = receiveContext;
}

public static <K, V> Iterable<ConsumerRecord<K, V>> wrap(
Iterable<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
Iterable<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
return new TracingIterable<>(delegate, receiveSpanContext);
return new TracingIterable<>(delegate, receiveContext);
}
return delegate;
}
Expand All @@ -40,7 +40,7 @@ public Iterator<ConsumerRecord<K, V>> iterator() {
// However, this is not thread-safe, but usually the first (hopefully only) traversal of
// ConsumerRecords is performed in the same thread that called poll()
if (firstIterator) {
it = TracingIterator.wrap(delegate.iterator(), receiveSpanContext);
it = TracingIterator.wrap(delegate.iterator(), receiveContext);
firstIterator = false;
} else {
it = delegate.iterator();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import static io.opentelemetry.javaagent.instrumentation.kafkaclients.KafkaSingletons.consumerProcessInstrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
Expand All @@ -32,21 +30,17 @@ public class TracingIterator<K, V>
@Nullable private Scope currentScope;

private TracingIterator(
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable SpanContext receiveSpanContext) {
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable Context receiveContext) {
this.delegateIterator = delegateIterator;

// use the receive CONSUMER span as parent if it's available
Context parentContext = Context.current();
if (receiveSpanContext != null) {
parentContext = parentContext.with(Span.wrap(receiveSpanContext));
}
this.parentContext = parentContext;
// use the receive CONSUMER as parent if it's available
this.parentContext = receiveContext != null ? receiveContext : Context.current();
}

public static <K, V> Iterator<ConsumerRecord<K, V>> wrap(
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable SpanContext receiveSpanContext) {
Iterator<ConsumerRecord<K, V>> delegateIterator, @Nullable Context receiveContext) {
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
return new TracingIterator<>(delegateIterator, receiveSpanContext);
return new TracingIterator<>(delegateIterator, receiveContext);
}
return delegateIterator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.kafkaclients;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import java.util.Collection;
import java.util.List;
Expand All @@ -16,16 +16,15 @@
public class TracingList<K, V> extends TracingIterable<K, V> implements List<ConsumerRecord<K, V>> {
private final List<ConsumerRecord<K, V>> delegate;

private TracingList(
List<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
super(delegate, receiveSpanContext);
private TracingList(List<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
super(delegate, receiveContext);
this.delegate = delegate;
}

public static <K, V> List<ConsumerRecord<K, V>> wrap(
List<ConsumerRecord<K, V>> delegate, @Nullable SpanContext receiveSpanContext) {
List<ConsumerRecord<K, V>> delegate, @Nullable Context receiveContext) {
if (KafkaClientsConsumerProcessTracing.wrappingEnabled()) {
return new TracingList<>(delegate, receiveSpanContext);
return new TracingList<>(delegate, receiveContext);
}
return delegate;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,13 @@
package io.opentelemetry.javaagent.instrumentation.kafkastreams;

import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.currentContext;
import static io.opentelemetry.javaagent.instrumentation.api.Java8BytecodeBridge.wrapSpan;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.KafkaStreamsSingletons.instrumenter;
import static io.opentelemetry.javaagent.instrumentation.kafkastreams.StateHolder.HOLDER;
import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPackagePrivate;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -58,13 +56,11 @@ public static void onExit(@Advice.Return StampedRecord record) {
return;
}

Context receiveContext =
VirtualField.find(ConsumerRecord.class, Context.class).get(record.value);

// use the receive CONSUMER span as parent if it's available
Context parentContext = currentContext();
SpanContext receiveSpanContext =
VirtualField.find(ConsumerRecord.class, SpanContext.class).get(record.value);
if (receiveSpanContext != null) {
parentContext = parentContext.with(wrapSpan(receiveSpanContext));
}
Context parentContext = receiveContext != null ? receiveContext : currentContext();

if (!instrumenter().shouldStart(parentContext, record.value)) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand Down Expand Up @@ -62,9 +62,9 @@ public static void onExit(
}

// copy the receive CONSUMER span association
VirtualField<ConsumerRecord, SpanContext> singleRecordReceiveSpan =
VirtualField.find(ConsumerRecord.class, SpanContext.class);
singleRecordReceiveSpan.set(result, singleRecordReceiveSpan.get(incoming));
VirtualField<ConsumerRecord, Context> singleRecordReceiveContext =
VirtualField.find(ConsumerRecord.class, Context.class);
singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand Down Expand Up @@ -57,9 +57,9 @@ public static void saveHeaders(
}

// copy the receive CONSUMER span association
VirtualField<ConsumerRecord, SpanContext> singleRecordReceiveSpan =
VirtualField.find(ConsumerRecord.class, SpanContext.class);
singleRecordReceiveSpan.set(result, singleRecordReceiveSpan.get(incoming));
VirtualField<ConsumerRecord, Context> singleRecordReceiveContext =
VirtualField.find(ConsumerRecord.class, Context.class);
singleRecordReceiveContext.set(result, singleRecordReceiveContext.get(incoming));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.returns;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.bootstrap.kafka.KafkaClientsConsumerProcessTracing;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
Expand Down Expand Up @@ -47,17 +47,16 @@ public static void onExit(@Advice.Return ConsumerRecords<?, ?> records) {
return;
}

SpanContext receiveSpanContext =
VirtualField.find(ConsumerRecords.class, SpanContext.class).get(records);
if (receiveSpanContext == null) {
Context receiveContext = VirtualField.find(ConsumerRecords.class, Context.class).get(records);
if (receiveContext == null) {
return;
}

VirtualField<ConsumerRecord<?, ?>, SpanContext> singleRecordReceiveSpan =
VirtualField.find(ConsumerRecord.class, SpanContext.class);
VirtualField<ConsumerRecord<?, ?>, Context> singleRecordReceiveContext =
VirtualField.find(ConsumerRecord.class, Context.class);

for (ConsumerRecord<?, ?> record : records) {
singleRecordReceiveSpan.set(record, receiveSpanContext);
singleRecordReceiveContext.set(record, receiveContext);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArguments;

import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.api.field.VirtualField;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
Expand Down Expand Up @@ -43,11 +43,11 @@ public static class GetBatchInterceptorAdvice {
@Advice.OnMethodExit(suppress = Throwable.class)
public static void onExit(@Advice.Return(readOnly = false) BatchInterceptor<?, ?> interceptor) {
if (!(interceptor instanceof InstrumentedBatchInterceptor)) {
VirtualField receiveSpanVirtualField =
VirtualField.find(ConsumerRecords.class, SpanContext.class);
VirtualField receiveContextVirtualField =
VirtualField.find(ConsumerRecords.class, Context.class);
VirtualField stateStore = VirtualField.find(ConsumerRecords.class, State.class);
interceptor =
new InstrumentedBatchInterceptor<>(receiveSpanVirtualField, stateStore, interceptor);
new InstrumentedBatchInterceptor<>(receiveContextVirtualField, stateStore, interceptor);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@

import static io.opentelemetry.javaagent.instrumentation.spring.kafka.SpringKafkaSingletons.processInstrumenter;

import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.SpanContext;
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.api.field.VirtualField;
Expand All @@ -18,15 +16,15 @@
import org.springframework.kafka.listener.BatchInterceptor;

public final class InstrumentedBatchInterceptor<K, V> implements BatchInterceptor<K, V> {
private final VirtualField<ConsumerRecords<K, V>, SpanContext> receiveSpanVirtualField;
private final VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField;
private final VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore;
@Nullable private final BatchInterceptor<K, V> decorated;

public InstrumentedBatchInterceptor(
VirtualField<ConsumerRecords<K, V>, SpanContext> receiveSpanVirtualField,
VirtualField<ConsumerRecords<K, V>, Context> receiveContextVirtualField,
VirtualField<ConsumerRecords<K, V>, State<K, V>> stateStore,
@Nullable BatchInterceptor<K, V> decorated) {
this.receiveSpanVirtualField = receiveSpanVirtualField;
this.receiveContextVirtualField = receiveContextVirtualField;
this.stateStore = stateStore;
this.decorated = decorated;
}
Expand All @@ -45,13 +43,10 @@ public ConsumerRecords<K, V> intercept(ConsumerRecords<K, V> records, Consumer<K
}

private Context getParentContext(ConsumerRecords<K, V> records) {
Context parentContext = Context.current();
Context receiveContext = receiveContextVirtualField.get(records);

// use the receive CONSUMER span as parent if it's available
SpanContext receiveSpanContext = receiveSpanVirtualField.get(records);
if (receiveSpanContext != null) {
parentContext = parentContext.with(Span.wrap(receiveSpanContext));
}
return parentContext;
return receiveContext != null ? receiveContext : Context.current();
}

@Override
Expand Down
Loading

0 comments on commit 31fddb7

Please sign in to comment.