Skip to content

Commit

Permalink
Clear context before flux retry (#8456)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurit committed May 10, 2023
1 parent 178b285 commit 24b65ab
Show file tree
Hide file tree
Showing 2 changed files with 166 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
* https://github.com/opentracing-contrib/java-reactor/blob/master/src/main/java/io/opentracing/contrib/reactor/TracedSubscriber.java
*/
public class TracingSubscriber<T> implements CoreSubscriber<T> {
private static final Class<?> fluxRetrySubscriberClass = getFluxRetrySubscriberClass();
private static final Class<?> fluxRetryWhenSubscriberClass = getFluxRetryWhenSubscriberClass();
private final io.opentelemetry.context.Context traceContext;
private final Subscriber<? super T> subscriber;
private final Context context;
Expand Down Expand Up @@ -64,7 +66,15 @@ public void onNext(T o) {

@Override
public void onError(Throwable throwable) {
withActiveSpan(() -> subscriber.onError(throwable));
if (!hasContextToPropagate
&& (fluxRetrySubscriberClass == subscriber.getClass()
|| fluxRetryWhenSubscriberClass == subscriber.getClass())) {
// clear context for retry to avoid having retried operations run with currently active
// context as parent context
withActiveSpan(io.opentelemetry.context.Context.root(), () -> subscriber.onError(throwable));
} else {
withActiveSpan(() -> subscriber.onError(throwable));
}
}

@Override
Expand All @@ -78,12 +88,32 @@ public Context currentContext() {
}

private void withActiveSpan(Runnable runnable) {
if (hasContextToPropagate) {
try (Scope ignored = traceContext.makeCurrent()) {
withActiveSpan(hasContextToPropagate ? traceContext : null, runnable);
}

private static void withActiveSpan(io.opentelemetry.context.Context context, Runnable runnable) {
if (context != null) {
try (Scope ignored = context.makeCurrent()) {
runnable.run();
}
} else {
runnable.run();
}
}

private static Class<?> getFluxRetrySubscriberClass() {
try {
return Class.forName("reactor.core.publisher.FluxRetry$RetrySubscriber");
} catch (ClassNotFoundException exception) {
return null;
}
}

private static Class<?> getFluxRetryWhenSubscriberClass() {
try {
return Class.forName("reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber");
} catch (ClassNotFoundException exception) {
return null;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

package io.opentelemetry.instrumentation.reactor.v3_1;

import static io.opentelemetry.instrumentation.testing.util.TelemetryDataUtil.orderByRootSpanName;
import static io.opentelemetry.sdk.testing.assertj.OpenTelemetryAssertions.attributeEntry;
import static java.lang.invoke.MethodType.methodType;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -20,12 +21,17 @@
import io.opentelemetry.instrumentation.testing.junit.LibraryInstrumentationExtension;
import java.lang.invoke.MethodHandle;
import java.lang.invoke.MethodHandles;
import java.lang.reflect.Method;
import java.time.Duration;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
import org.reactivestreams.Publisher;
import reactor.core.Scannable;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
Expand Down Expand Up @@ -406,6 +412,133 @@ void doesNotOverrideInnerCurrentSpansWithThereIsOuterCurrent() {
.hasAttributes(attributeEntry("onNext", true))));
}

@ParameterizedTest
@ValueSource(strings = {"retry", "retryWhen"})
void doesNotLeakContextOnRetry(String retryKind) {
// retry calls subscribe again from onError where we have active context, check that this
// context is not used as parent for retried operations
AtomicBoolean beforeRetry = new AtomicBoolean(true);
Flux<Integer> publish =
Flux.create(
sink -> {
for (int i = 0; i < 2; i++) {
int index = i;
testing.runWithSpan(
"produce " + (beforeRetry.get() ? "before" : "after") + " retry " + i,
() -> sink.next(index));
}
});

Flux<Integer> flux =
Flux.defer(() -> publish.delaySubscription(Duration.ofMillis(1)))
.doOnNext(message -> testing.runWithSpan("process " + message, () -> {}))
.handle(
(message, sink) -> {
if (message == 1 && beforeRetry.compareAndSet(true, false)) {
sink.error(new RuntimeException("Message has error"));
} else {
sink.next(message);
}
});

switch (retryKind) {
case "retry":
flux = flux.retry();
break;
case "retryWhen":
flux = retryWhen(flux);
break;
default:
throw new IllegalStateException("Unsupported retry kind " + retryKind);
}

flux.subscribe();

testing.waitAndAssertSortedTraces(
orderByRootSpanName(
"produce before retry 0",
"produce before retry 1",
"produce after retry 0",
"produce after retry 1"),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce before retry 0").hasNoParent(),
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce before retry 1").hasNoParent(),
span -> span.hasName("process 1").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce after retry 0").hasNoParent(),
span -> span.hasName("process 0").hasParent(trace.getSpan(0))),
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("produce after retry 1").hasNoParent(),
span -> span.hasName("process 1").hasParent(trace.getSpan(0))));
}

@Test
void retryWithParentSpan() {
AtomicBoolean beforeRetry = new AtomicBoolean(true);
Flux<Integer> publish =
Flux.create(
sink ->
testing.runWithSpan(
"produce " + (beforeRetry.get() ? "before" : "after") + " retry",
() -> sink.next(0)));

Flux<Object> flux =
Flux.defer(() -> publish.delaySubscription(Duration.ofMillis(1)))
.doOnNext(message -> testing.runWithSpan("process", () -> {}))
.handle(
(message, sink) -> {
if (beforeRetry.compareAndSet(true, false)) {
sink.error(new RuntimeException("Message has error"));
} else {
sink.next(message);
}
})
.retry();

testing.runWithSpan("parent", () -> flux.subscribe());

testing.waitAndAssertTraces(
trace ->
trace.hasSpansSatisfyingExactly(
span -> span.hasName("parent").hasNoParent(),
span -> span.hasName("produce before retry").hasParent(trace.getSpan(0)),
span -> span.hasName("process").hasParent(trace.getSpan(0)),
span -> span.hasName("produce after retry").hasParent(trace.getSpan(0)),
span -> span.hasName("process").hasParent(trace.getSpan(0))));
}

@SuppressWarnings("unchecked")
private static <T> Flux<T> retryWhen(Flux<T> flux) {
try {
Method method = Flux.class.getMethod("retryWhen", Function.class);
Function<Flux<Throwable>, ? extends Publisher<?>> function =
err -> Flux.create(sink -> sink.next(-1));
return (Flux<T>) method.invoke(flux, function);
} catch (NoSuchMethodException exception) {
// ignore
} catch (Exception exception) {
throw new IllegalStateException(exception);
}

try {
Class<?> retryClass = Class.forName("reactor.util.retry.Retry");
Method retryWhenMethod = Flux.class.getMethod("retryWhen", retryClass);
Method retrySpecMethod = retryClass.getMethod("indefinitely");
return (Flux<T>) retryWhenMethod.invoke(flux, retrySpecMethod.invoke(retryClass));
} catch (ClassNotFoundException | NoSuchMethodException exception) {
// ignore
} catch (Exception exception) {
throw new IllegalStateException(exception);
}
throw new IllegalStateException("Could not find retryWhen method");
}

@SuppressWarnings("unchecked")
private <T> Mono<T> monoSpan(Mono<T> mono, String spanName) {

Expand Down

0 comments on commit 24b65ab

Please sign in to comment.