Skip to content

Commit

Permalink
Instrument ContextPropagationOperator to bridge lib/agent calls (#4786)
Browse files Browse the repository at this point in the history
* Instrument ContextPropagationOperator to bridge lib/agent calls

* more tests

* clean up

* up

* lint

* more lint

* make runWithContext(Flux, ..) public

* lint
  • Loading branch information
lmolkova committed Dec 7, 2021
1 parent ebe4c65 commit 1995852
Show file tree
Hide file tree
Showing 6 changed files with 309 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ public static io.opentelemetry.context.Context getAgentContext(Context applicati
return io.opentelemetry.context.Context.root();
}

public static Context toApplicationContext(io.opentelemetry.context.Context agentContext) {
return new AgentContextWrapper(agentContext);
}

public static Context newContextWrapper(
io.opentelemetry.context.Context agentContext, Context applicationContext) {
if (applicationContext instanceof AgentContextWrapper) {
Expand Down Expand Up @@ -227,6 +231,10 @@ private static class AgentContextWrapper implements Context {
final io.opentelemetry.context.Context agentContext;
final Context applicationContext;

AgentContextWrapper(io.opentelemetry.context.Context agentContext) {
this(agentContext, agentContext.get(APPLICATION_CONTEXT));
}

AgentContextWrapper(io.opentelemetry.context.Context agentContext, Context applicationContext) {
if (applicationContext instanceof AgentContextWrapper) {
throw new IllegalStateException("Expected unwrapped context");
Expand Down
9 changes: 7 additions & 2 deletions instrumentation/reactor-3.1/javaagent/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ muzzle {
group.set("io.projectreactor")
module.set("reactor-core")
versions.set("[3.1.0.RELEASE,)")
extraDependency("io.opentelemetry:opentelemetry-api:1.0.0")
assertInverse.set(true)
}
}
Expand All @@ -18,11 +19,15 @@ tasks.withType<Test>().configureEach {

dependencies {
implementation(project(":instrumentation:reactor-3.1:library"))
library("io.projectreactor:reactor-core:3.1.0.RELEASE")

implementation(project(":instrumentation:opentelemetry-api:opentelemetry-api-1.0:javaagent"))

compileOnly(project(":javaagent-tooling"))
compileOnly(project(":instrumentation-api-annotation-support"))
compileOnly(project(path = ":opentelemetry-api-shaded-for-instrumenting", configuration = "shadow"))

testLibrary("io.projectreactor:reactor-core:3.1.0.RELEASE")
testLibrary("io.projectreactor:reactor-test:3.1.0.RELEASE")

testImplementation(project(":instrumentation:reactor-3.1:testing"))
testImplementation("io.opentelemetry:opentelemetry-extension-annotations")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.reactor;

import static net.bytebuddy.matcher.ElementMatchers.isMethod;
import static net.bytebuddy.matcher.ElementMatchers.isPublic;
import static net.bytebuddy.matcher.ElementMatchers.isStatic;
import static net.bytebuddy.matcher.ElementMatchers.named;
import static net.bytebuddy.matcher.ElementMatchers.namedOneOf;
import static net.bytebuddy.matcher.ElementMatchers.returns;
import static net.bytebuddy.matcher.ElementMatchers.takesArgument;

import application.io.opentelemetry.context.Context;
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.extension.instrumentation.TypeTransformer;
import io.opentelemetry.javaagent.instrumentation.opentelemetryapi.context.AgentContextStorage;
import net.bytebuddy.asm.Advice;
import net.bytebuddy.description.type.TypeDescription;
import net.bytebuddy.matcher.ElementMatcher;

public class ContextPropagationOperatorInstrumentation implements TypeInstrumentation {
@Override
public ElementMatcher<TypeDescription> typeMatcher() {
return named("application.io.opentelemetry.instrumentation.reactor.ContextPropagationOperator");
}

@Override
public void transform(TypeTransformer transformer) {
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(isStatic())
.and(named("storeOpenTelemetryContext"))
.and(takesArgument(0, named("reactor.util.context.Context")))
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
.and(returns(named("reactor.util.context.Context"))),
ContextPropagationOperatorInstrumentation.class.getName() + "$StoreAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(isStatic())
.and(named("getOpenTelemetryContext"))
.and(takesArgument(0, named("reactor.util.context.Context")))
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
.and(returns(named("application.io.opentelemetry.context.Context"))),
ContextPropagationOperatorInstrumentation.class.getName() + "$GetAdvice");
transformer.applyAdviceToMethod(
isMethod()
.and(isPublic())
.and(isStatic())
.and(named("runWithContext"))
.and(
takesArgument(
0, namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux")))
.and(takesArgument(1, named("application.io.opentelemetry.context.Context")))
.and(returns(namedOneOf("reactor.core.publisher.Mono", "reactor.core.publisher.Flux"))),
ContextPropagationOperatorInstrumentation.class.getName() + "$RunWithAdvice");
}

@SuppressWarnings("unused")
public static class StoreAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class, skipOn = Advice.OnDefaultValue.class)
public static boolean methodEnter() {
return false;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) reactor.util.context.Context reactorContext,
@Advice.Argument(1) Context applicationContext,
@Advice.Return(readOnly = false) reactor.util.context.Context updatedReactorContext) {
updatedReactorContext =
ContextPropagationOperator.storeOpenTelemetryContext(
reactorContext, AgentContextStorage.getAgentContext(applicationContext));
}
}

@SuppressWarnings("unused")
public static class GetAdvice {
@Advice.OnMethodEnter(skipOn = Advice.OnDefaultValue.class)
public static boolean methodEnter() {
return false;
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void methodExit(
@Advice.Argument(0) reactor.util.context.Context reactorContext,
@Advice.Argument(1) Context defaultContext,
@Advice.Return(readOnly = false) Context applicationContext) {

io.opentelemetry.context.Context agentContext =
ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null);
if (agentContext == null) {
applicationContext = defaultContext;
} else {
applicationContext = AgentContextStorage.toApplicationContext(agentContext);
}
}
}

@SuppressWarnings("unused")
public static class RunWithAdvice {
@Advice.OnMethodEnter
public static void methodEnter(
@Advice.FieldValue(value = "enabled", readOnly = false) boolean enabled) {
enabled = true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@

package io.opentelemetry.javaagent.instrumentation.reactor;

import static java.util.Collections.singletonList;
import static java.util.Arrays.asList;

import com.google.auto.service.AutoService;
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
Expand All @@ -21,6 +21,6 @@ public ReactorInstrumentationModule() {

@Override
public List<TypeInstrumentation> typeInstrumentations() {
return singletonList(new HooksInstrumentation());
return asList(new HooksInstrumentation(), new ContextPropagationOperatorInstrumentation());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

import io.opentelemetry.api.GlobalOpenTelemetry
import io.opentelemetry.api.trace.Span
import io.opentelemetry.api.trace.SpanKind
import io.opentelemetry.context.Context
import io.opentelemetry.instrumentation.reactor.ContextPropagationOperator
import io.opentelemetry.instrumentation.reactor.TracedWithSpan
import io.opentelemetry.instrumentation.test.AgentInstrumentationSpecification
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import reactor.test.StepVerifier

import java.time.Duration

class ContextPropagationOperatorInstrumentationTest extends AgentInstrumentationSpecification {
def "store and get context"() {

def reactorContext = reactor.util.context.Context.empty()
def traceContext = Context.root()
setup:
runWithSpan("parent") { ->
reactorContext = ContextPropagationOperator.storeOpenTelemetryContext(reactorContext, Context.current())
traceContext = ContextPropagationOperator.getOpenTelemetryContext(reactorContext, null)
assert traceContext != null
Span.fromContext(traceContext).setAttribute("foo", "bar")
}

expect:
assert reactorContext.stream().count() == 1
assertTraces(1) {
trace(0, 1) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()

attributes {
"foo" "bar"
}
}
}
}
}

def "get missing context"() {
def traceContext = Context.root()
setup:
runWithSpan("parent") { ->
assert ContextPropagationOperator.getOpenTelemetryContext(reactor.util.context.Context.empty(), null) == null
traceContext = ContextPropagationOperator.getOpenTelemetryContext(reactor.util.context.Context.empty(), Context.current())
Span.fromContext(traceContext).setAttribute("foo", "bar")
}

expect:
assertTraces(1) {
trace(0, 1) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()

attributes {
"foo" "bar"
}
}
}
}
}

def "run Mono with context forces it to become current"() {
setup:
def result = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
def outer = Mono.defer({ -> new TracedWithSpan().mono(Mono.just("Value") )});
return ContextPropagationOperator
.runWithContext(outer, Context.current().with(span))
.doFinally({ i -> span.end() })
})

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "run Flux with context forces it to become current"() {
setup:
def result = Flux.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()
def outer = Flux.defer({ -> new TracedWithSpan().flux(Flux.just("Value") )});
return ContextPropagationOperator
.runWithContext(outer, Context.current().with(span))
.doFinally({ i -> span.end() })
})

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "TracedWithSpan.flux"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}

def "store context forces it to become current"() {
setup:
def result = Mono.defer({ ->
Span span = GlobalOpenTelemetry.getTracer("test").spanBuilder("parent").startSpan()

Mono.delay(Duration.ofMillis(1))
.flatMap({ t ->
// usual trick to force this to run under new TracingSubscriber with context written in the next call
new TracedWithSpan().mono(Mono.just("Value"))
})
.subscriberContext({ ctx ->
ContextPropagationOperator.storeOpenTelemetryContext(ctx, Context.current().with(span))
})
.doFinally({ i -> span.end() })
})

StepVerifier.create(result)
.expectNext("Value")
.verifyComplete()

expect:
assertTraces(1) {
trace(0, 2) {
span(0) {
name "parent"
kind SpanKind.INTERNAL
hasNoParent()
}
span(1) {
name "TracedWithSpan.mono"
kind SpanKind.INTERNAL
childOf span(0)
attributes {
}
}
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public static <T> Mono<T> runWithContext(Mono<T> publisher, Context tracingConte
}

/** Forces Flux to run in traceContext scope. */
static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
public static <T> Flux<T> runWithContext(Flux<T> publisher, Context tracingContext) {
if (!enabled) {
return publisher;
}
Expand Down

0 comments on commit 1995852

Please sign in to comment.