Skip to content

Commit

Permalink
Fix reactor-netty memory/connection leak (open-telemetry#4867)
Browse files Browse the repository at this point in the history
  • Loading branch information
Mateusz Rzeszutek authored and RashmiRam committed May 23, 2022
1 parent c252875 commit c8c5444
Show file tree
Hide file tree
Showing 2 changed files with 49 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,10 @@
import javax.annotation.Nullable;
import reactor.core.publisher.Mono;
import reactor.netty.Connection;
import reactor.netty.ConnectionObserver;
import reactor.netty.http.client.HttpClient;
import reactor.netty.http.client.HttpClientConfig;
import reactor.netty.http.client.HttpClientRequest;
import reactor.netty.http.client.HttpClientResponse;
import reactor.netty.http.client.HttpClientState;

public final class HttpResponseReceiverInstrumenter {

Expand All @@ -46,8 +44,9 @@ public static HttpClient.ResponseReceiver<?> instrument(HttpClient.ResponseRecei
client
.mapConnect(new StartOperation(contextHolder, config))
.doOnRequest(new PropagateContext(contextHolder))
.doOnRequestError(new EndOperationWithError(contextHolder, config))
.observe(new EndOperation(contextHolder, config));
.doOnRequestError(new EndOperationWithRequestError(contextHolder, config))
.doOnResponseError(new EndOperationWithResponseError(contextHolder, config))
.doAfterResponseSuccess(new EndOperationWithSuccess(contextHolder, config));

// modified should always be an HttpClientFinalizer too
if (modified instanceof HttpClient.ResponseReceiver) {
Expand Down Expand Up @@ -122,12 +121,13 @@ public void accept(HttpClientRequest httpClientRequest, Connection connection) {
}
}

static final class EndOperationWithError implements BiConsumer<HttpClientRequest, Throwable> {
static final class EndOperationWithRequestError
implements BiConsumer<HttpClientRequest, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;

EndOperationWithError(ContextHolder contextHolder, HttpClientConfig config) {
EndOperationWithRequestError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
}
Expand All @@ -142,42 +142,44 @@ public void accept(HttpClientRequest httpClientRequest, Throwable error) {
}
}

static final class EndOperation implements ConnectionObserver {
static final class EndOperationWithResponseError
implements BiConsumer<HttpClientResponse, Throwable> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;

EndOperation(ContextHolder contextHolder, HttpClientConfig config) {
EndOperationWithResponseError(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
}

@Override
public void onStateChange(Connection connection, State newState) {
if (newState != HttpClientState.RESPONSE_COMPLETED) {
return;
}

public void accept(HttpClientResponse response, Throwable error) {
Context context = contextHolder.context;
if (context == null) {
return;
}
instrumenter().end(context, config, response, error);
}
}

// connection is actually an instance of HttpClientOperations - a package private class that
// implements both Connection and HttpClientResponse
if (connection instanceof HttpClientResponse) {
HttpClientResponse response = (HttpClientResponse) connection;
instrumenter().end(context, config, response, null);
}
static final class EndOperationWithSuccess implements BiConsumer<HttpClientResponse, Connection> {

private final ContextHolder contextHolder;
private final HttpClientConfig config;

EndOperationWithSuccess(ContextHolder contextHolder, HttpClientConfig config) {
this.contextHolder = contextHolder;
this.config = config;
}

@Override
public void onUncaughtException(Connection connection, Throwable error) {
public void accept(HttpClientResponse response, Connection connection) {
Context context = contextHolder.context;
if (context == null) {
return;
}
instrumenter().end(context, config, null, error);
instrumenter().end(context, config, response, null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,32 @@ abstract class AbstractReactorNettyHttpClientTest extends HttpClientTest<HttpCli
}
}

def "should not leak connections"() {
given:
def uniqueChannelHashes = new HashSet<>()
def httpClient = createHttpClient()
.doOnConnect({ uniqueChannelHashes.add(it.channelHash())})
def uri = "http:https://localhost:${server.httpPort()}/success"

def count = 100

when:
(1..count).forEach({
runWithSpan("parent") {
def status = httpClient.get().uri(uri)
.responseSingle { resp, content ->
// Make sure to consume content since that's when we close the span.
content.map { resp.status().code() }
}.block()
assert status == 200
}
})

then:
traces.size() == count
uniqueChannelHashes.size() == 1
}

static void assertSameSpan(SpanData expected, AtomicReference<Span> actual) {
def expectedSpanContext = expected.spanContext
def actualSpanContext = actual.get().spanContext
Expand Down

0 comments on commit c8c5444

Please sign in to comment.