diff --git a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java index fc1f40fd0e2..b8a1ae18bc0 100644 --- a/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java +++ b/src/main/java/io/vertx/core/http/impl/Http2ServerStream.java @@ -10,6 +10,7 @@ */ package io.vertx.core.http.impl; +import io.netty.channel.EventLoop; import io.netty.handler.codec.http.HttpHeaderNames; import io.netty.handler.codec.http.HttpHeaderValues; import io.netty.handler.codec.http2.Http2Headers; @@ -18,12 +19,10 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpFrame; import io.vertx.core.http.HttpMethod; -import io.vertx.core.http.HttpServerRequest; import io.vertx.core.http.StreamPriority; import io.vertx.core.http.impl.headers.Http2HeadersAdaptor; import io.vertx.core.impl.ContextInternal; import io.vertx.core.net.HostAndPort; -import io.vertx.core.net.impl.HostAndPortImpl; import io.vertx.core.spi.metrics.HttpServerMetrics; import io.vertx.core.spi.metrics.Metrics; import io.vertx.core.spi.observability.HttpRequest; @@ -238,13 +237,23 @@ public Object metric() { return metric; } - public HttpServerRequest routed(String route) { + public void routed(String route) { if (METRICS_ENABLED) { - HttpServerMetrics metrics = conn.metrics(); - if (metrics != null && !responseEnded) { - metrics.requestRouted(metric, route); + EventLoop eventLoop = vertx.getOrCreateContext().nettyEventLoop(); + synchronized (this) { + if (!eventLoop.inEventLoop()) { + eventLoop.execute(() -> routedInternal(route)); + return; + } } + routedInternal(route); + } + } + + private void routedInternal(String route) { + HttpServerMetrics metrics = conn.metrics(); + if (metrics != null && !responseEnded) { + metrics.requestRouted(metric, route); } - return null; } } diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java index 31e3c95c9bf..bcd4c9bfe25 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2ConnectionHandler.java @@ -338,14 +338,8 @@ io.netty.util.concurrent.Future writePushPromise(int streamId, Http2Hea future.setFailure(fut.cause()); } }); - EventExecutor executor = chctx.executor(); - if (executor.inEventLoop()) { - _writePushPromise(streamId, promisedStreamId, headers, promise); - } else { - executor.execute(() -> { - _writePushPromise(streamId, promisedStreamId, headers, promise); - }); - } + _writePushPromise(streamId, promisedStreamId, headers, promise); + checkFlush(); return future; } diff --git a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java index 7d7c2c6a2eb..652e66141f4 100644 --- a/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java +++ b/src/main/java/io/vertx/core/http/impl/VertxHttp2Stream.java @@ -98,7 +98,6 @@ protected void writeQueueDrained() { conn.consumeCredits(this.stream, len); } }); - bytesRead += data.length(); handleData(data); } }); @@ -146,6 +145,7 @@ void onHeaders(Http2Headers headers, StreamPriority streamPriority) { } void onData(Buffer data) { + bytesRead += data.length(); conn.reportBytesRead(data.length()); context.execute(data, pending::write); } @@ -237,10 +237,10 @@ public void cancel(Throwable cause) { } void doWriteHeaders(Http2Headers headers, boolean end, boolean checkFlush, Promise promise) { - conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, (FutureListener) promise); if (end) { endWritten(); } + conn.handler.writeHeaders(stream, headers, end, priority.getDependency(), priority.getWeight(), priority.isExclusive(), checkFlush, (FutureListener) promise); } protected void endWritten() { @@ -273,10 +273,10 @@ void doWriteData(ByteBuf buf, boolean end, Promise promise) { int numOfBytes = chunk.readableBytes(); bytesWritten += numOfBytes; conn.reportBytesWritten(numOfBytes); - conn.handler.writeData(stream, chunk, end, (FutureListener) promise); if (end) { endWritten(); } + conn.handler.writeData(stream, chunk, end, (FutureListener) promise); } final void writeReset(long code) { diff --git a/src/test/java/io/vertx/core/http/Http1xMetricsTest.java b/src/test/java/io/vertx/core/http/Http1xMetricsTest.java index c4b42ca084b..4153037bb1f 100644 --- a/src/test/java/io/vertx/core/http/Http1xMetricsTest.java +++ b/src/test/java/io/vertx/core/http/Http1xMetricsTest.java @@ -10,6 +10,7 @@ */ package io.vertx.core.http; +import io.vertx.core.ThreadingModel; import org.junit.Test; import java.util.concurrent.CountDownLatch; @@ -17,7 +18,11 @@ public class Http1xMetricsTest extends HttpMetricsTestBase { public Http1xMetricsTest() { - super(HttpVersion.HTTP_1_1); + this(ThreadingModel.EVENT_LOOP); + } + + protected Http1xMetricsTest(ThreadingModel threadingModel) { + super(HttpVersion.HTTP_1_1, threadingModel); } @Test diff --git a/src/test/java/io/vertx/core/http/Http1xWorkerMetricsTest.java b/src/test/java/io/vertx/core/http/Http1xWorkerMetricsTest.java new file mode 100644 index 00000000000..4b3ce819a30 --- /dev/null +++ b/src/test/java/io/vertx/core/http/Http1xWorkerMetricsTest.java @@ -0,0 +1,23 @@ +/* + * Copyright (c) 2011-2019 Contributors to the Eclipse Foundation + * + * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 + * which is available at https://www.apache.org/licenses/LICENSE-2.0. + * + * SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 + */ +package io.vertx.core.http; + +import io.vertx.core.ThreadingModel; +import org.junit.Test; + +import java.util.concurrent.CountDownLatch; + +public class Http1xWorkerMetricsTest extends Http1xMetricsTest { + + public Http1xWorkerMetricsTest() { + super(ThreadingModel.EVENT_LOOP); + } +} diff --git a/src/test/java/io/vertx/core/http/Http2MetricsTest.java b/src/test/java/io/vertx/core/http/Http2MetricsTest.java index bca4433174d..a60e0bff1d2 100644 --- a/src/test/java/io/vertx/core/http/Http2MetricsTest.java +++ b/src/test/java/io/vertx/core/http/Http2MetricsTest.java @@ -10,8 +10,10 @@ */ package io.vertx.core.http; +import io.vertx.core.ThreadingModel; import io.vertx.test.core.TestUtils; import io.vertx.test.fakemetrics.*; +import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -27,19 +29,21 @@ public class Http2MetricsTest extends HttpMetricsTestBase { public static Collection params() { ArrayList params = new ArrayList<>(); // h2 - params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST) }); + params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP }); + // h2 + worker + params.add(new Object[] { Http2TestBase.createHttp2ClientOptions(), Http2TestBase.createHttp2ServerOptions(HttpTestBase.DEFAULT_HTTP_PORT, HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.WORKER }); // h2c with upgrade - params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) }); - // h2c directq - params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST) }); + params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(true), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP }); + // h2c direct + params.add(new Object[] { new HttpClientOptions().setProtocolVersion(HttpVersion.HTTP_2).setHttp2ClearTextUpgrade(false), new HttpServerOptions().setPort(HttpTestBase.DEFAULT_HTTP_PORT).setHost(HttpTestBase.DEFAULT_HTTP_HOST), ThreadingModel.EVENT_LOOP }); return params; } private HttpClientOptions clientOptions; private HttpServerOptions serverOptions; - public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions) { - super(HttpVersion.HTTP_2); + public Http2MetricsTest(HttpClientOptions clientOptions, HttpServerOptions serverOptions, ThreadingModel threadingModel) { + super(HttpVersion.HTTP_2, threadingModel); this.clientOptions = clientOptions; this.serverOptions = serverOptions.setHandle100ContinueAutomatically(true); @@ -70,10 +74,11 @@ public void testPushPromise() throws Exception { AtomicInteger numBuffer = new AtomicInteger(numBuffers); vertx.setPeriodic(1, timerID -> { if (numBuffer.getAndDecrement() == 0) { - pushedResp.end(); - assertNull(serverMetrics.getResponseMetric("/wibble")); + pushedResp.end().onComplete(onSuccess(v -> { + assertNull(serverMetrics.getResponseMetric("/wibble")); + complete(); + })); vertx.cancelTimer(timerID); - complete(); } else { pushedResp.write(TestUtils.randomBuffer(1000)); } diff --git a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java index 04e539d719c..e1db3122946 100644 --- a/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java +++ b/src/test/java/io/vertx/core/http/HttpMetricsTestBase.java @@ -15,6 +15,7 @@ import io.vertx.core.buffer.Buffer; import io.vertx.core.http.impl.HttpClientInternal; import io.vertx.core.http.impl.HttpServerRequestInternal; +import io.vertx.core.impl.VertxInternal; import io.vertx.core.metrics.MetricsOptions; import io.vertx.core.net.NetClient; import io.vertx.core.net.SocketAddress; @@ -44,9 +45,19 @@ public abstract class HttpMetricsTestBase extends HttpTestBase { private final HttpVersion protocol; + private final ThreadingModel threadingModel; - public HttpMetricsTestBase(HttpVersion protocol) { + public HttpMetricsTestBase(HttpVersion protocol, ThreadingModel threadingModel) { this.protocol = protocol; + this.threadingModel = threadingModel; + } + + @Override + protected void startServer(SocketAddress bindAddress, Context context, HttpServer server) throws Exception { + if (threadingModel == ThreadingModel.WORKER) { + context = ((VertxInternal) vertx).createWorkerContext(); + } + super.startServer(bindAddress, context, server); } @Override @@ -88,6 +99,8 @@ public void testHttpMetricsLifecycle() throws Exception { assertTrue(serverMetric.get().socket.connected.get()); assertNull(serverMetric.get().route.get()); req.routed("/route/:param"); + // Worker can wait + assertWaitUntil(() -> serverMetric.get().route.get() != null); assertEquals("/route/:param", serverMetric.get().route.get()); req.bodyHandler(buff -> { assertEquals(contentLength, buff.length()); @@ -98,14 +111,21 @@ public void testHttpMetricsLifecycle() throws Exception { vertx.setPeriodic(1, timerID -> { Buffer chunk = TestUtils.randomBuffer(chunkSize); if (numBuffer.decrementAndGet() == 0) { - resp.end(chunk); - assertTrue(serverMetric.get().responseEnded.get()); - assertEquals(contentLength, serverMetric.get().bytesWritten.get()); - assertNull(serverMetrics.getRequestMetric(req)); + resp + .end(chunk) + .onComplete(onSuccess(v -> { + assertTrue(serverMetric.get().responseEnded.get()); + assertFalse(serverMetric.get().failed.get()); + assertEquals(contentLength, serverMetric.get().bytesWritten.get()); + assertNull(serverMetrics.getRequestMetric(req)); + })); vertx.cancelTimer(timerID); } else { - resp.write(chunk); - assertSame(serverMetric.get().response.get(), resp); + resp + .write(chunk) + .onComplete(onSuccess(v -> { + assertSame(serverMetric.get().response.get(), resp); + })); } }); }); @@ -210,9 +230,7 @@ public void testHttpClientLifecycle() throws Exception { }); }); }); - CountDownLatch listenLatch = new CountDownLatch(1); - server.listen(HttpTestBase.DEFAULT_HTTP_PORT, "localhost").onComplete(onSuccess(s -> { listenLatch.countDown(); })); - awaitLatch(listenLatch); + startServer(testAddress); FakeHttpClientMetrics clientMetrics = FakeMetricsBase.getMetrics(client); CountDownLatch responseBeginLatch = new CountDownLatch(1); CountDownLatch responseEndLatch = new CountDownLatch(1); @@ -306,8 +324,13 @@ public void testRouteMetrics() throws Exception { HttpServerMetric metric = metrics.getRequestMetric(req); assertNull(metric.route.get()); req.routed("MyRoute"); + // Worker can wait + assertWaitUntil(() -> metric.route.get() != null); assertEquals("MyRoute", metric.route.get()); + metric.route.set(null); req.routed("MyRoute - rerouted"); + // Worker can wait + assertWaitUntil(() -> metric.route.get() != null); assertEquals("MyRoute - rerouted", metric.route.get()); req.response().end(); testComplete();