Skip to content

Commit

Permalink
[FLINK-10415] Fail response future if RestClient connection becomes idle
Browse files Browse the repository at this point in the history
This commit adds a IdleStateHandler to the Netty pipeline of the RestClient. The
IdleStateHandler sends an IdleStateEvent if it detects that the connection is idle
for too long. If we see an IdleStateEvent, then we close the connection and fail
the json response future.
  • Loading branch information
tillrohrmann committed Sep 27, 2018
1 parent 559b8f1 commit 28591f3
Show file tree
Hide file tree
Showing 6 changed files with 104 additions and 13 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/rest_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
<td style="word-wrap: break-word;">15000</td>
<td>The maximum time in ms for the client to establish a TCP connection.</td>
</tr>
<tr>
<td><h5>rest.idleness-timeout</h5></td>
<td style="word-wrap: break-word;">300000</td>
<td>The maximum time in ms for a connection to stay idle before failing.</td>
</tr>
<tr>
<td><h5>rest.port</h5></td>
<td style="word-wrap: break-word;">8081</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,14 @@ public class RestOptions {
.defaultValue(15_000L)
.withDescription("The maximum time in ms for the client to establish a TCP connection.");

/**
* The maximum time in ms for a connection to stay idle before failing.
*/
public static final ConfigOption<Long> IDLENESS_TIMEOUT =
key("rest.idleness-timeout")
.defaultValue(5L * 60L * 1_000L) // 5 minutes
.withDescription("The maximum time in ms for a connection to stay idle before failing.");

/**
* The maximum content length that the server will handle.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.rest;

import java.io.IOException;

/**
* Exception which is thrown by the {@link RestClient} if a connection
* becomes idle.
*/
public class ConnectionIdleException extends IOException {

private static final long serialVersionUID = 5103778538635217293L;

public ConnectionIdleException(String message) {
super(message);
}

public ConnectionIdleException(String message, Throwable cause) {
super(message, cause);
}

public ConnectionIdleException(Throwable cause) {
super(cause);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.apache.flink.runtime.rest.versioning.RestAPIVersion;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -72,6 +73,8 @@
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.multipart.MemoryAttribute;
import org.apache.flink.shaded.netty4.io.netty.handler.ssl.SslHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.stream.ChunkedWriteHandler;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateEvent;
import org.apache.flink.shaded.netty4.io.netty.handler.timeout.IdleStateHandler;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -118,16 +121,22 @@ public RestClient(RestClientConfiguration configuration, Executor executor) {
ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) {
// SSL should be the first handler in the pipeline
if (sslEngineFactory != null) {
socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
}
try {
// SSL should be the first handler in the pipeline
if (sslEngineFactory != null) {
socketChannel.pipeline().addLast("ssl", new SslHandler(sslEngineFactory.createSSLEngine()));
}

socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
.addLast(new ChunkedWriteHandler()) // required for multipart-requests
.addLast(new ClientHandler());
socketChannel.pipeline()
.addLast(new HttpClientCodec())
.addLast(new HttpObjectAggregator(configuration.getMaxContentLength()))
.addLast(new ChunkedWriteHandler()) // required for multipart-requests
.addLast(new IdleStateHandler(configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), configuration.getIdlenessTimeout(), TimeUnit.MILLISECONDS))
.addLast(new ClientHandler());
} catch (Throwable t) {
t.printStackTrace();
ExceptionUtils.rethrow(t);
}
}
};
NioEventLoopGroup group = new NioEventLoopGroup(1, new ExecutorThreadFactory("flink-rest-client-netty"));
Expand Down Expand Up @@ -453,6 +462,16 @@ public void channelInactive(ChannelHandlerContext ctx) {
ctx.close();
}

@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
if (evt instanceof IdleStateEvent) {
jsonFuture.completeExceptionally(new ConnectionIdleException("Channel became idle."));
ctx.close();
} else {
super.userEventTriggered(ctx, evt);
}
}

@Override
public void exceptionCaught(final ChannelHandlerContext ctx, final Throwable cause) {
if (cause instanceof TooLongFrameException) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,15 +40,19 @@ public final class RestClientConfiguration {

private final long connectionTimeout;

private final long idlenessTimeout;

private final int maxContentLength;

private RestClientConfiguration(
@Nullable final SSLEngineFactory sslEngineFactory,
final long connectionTimeout,
final long idlenessTimeout,
final int maxContentLength) {
checkArgument(maxContentLength > 0, "maxContentLength must be positive, was: %d", maxContentLength);
this.sslEngineFactory = sslEngineFactory;
this.connectionTimeout = connectionTimeout;
this.idlenessTimeout = idlenessTimeout;
this.maxContentLength = maxContentLength;
}

Expand All @@ -63,12 +67,19 @@ public SSLEngineFactory getSslEngineFactory() {
}

/**
* @see RestOptions#CONNECTION_TIMEOUT
* {@see RestOptions#CONNECTION_TIMEOUT}.
*/
public long getConnectionTimeout() {
return connectionTimeout;
}

/**
* {@see RestOptions#IDLENESS_TIMEOUT}.
*/
public long getIdlenessTimeout() {
return idlenessTimeout;
}

/**
* Returns the max content length that the REST client endpoint could handle.
*
Expand Down Expand Up @@ -102,8 +113,10 @@ public static RestClientConfiguration fromConfiguration(Configuration config) th

final long connectionTimeout = config.getLong(RestOptions.CONNECTION_TIMEOUT);

final long idlenessTimeout = config.getLong(RestOptions.IDLENESS_TIMEOUT);

int maxContentLength = config.getInteger(RestOptions.CLIENT_MAX_CONTENT_LENGTH);

return new RestClientConfiguration(sslEngineFactory, connectionTimeout, maxContentLength);
return new RestClientConfiguration(sslEngineFactory, connectionTimeout, idlenessTimeout, maxContentLength);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -101,8 +101,10 @@ public void testInvalidVersionRejection() throws Exception {
*/
@Test
public void testConnectionClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);
try (final ServerSocket serverSocket = new ServerSocket(0);
final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {

final String targetAddress = "localhost";
final int targetPort = serverSocket.getLocalPort();
Expand Down Expand Up @@ -147,11 +149,13 @@ public void testConnectionClosedHandling() throws Exception {
*/
@Test
public void testRestClientClosedHandling() throws Exception {
final Configuration config = new Configuration();
config.setLong(RestOptions.IDLENESS_TIMEOUT, 5000L);

Socket connectionSocket = null;

try (final ServerSocket serverSocket = new ServerSocket(0);
final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(new Configuration()), TestingUtils.defaultExecutor())) {
final RestClient restClient = new RestClient(RestClientConfiguration.fromConfiguration(config), TestingUtils.defaultExecutor())) {

final String targetAddress = "localhost";
final int targetPort = serverSocket.getLocalPort();
Expand Down

0 comments on commit 28591f3

Please sign in to comment.