Skip to content

Commit

Permalink
[FLINK-8675] Add non-blocking shut down method to RestServerEndpoint
Browse files Browse the repository at this point in the history
Make the shut down method of RestServerEndpoint non-blocking.

This closes #5511.
  • Loading branch information
tillrohrmann committed Feb 18, 2018
1 parent c7c4198 commit c1aabd5
Show file tree
Hide file tree
Showing 6 changed files with 137 additions and 69 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -595,9 +595,12 @@ private class TestRestServerEndpoint extends RestServerEndpoint implements AutoC
return handlers;
}

// Intentionally a no-op: TestRestServerEndpoint starts nothing in this hook.
@Override
protected void startInternal() throws Exception {}

@Override
public void close() throws Exception {
shutdown(Time.seconds(5));
shutDownAsync().get();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.Executor;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -284,6 +285,20 @@ public static CompletableFuture<Void> runAfterwards(CompletableFuture<?> future,
return runAfterwardsAsync(future, runnable, Executors.directExecutor());
}

/**
 * Runs the given action once the given future has completed, regardless of whether it
 * completed normally or exceptionally. The action is executed via
 * {@link ForkJoinPool#commonPool()}. In case of an exceptional completion, the action's
 * exception will be added to the initial exception.
 *
 * @param future to wait for its completion
 * @param runnable action which is triggered after the future's completion
 * @return Future which is completed after the action has completed. This future can contain an
 *     exception if an error occurred in the given future or action.
 */
public static CompletableFuture<Void> runAfterwardsAsync(CompletableFuture<?> future, RunnableWithException runnable) {
    final Executor commonPoolExecutor = ForkJoinPool.commonPool();

    return runAfterwardsAsync(future, runnable, commonPoolExecutor);
}

/**
* Run the given action after the completion of the given future. The given future can be
* completed normally or exceptionally. In case of an exceptional completion, the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,7 @@ protected void stopClusterComponents() throws Exception {
Throwable exception = null;

if (webMonitorEndpoint != null) {
webMonitorEndpoint.shutdown(Time.seconds(10L));
webMonitorEndpoint.shutDownAsync().get();
}

if (dispatcherLeaderRetrievalService != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rest.handler.PipelineErrorHandler;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.RouterHandler;
Expand All @@ -45,13 +46,15 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;
import javax.net.ssl.SSLEngine;

import java.io.IOException;
import java.io.Serializable;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
Expand All @@ -73,11 +76,13 @@ public abstract class RestServerEndpoint {
private final SSLEngine sslEngine;
private final Path uploadDir;

private final CompletableFuture<Void> terminationFuture;

private ServerBootstrap bootstrap;
private Channel serverChannel;
private String restAddress;

private volatile boolean started;
private State state = State.CREATED;

public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws IOException {
Preconditions.checkNotNull(configuration);
Expand All @@ -88,9 +93,9 @@ public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws
this.uploadDir = configuration.getUploadDir();
createUploadDir(uploadDir, log);

this.restAddress = null;
terminationFuture = new CompletableFuture<>();

this.started = false;
this.restAddress = null;
}

/**
Expand All @@ -107,12 +112,9 @@ public RestServerEndpoint(RestServerEndpointConfiguration configuration) throws
*
* @throws Exception if we cannot start the RestServerEndpoint
*/
public void start() throws Exception {
public final void start() throws Exception {
synchronized (lock) {
if (started) {
// RestServerEndpoint already started
return;
}
Preconditions.checkState(state == State.CREATED, "The RestServerEndpoint cannot be restarted.");

log.info("Starting rest endpoint.");

Expand Down Expand Up @@ -192,28 +194,40 @@ protected void initChannel(SocketChannel ch) {

restAddressFuture.complete(restAddress);

started = true;
state = State.RUNNING;

startInternal();
}
}

/**
 * Hook to start sub class specific services.
 *
 * <p>Called by {@link #start()} as its final step, after the endpoint's state has been set to
 * running and while the endpoint's lock is still held.
 *
 * @throws Exception if an error occurred while starting the sub class specific services
 */
protected abstract void startInternal() throws Exception;

/**
* Returns the address on which this endpoint is accepting requests.
*
* @return address on which this endpoint is accepting requests
* @return address on which this endpoint is accepting requests or null if none
*/
@Nullable
public InetSocketAddress getServerAddress() {
Preconditions.checkState(started, "The RestServerEndpoint has not been started yet.");
Channel server = this.serverChannel;

if (server != null) {
try {
return ((InetSocketAddress) server.localAddress());
} catch (Exception e) {
log.error("Cannot access local server address", e);
synchronized (lock) {
Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
Channel server = this.serverChannel;

if (server != null) {
try {
return ((InetSocketAddress) server.localAddress());
} catch (Exception e) {
log.error("Cannot access local server address", e);
}
}
}

return null;
return null;
}
}

/**
Expand All @@ -222,26 +236,49 @@ public InetSocketAddress getServerAddress() {
* @return REST address of this endpoint
*/
public String getRestAddress() {
Preconditions.checkState(started, "The RestServerEndpoint has not been started yet.");
return restAddress;
synchronized (lock) {
Preconditions.checkState(state != State.CREATED, "The RestServerEndpoint has not been started yet.");
return restAddress;
}
}

/**
 * Triggers the shut down of this endpoint without waiting for it to finish.
 *
 * <p>If the endpoint is running, {@link #shutDownInternal()} is invoked and its outcome is
 * forwarded to the termination future. If the endpoint was never started, the termination
 * future is completed right away. Once the endpoint has been shut down, further calls only
 * return the termination future.
 *
 * @return Future which is completed once the shut down has finished; it is completed
 *     exceptionally if the internal shut down failed
 */
public final CompletableFuture<Void> shutDownAsync() {
    synchronized (lock) {
        log.info("Shutting down rest endpoint.");

        if (state == State.CREATED) {
            // Never started: there is nothing to release.
            terminationFuture.complete(null);
            state = State.SHUTDOWN;
        } else if (state == State.RUNNING) {
            // Forward the result of the internal shut down to the termination future.
            shutDownInternal().whenComplete(
                (Void ignored, Throwable failure) -> {
                    if (failure == null) {
                        terminationFuture.complete(null);
                    } else {
                        terminationFuture.completeExceptionally(failure);
                    }
                });
            state = State.SHUTDOWN;
        }

        return terminationFuture;
    }
}

/**
* Stops this REST server endpoint.
*
* @return Future which is completed once the shut down has been finished.
*/
public void shutdown(Time timeout) {
protected CompletableFuture<Void> shutDownInternal() {

synchronized (lock) {
if (!started) {
// RestServerEndpoint has not been started
return;
}

log.info("Shutting down rest endpoint.");

CompletableFuture<?> channelFuture = new CompletableFuture<>();
if (this.serverChannel != null) {
this.serverChannel.close().addListener(finished -> {
if (serverChannel != null) {
serverChannel.close().addListener(finished -> {
if (finished.isSuccess()) {
channelFuture.complete(null);
} else {
Expand All @@ -252,11 +289,12 @@ public void shutdown(Time timeout) {
}
CompletableFuture<?> groupFuture = new CompletableFuture<>();
CompletableFuture<?> childGroupFuture = new CompletableFuture<>();
final Time gracePeriod = Time.seconds(10L);

channelFuture.thenRun(() -> {
if (bootstrap != null) {
if (bootstrap.group() != null) {
bootstrap.group().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
bootstrap.group().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(finished -> {
if (finished.isSuccess()) {
groupFuture.complete(null);
Expand All @@ -266,7 +304,7 @@ public void shutdown(Time timeout) {
});
}
if (bootstrap.childGroup() != null) {
bootstrap.childGroup().shutdownGracefully(0L, timeout.toMilliseconds(), TimeUnit.MILLISECONDS)
bootstrap.childGroup().shutdownGracefully(0L, gracePeriod.toMilliseconds(), TimeUnit.MILLISECONDS)
.addListener(finished -> {
if (finished.isSuccess()) {
childGroupFuture.complete(null);
Expand All @@ -283,22 +321,15 @@ public void shutdown(Time timeout) {
}
});

try {
CompletableFuture.allOf(groupFuture, childGroupFuture).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS);
log.info("Rest endpoint shutdown complete.");
} catch (Exception e) {
log.warn("Rest endpoint shutdown failed.", e);
}

restAddress = null;
started = false;
final CompletableFuture<Void> channelTerminationFuture = FutureUtils.completeAll(
Arrays.asList(groupFuture, childGroupFuture));

try {
log.info("Cleaning upload directory {}", uploadDir);
FileUtils.cleanDirectory(uploadDir.toFile());
} catch (IOException e) {
log.warn("Error while cleaning upload directory {}", uploadDir, e);
}
return FutureUtils.runAfterwards(
channelTerminationFuture,
() -> {
log.info("Cleaning upload directory {}", uploadDir);
FileUtils.cleanDirectory(uploadDir.toFile());
});
}
}

Expand Down Expand Up @@ -433,4 +464,10 @@ public int compare(String s1, String s2) {
}
}
}

/**
 * Lifecycle states of the endpoint. The endpoint moves from CREATED to RUNNING in start()
 * and to SHUTDOWN in shutDownAsync(); start() rejects any state other than CREATED, so a
 * shut down endpoint cannot be restarted.
 */
private enum State {
// Initial state: the endpoint has been constructed but start() has not completed yet.
CREATED,
// Set by start() once the endpoint has been started.
RUNNING,
// Set by shutDownAsync() once a shut down has been triggered.
SHUTDOWN
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
Expand Down Expand Up @@ -113,6 +114,7 @@
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;

Expand Down Expand Up @@ -607,31 +609,39 @@ private ChannelInboundHandler createStaticFileHandler(
}

@Override
public void start() throws Exception {
super.start();
public void startInternal() throws Exception {
leaderElectionService.start(this);
}

@Override
public void shutdown(Time timeout) {
protected CompletableFuture<Void> shutDownInternal() {
executionGraphCache.close();

final File tmpDir = restConfiguration.getTmpDir();

try {
log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir);
} catch (Throwable t) {
log.warn("Error while deleting cache directory {}", tmpDir, t);
}
final CompletableFuture<Void> shutdownFuture = super.shutDownInternal();

try {
leaderElectionService.stop();
} catch (Exception e) {
log.warn("Error while stopping leaderElectionService", e);
}
final File tmpDir = restConfiguration.getTmpDir();

super.shutdown(timeout);
return FutureUtils.runAfterwardsAsync(
shutdownFuture,
() -> {
Exception exception = null;
try {
log.info("Removing cache directory {}", tmpDir);
FileUtils.deleteDirectory(tmpDir);
} catch (Exception e) {
exception = e;
}

try {
leaderElectionService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw exception;
}
});
}

//-------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,14 +136,14 @@ public void setup() throws Exception {
}

@After
public void teardown() {
public void teardown() throws Exception {
if (restClient != null) {
restClient.shutdown(timeout);
restClient = null;
}

if (serverEndpoint != null) {
serverEndpoint.shutdown(timeout);
serverEndpoint.shutDownAsync().get();
serverEndpoint = null;
}
}
Expand Down Expand Up @@ -316,6 +316,9 @@ protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initiali
Tuple2.of(new TestHeaders(), testHandler),
Tuple2.of(TestUploadHeaders.INSTANCE, testUploadHandler));
}

// Intentionally a no-op: this implementation starts nothing in this hook.
@Override
protected void startInternal() throws Exception {}
}

private static class TestHandler extends AbstractRestHandler<RestfulGateway, TestRequest, TestResponse, TestParameters> {
Expand Down

0 comments on commit c1aabd5

Please sign in to comment.