Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change Async Servlet span end logic to fix race condition on Undertow #2992

Merged
merged 12 commits into from
May 26, 2021
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class Jetty11HandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Object source,
@Advice.Argument(value = 2, readOnly = false) HttpServletRequest request,
@Advice.Argument(2) HttpServletRequest request,
@Advice.Argument(3) HttpServletResponse response,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Expand All @@ -31,6 +32,9 @@ public static void onEnter(

context = tracer().startServerSpan(request);
scope = context.makeCurrent();

// Must be set here since Jetty handlers can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(request, response);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need to add calls to setAsyncListenerResponse in each app server instrumentation, or can we do it once in servlet instrumentation?

Copy link
Contributor Author

@agoallikmaa agoallikmaa May 23, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, since it is possible for requests to be handled entirely within app server handlers without servlet advices ever triggering.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can those requests have servlet async listeners?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Jetty handlers and Tomcat valves both expose the ServletRequest and are allowed to generate the response without a servlet ever being invoked. I have not tested using startAsync in them, but the documentation does not say that either of them would only support synchronous responses.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can you add comments to the code explaining this? otherwise worried these lines could be removed thinking they aren't really needed, and I don't think we have any tests that it would break

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added those comments for all the appservers.

}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public class Jetty8HandlerAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void onEnter(
@Advice.This Object source,
@Advice.Argument(value = 2, readOnly = false) HttpServletRequest request,
@Advice.Argument(2) HttpServletRequest request,
@Advice.Argument(3) HttpServletResponse response,
@Advice.Local("otelContext") Context context,
@Advice.Local("otelScope") Scope scope) {

Expand All @@ -31,6 +32,9 @@ public static void onEnter(

context = tracer().startServerSpan(request);
scope = context.makeCurrent();

// Must be set here since Jetty handlers can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(request, response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,5 @@ apply from: "$rootDir/gradle/instrumentation.gradle"

dependencies {
api(project(':instrumentation:servlet:servlet-common:library'))
implementation(project(':instrumentation:servlet:servlet-common:javaagent'))
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.instrumentation.servlet.ServletAccessor;
import io.opentelemetry.instrumentation.servlet.ServletHttpServerTracer;
import io.opentelemetry.instrumentation.servlet.TagSettingAsyncListener;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterAdviceHelper;

public class JettyHandlerAdviceHelper {
/** Shared method exit implementation for Jetty handler advices. */
Expand Down Expand Up @@ -45,23 +43,7 @@ public static <REQUEST, RESPONSE> void stopSpan(
return;
}

AtomicBoolean responseHandled = new AtomicBoolean(false);
ServletAccessor<REQUEST, RESPONSE> servletAccessor = tracer.getServletAccessor();

// In case of async servlets wait for the actual response to be ready
if (servletAccessor.isRequestAsyncStarted(request)) {
try {
servletAccessor.addRequestAsyncListener(
request, new TagSettingAsyncListener<>(tracer, responseHandled, context));
} catch (IllegalStateException e) {
// org.eclipse.jetty.server.Request may throw an exception here if request became
// finished after check above. We just ignore that exception and move on.
}
}

// Check again in case the request finished before adding the listener.
if (!servletAccessor.isRequestAsyncStarted(request)
&& responseHandled.compareAndSet(false, true)) {
if (ServletAndFilterAdviceHelper.mustEndOnHandlerMethodExit(tracer, request)) {
tracer.end(context, response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,6 @@ apply from: "$rootDir/gradle/instrumentation.gradle"
dependencies {
compileOnly "javax.servlet:javax.servlet-api:3.0.1"

implementation project(':instrumentation:servlet:servlet-common:javaagent')
implementation project(':instrumentation:servlet:servlet-3.0:javaagent')
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,7 @@

import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import io.opentelemetry.javaagent.instrumentation.servlet.v3_0.TagSettingAsyncListener;
import java.util.concurrent.atomic.AtomicBoolean;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterAdviceHelper;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
Expand All @@ -33,7 +32,7 @@ public static void onEnter(
// it is a bit too early to start span at this point because calling
// some methods on HttpServletRequest will give a NPE
// just remember the request and use it a bit later to start the span
ThreadLocalContext.startRequest(httpServletRequest);
ThreadLocalContext.startRequest(httpServletRequest, (HttpServletResponse) response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down Expand Up @@ -68,17 +67,7 @@ public static void stopSpan(
return;
}

AtomicBoolean responseHandled = new AtomicBoolean(false);

// In case of async servlets wait for the actual response to be ready
if (request.isAsyncStarted()) {
request
.getAsyncContext()
.addListener(new TagSettingAsyncListener(responseHandled, context), request, response);
}

// Check again in case the request finished before adding the listener.
if (!request.isAsyncStarted() && responseHandled.compareAndSet(false, true)) {
if (ServletAndFilterAdviceHelper.mustEndOnHandlerMethodExit(tracer(), request)) {
tracer().end(context, response);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ public static void onEnter() {

ctx.setContext(context);
ctx.setScope(scope);

// Must be set here since Liberty RequestProcessors can use startAsync outside of servlet scope.
tracer().setAsyncListenerResponse(ctx.getRequest(), ctx.getResponse());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,21 @@
import io.opentelemetry.context.Context;
import io.opentelemetry.context.Scope;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

public class ThreadLocalContext {

private static final ThreadLocal<ThreadLocalContext> local = new ThreadLocal<>();

private final HttpServletRequest req;
private final HttpServletRequest request;
private final HttpServletResponse response;
private Context context;
private Scope scope;
private boolean started;

private ThreadLocalContext(HttpServletRequest req) {
this.req = req;
private ThreadLocalContext(HttpServletRequest request, HttpServletResponse response) {
this.request = request;
this.response = response;
}

public Context getContext() {
Expand All @@ -39,7 +42,11 @@ public void setScope(Scope scope) {
}

public HttpServletRequest getRequest() {
return req;
return request;
}

public HttpServletResponse getResponse() {
return response;
}

/**
Expand All @@ -53,8 +60,8 @@ public boolean startSpan() {
return !b;
}

public static void startRequest(HttpServletRequest req) {
ThreadLocalContext ctx = new ThreadLocalContext(req);
public static void startRequest(HttpServletRequest request, HttpServletResponse response) {
ThreadLocalContext ctx = new ThreadLocalContext(request, response);
local.set(ctx);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,11 @@ public Integer getRequestRemotePort(HttpServletRequest httpServletRequest) {
return null;
}

@Override
public boolean isRequestAsyncStarted(HttpServletRequest request) {
return false;
}

@Override
public void addRequestAsyncListener(
HttpServletRequest request, ServletAsyncListener<ResponseWithStatus> listener) {
HttpServletRequest request,
ServletAsyncListener<ResponseWithStatus> listener,
Object response) {
throw new UnsupportedOperationException();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ public static void onEnter(

context = tracer().startSpan(httpServletRequest, mappingResolver, servlet);
scope = context.makeCurrent();

tracer().setAsyncListenerResponse(httpServletRequest, (HttpServletResponse) response);
}

@Advice.OnMethodExit(onThrowable = Throwable.class, suppress = Throwable.class)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Copyright The OpenTelemetry Authors
* SPDX-License-Identifier: Apache-2.0
*/

package io.opentelemetry.javaagent.instrumentation.servlet.v3_0;

import static io.opentelemetry.instrumentation.servlet.v3_0.Servlet3HttpServerTracer.tracer;

import io.opentelemetry.javaagent.instrumentation.api.CallDepthThreadLocalMap;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.http.HttpServletRequest;
import net.bytebuddy.asm.Advice;

public class Servlet3AsyncStartAdvice {
@Advice.OnMethodEnter(suppress = Throwable.class)
public static void startAsyncEnter() {
// This allows to detect the outermost invocation of startAsync in method exit
trask marked this conversation as resolved.
Show resolved Hide resolved
CallDepthThreadLocalMap.incrementCallDepth(AsyncContext.class);
}

@Advice.OnMethodExit(suppress = Throwable.class)
public static void startAsyncExit(@Advice.This ServletRequest servletRequest) {
int callDepth = CallDepthThreadLocalMap.decrementCallDepth(AsyncContext.class);

if (callDepth != 0) {
// This is not the outermost invocation, ignore.
return;
}

if (servletRequest instanceof HttpServletRequest) {
HttpServletRequest request = (HttpServletRequest) servletRequest;

if (!tracer().isAsyncListenerAttached(request)) {
tracer().attachAsyncListener(request);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncContextInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncStartInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterInstrumentation;
import java.util.List;

Expand All @@ -30,7 +31,8 @@ public List<TypeInstrumentation> typeInstrumentations() {
BASE_PACKAGE,
adviceClassName(".Servlet3Advice"),
adviceClassName(".Servlet3InitAdvice"),
adviceClassName(".Servlet3FilterInitAdvice")));
adviceClassName(".Servlet3FilterInitAdvice")),
new AsyncStartInstrumentation(BASE_PACKAGE, adviceClassName(".Servlet3AsyncStartAdvice")));
}

private static String adviceClassName(String suffix) {
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,18 @@ public Integer getRequestRemotePort(HttpServletRequest request) {
return request.getRemotePort();
}

@Override
public boolean isRequestAsyncStarted(HttpServletRequest request) {
return request.isAsyncStarted();
}

@Override
public void addRequestAsyncListener(
HttpServletRequest request, ServletAsyncListener<HttpServletResponse> listener) {
request.getAsyncContext().addListener(new Listener(listener));
HttpServletRequest request,
ServletAsyncListener<HttpServletResponse> listener,
Object response) {
if (response instanceof HttpServletResponse) {
request
.getAsyncContext()
.addListener(new Listener(listener), request, (HttpServletResponse) response);
} else {
request.getAsyncContext().addListener(new Listener(listener));
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import io.opentelemetry.javaagent.extension.instrumentation.InstrumentationModule;
import io.opentelemetry.javaagent.extension.instrumentation.TypeInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncContextInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.async.AsyncStartInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.response.HttpServletResponseInstrumentation;
import io.opentelemetry.javaagent.instrumentation.servlet.common.service.ServletAndFilterInstrumentation;
import java.util.Arrays;
Expand All @@ -27,6 +28,7 @@ public List<TypeInstrumentation> typeInstrumentations() {
return Arrays.asList(
new AsyncContextInstrumentation(
BASE_PACKAGE, adviceClassName(".async.AsyncDispatchAdvice")),
new AsyncStartInstrumentation(BASE_PACKAGE, adviceClassName(".async.AsyncStartAdvice")),
new ServletAndFilterInstrumentation(
BASE_PACKAGE,
adviceClassName(".service.JakartaServletServiceAdvice"),
Expand Down
Loading