Skip to content

Commit

Permalink
Fix a few bugs related to non event-loop thread writes.
Browse files Browse the repository at this point in the history
- Incorrect HTTP server metrics report for non event-loop thread writes #5222
- HTTP/2 push is not flushed when written from a non event-loop thread #5223
  • Loading branch information
vietj committed Jun 4, 2024
1 parent b01d60b commit fc3c4da
Show file tree
Hide file tree
Showing 7 changed files with 97 additions and 38 deletions.
23 changes: 16 additions & 7 deletions src/main/java/io/vertx/core/http/impl/Http2ServerStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
*/
package io.vertx.core.http.impl;

import io.netty.channel.EventLoop;
import io.netty.handler.codec.http.HttpHeaderNames;
import io.netty.handler.codec.http.HttpHeaderValues;
import io.netty.handler.codec.http2.Http2Headers;
Expand All @@ -18,12 +19,10 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.HttpFrame;
import io.vertx.core.http.HttpMethod;
import io.vertx.core.http.HttpServerRequest;
import io.vertx.core.http.StreamPriority;
import io.vertx.core.http.impl.headers.Http2HeadersAdaptor;
import io.vertx.core.impl.ContextInternal;
import io.vertx.core.net.HostAndPort;
import io.vertx.core.net.impl.HostAndPortImpl;
import io.vertx.core.spi.metrics.HttpServerMetrics;
import io.vertx.core.spi.metrics.Metrics;
import io.vertx.core.spi.observability.HttpRequest;
Expand Down Expand Up @@ -238,13 +237,23 @@ public Object metric() {
return metric;
}

public HttpServerRequest routed(String route) {
public void routed(String route) {
if (METRICS_ENABLED) {
HttpServerMetrics metrics = conn.metrics();
if (metrics != null && !responseEnded) {
metrics.requestRouted(metric, route);
EventLoop eventLoop = vertx.getOrCreateContext().nettyEventLoop();
synchronized (this) {
if (!eventLoop.inEventLoop()) {
eventLoop.execute(() -> routedInternal(route));
return;
}
}
routedInternal(route);
}
}

private void routedInternal(String route) {
HttpServerMetrics metrics = conn.metrics();
if (metrics != null && !responseEnded) {
metrics.requestRouted(metric, route);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -338,14 +338,8 @@ io.netty.util.concurrent.Future<Integer> writePushPromise(int streamId, Http2Hea
future.setFailure(fut.cause());
}
});
EventExecutor executor = chctx.executor();
if (executor.inEventLoop()) {
_writePushPromise(streamId, promisedStreamId, headers, promise);
} else {
executor.execute(() -> {
_writePushPromise(streamId, promisedStreamId, headers, promise);
});
}
_writePushPromise(streamId, promisedStreamId, headers, promise);
checkFlush();
return future;
}

Expand Down
6 changes: 3 additions & 3 deletions src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,6 @@ protected void writeQueueDrained() {
conn.consumeCredits(this.stream, len);
}
});
bytesRead += data.length();
handleData(data);
}
});
Expand Down Expand Up @@ -146,6 +145,7 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) {
}

void onData(Buffer data) {
bytesRead += data.length();
conn.reportBytesRead(data.length());
context.execute(data, pending::write);
}
Expand Down Expand Up @@ -237,10 +237,10 @@ public void cancel(Throwable cause) {
}

void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise<Void> promise) {
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, (FutureListener<Void>) promise);
if (end) {
endWritten();
}
conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, (FutureListener<Void>) promise);
}

protected void endWritten() {
Expand Down Expand Up @@ -273,10 +273,10 @@ void doWriteData(ByteBuf buf, boolean end, Promise<Void> promise) {
int numOfBytes = chunk.readableBytes();
bytesWritten += numOfBytes;
conn.reportBytesWritten(numOfBytes);
conn.handler.writeData(stream, chunk, end, (FutureListener<Void>) promise);
if (end) {
endWritten();
}
conn.handler.writeData(stream, chunk, end, (FutureListener<Void>) promise);
}

final void writeReset(long code) {
Expand Down
7 changes: 6 additions & 1 deletion src/test/java/io/vertx/core/http/Http1xMetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,19 @@
*/
package io.vertx.core.http;

import io.vertx.core.ThreadingModel;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class Http1xMetricsTest extends HttpMetricsTestBase {

public Http1xMetricsTest() {
super(HttpVersion.HTTP_1_1);
this(ThreadingModel.EVENT_LOOP);
}

protected Http1xMetricsTest(ThreadingModel threadingModel) {
super(HttpVersion.HTTP_1_1, threadingModel);
}

@Test
Expand Down
23 changes: 23 additions & 0 deletions src/test/java/io/vertx/core/http/Http1xWorkerMetricsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright (c) 2011-2019 Contributors to the Eclipse Foundation
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http:https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0
* which is available at https://www.apache.org/licenses/LICENSE-2.0.
*
* SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
*/
package io.vertx.core.http;

import io.vertx.core.ThreadingModel;
import org.junit.Test;

import java.util.concurrent.CountDownLatch;

public class Http1xWorkerMetricsTest extends Http1xMetricsTest {

public Http1xWorkerMetricsTest() {
super(ThreadingModel.EVENT_LOOP);
}
}
23 changes: 14 additions & 9 deletions src/test/java/io/vertx/core/http/Http2MetricsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@
*/
package io.vertx.core.http;

import io.vertx.core.ThreadingModel;
import io.vertx.test.core.TestUtils;
import io.vertx.test.fakemetrics.*;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
Expand All @@ -27,19 +29,21 @@ public class Http2MetricsTest extends HttpMetricsTestBase {
public static Collection<Object[]> params() {
ArrayList<Object[]> params = new ArrayList<>();
// h2
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST) });
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
// h2 + worker
params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.WORKER });
// h2c with upgrade
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) });
// h2c directq
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) });
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
// h2c direct
params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP });
return params;
}

private HttpClientOptions clientOptions;
private HttpServerOptions serverOptions;

public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions) {
super(HttpVersion.HTTP_2);
public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions, ThreadingModel threadingModel) {
super(HttpVersion.HTTP_2, threadingModel);

this.clientOptions = clientOptions;
this.serverOptions = serverOptions.setHandle100ContinueAutomatically(true);
Expand Down Expand Up @@ -70,10 +74,11 @@ public void testPushPromise() throws Exception {
AtomicInteger numBuffer = new AtomicInteger(numBuffers);
vertx.setPeriodic(1, timerID -> {
if (numBuffer.getAndDecrement() == 0) {
pushedResp.end();
assertNull(serverMetrics.getResponseMetric("/wibble"));
pushedResp.end().onComplete(onSuccess(v -> {
assertNull(serverMetrics.getResponseMetric("/wibble"));
complete();
}));
vertx.cancelTimer(timerID);
complete();
} else {
pushedResp.write(TestUtils.randomBuffer(1000));
}
Expand Down
43 changes: 33 additions & 10 deletions src/test/java/io/vertx/core/http/HttpMetricsTestBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
import io.vertx.core.buffer.Buffer;
import io.vertx.core.http.impl.HttpClientInternal;
import io.vertx.core.http.impl.HttpServerRequestInternal;
import io.vertx.core.impl.VertxInternal;
import io.vertx.core.metrics.MetricsOptions;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.SocketAddress;
Expand Down Expand Up @@ -44,9 +45,19 @@
public abstract class HttpMetricsTestBase extends HttpTestBase {

private final HttpVersion protocol;
private final ThreadingModel threadingModel;

public HttpMetricsTestBase(HttpVersion protocol) {
public HttpMetricsTestBase(HttpVersion protocol, ThreadingModel threadingModel) {
this.protocol = protocol;
this.threadingModel = threadingModel;
}

@Override
protected void startServer(SocketAddress bindAddress, Context context, HttpServer server) throws Exception {
if (threadingModel == ThreadingModel.WORKER) {
context = ((VertxInternal) vertx).createWorkerContext();
}
super.startServer(bindAddress, context, server);
}

@Override
Expand Down Expand Up @@ -88,6 +99,8 @@ public void testHttpMetricsLifecycle() throws Exception {
assertTrue(serverMetric.get().socket.connected.get());
assertNull(serverMetric.get().route.get());
req.routed("/route/:param");
// Worker can wait
assertWaitUntil(() -> serverMetric.get().route.get() != null);
assertEquals("/route/:param", serverMetric.get().route.get());
req.bodyHandler(buff -> {
assertEquals(contentLength, buff.length());
Expand All @@ -98,14 +111,21 @@ public void testHttpMetricsLifecycle() throws Exception {
vertx.setPeriodic(1, timerID -> {
Buffer chunk = TestUtils.randomBuffer(chunkSize);
if (numBuffer.decrementAndGet() == 0) {
resp.end(chunk);
assertTrue(serverMetric.get().responseEnded.get());
assertEquals(contentLength, serverMetric.get().bytesWritten.get());
assertNull(serverMetrics.getRequestMetric(req));
resp
.end(chunk)
.onComplete(onSuccess(v -> {
assertTrue(serverMetric.get().responseEnded.get());
assertFalse(serverMetric.get().failed.get());
assertEquals(contentLength, serverMetric.get().bytesWritten.get());
assertNull(serverMetrics.getRequestMetric(req));
}));
vertx.cancelTimer(timerID);
} else {
resp.write(chunk);
assertSame(serverMetric.get().response.get(), resp);
resp
.write(chunk)
.onComplete(onSuccess(v -> {
assertSame(serverMetric.get().response.get(), resp);
}));
}
});
});
Expand Down Expand Up @@ -210,9 +230,7 @@ public void testHttpClientLifecycle() throws Exception {
});
});
});
CountDownLatch listenLatch = new CountDownLatch(1);
server.listen(HttpTestBase.DEFAULT_HTTP_PORT, "localhost").onComplete(onSuccess(s -> { listenLatch.countDown(); }));
awaitLatch(listenLatch);
startServer(testAddress);
FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client);
CountDownLatch responseBeginLatch = new CountDownLatch(1);
CountDownLatch responseEndLatch = new CountDownLatch(1);
Expand Down Expand Up @@ -306,8 +324,13 @@ public void testRouteMetrics() throws Exception {
HttpServerMetric metric = metrics.getRequestMetric(req);
assertNull(metric.route.get());
req.routed("MyRoute");
// Worker can wait
assertWaitUntil(() -> metric.route.get() != null);
assertEquals("MyRoute", metric.route.get());
metric.route.set(null);
req.routed("MyRoute - rerouted");
// Worker can wait
assertWaitUntil(() -> metric.route.get() != null);
assertEquals("MyRoute - rerouted", metric.route.get());
req.response().end();
testComplete();
Expand Down

0 comments on commit fc3c4da

Please sign in to comment.