Skip to content

Commit

Permalink
Instrument Netty addTask to ensure complete coverage of async Runnabl…
Browse files Browse the repository at this point in the history
…es (#1348)

Newer versions of Netty introduce variants like execute(Runnable, boolean) which
aren't covered by the core execute(Runnable) instrumentation.  Fortunately they all
flow through to addTask(Runnable), which allows us to carry the context through properly.
  • Loading branch information
johnbley committed Oct 9, 2020
1 parent 9605789 commit b34fd49
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,11 @@ public Map<? extends ElementMatcher<? super MethodDescription>, String> transfor
transformers.put(
named("execute").and(takesArgument(0, Runnable.class)).and(takesArguments(1)),
JavaExecutorInstrumentation.class.getName() + "$SetExecuteRunnableStateAdvice");
// Netty uses addTask as the acutal core of their submission; there are non-standard variations
// like execute(Runnable,boolean) that aren't caught by standard instrumentation
transformers.put(
named("addTask").and(takesArgument(0, Runnable.class)).and(takesArguments(1)),
JavaExecutorInstrumentation.class.getName() + "$SetExecuteRunnableStateAdvice");
transformers.put(
named("execute").and(takesArgument(0, ForkJoinTask.class)),
JavaExecutorInstrumentation.class.getName() + "$SetJavaForkJoinStateAdvice");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,24 @@ import static io.opentelemetry.auto.test.utils.TraceUtils.basicSpan
import static io.opentelemetry.auto.test.utils.TraceUtils.runUnderTrace
import static org.asynchttpclient.Dsl.asyncHttpClient

import io.netty.bootstrap.Bootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.AbstractChannel
import io.netty.channel.Channel
import io.netty.channel.ChannelHandler
import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInitializer
import io.netty.channel.ChannelPipeline
import io.netty.channel.EventLoopGroup
import io.netty.channel.embedded.EmbeddedChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioSocketChannel
import io.netty.handler.codec.http.DefaultFullHttpRequest
import io.netty.handler.codec.http.HttpClientCodec
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpMethod
import io.netty.handler.codec.http.HttpVersion
import io.opentelemetry.auto.test.base.HttpClientTest
import io.opentelemetry.instrumentation.auto.netty.v4_1.client.HttpClientTracingHandler
import java.util.concurrent.ExecutionException
Expand Down Expand Up @@ -70,6 +81,68 @@ class Netty41ClientTest extends HttpClientTest {
return false
}

def "test connection reuse and second request with lazy execute"() {
setup:
//Create a simple Netty pipeline
EventLoopGroup group = new NioEventLoopGroup()
Bootstrap b = new Bootstrap()
b.group(group)
.channel(NioSocketChannel)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
ChannelPipeline pipeline = socketChannel.pipeline()
pipeline.addLast(new HttpClientCodec())
}
})
def request = new DefaultFullHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, server.address.resolve("/success").toString(), Unpooled.EMPTY_BUFFER)
request.headers().set(HttpHeaderNames.HOST, server.address.host)
request.headers().set(HttpHeaderNames.USER_AGENT, userAgent())
Channel ch = null

when:
// note that this is a purely asynchronous request
runUnderTrace("parent1") {
ch = b.connect(server.address.host, server.address.port).sync().channel()
ch.write(request)
ch.flush()
}
// This is a cheap/easy way to block/ensure that the first request has finished and check reported spans midway through
// the complex sequence of events
assertTraces(1) {
trace(0, 3) {
basicSpan(it, 0, "parent1")
clientSpan(it, 1, span(0))
serverSpan(it, 2, span(1))
}
}

then:
// now run a second request through the same channel
runUnderTrace("parent2") {
ch.write(request)
ch.flush()
}

assertTraces(2) {
trace(0, 3) {
basicSpan(it, 0, "parent1")
clientSpan(it, 1, span(0))
serverSpan(it, 2, span(1))
}
trace(1, 3) {
basicSpan(it, 0, "parent2")
clientSpan(it, 1, span(0))
serverSpan(it, 2, span(1))
}
}


cleanup:
group.shutdownGracefully()
}


def "connection error (unopened port)"() {
given:
def uri = new URI("http://localhost:$UNUSABLE_PORT/")
Expand Down

0 comments on commit b34fd49

Please sign in to comment.