Skip to content

Commit

Permalink
[FLINK-8665] [rest] Let RpcEndpoint#postStop return completion future
Browse files Browse the repository at this point in the history
The RpcEndpoint#postStop method returns a CompletableFuture<Void> which is
completed once all post stop actions have completed. The termination future
of the respective RpcEndpoint is only completed afterwards.

This closes apache#5498.
  • Loading branch information
tillrohrmann committed Feb 23, 2018
1 parent bb306b9 commit d9b28e8
Show file tree
Hide file tree
Showing 17 changed files with 170 additions and 113 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -333,8 +333,7 @@ private void recoverWorkers() throws Exception {
}

@Override
public void postStop() throws Exception {
Exception exception = null;
public CompletableFuture<Void> postStop() {
FiniteDuration stopTimeout = new FiniteDuration(5L, TimeUnit.SECONDS);

CompletableFuture<Boolean> stopTaskMonitorFuture = stopActor(taskMonitor, stopTimeout);
Expand All @@ -355,22 +354,11 @@ public void postStop() throws Exception {
stopLaunchCoordinatorFuture,
stopReconciliationCoordinatorFuture);

// wait for the future to complete or to time out
try {
stopFuture.get();
} catch (Exception e) {
exception = e;
}

try {
super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}
final CompletableFuture<Void> terminationFuture = super.postStop();

if (exception != null) {
throw new ResourceManagerException("Could not properly shut down the ResourceManager.", exception);
}
return stopFuture.thenCombine(
terminationFuture,
(Void voidA, Void voidB) -> null);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,40 +156,40 @@ public Dispatcher(
//------------------------------------------------------

@Override
public void postStop() throws Exception {
public CompletableFuture<Void> postStop() {
log.info("Stopping dispatcher {}.", getAddress());
Throwable exception = null;

clearState();
Exception exception = null;

try {
jobManagerSharedServices.shutdown();
} catch (Throwable t) {
exception = ExceptionUtils.firstOrSuppressed(t, exception);
clearState();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
submittedJobGraphStore.stop();
jobManagerSharedServices.shutdown();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
leaderElectionService.stop();
submittedJobGraphStore.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

try {
super.postStop();
leaderElectionService.stop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw new FlinkException("Could not properly terminate the Dispatcher.", exception);
return FutureUtils.completedExceptionally(
new FlinkException("Could not properly terminate the Dispatcher.", exception));
} else {
return CompletableFuture.completedFuture(null);
}
log.info("Stopped dispatcher {}.", getAddress());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,6 @@
import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
import org.apache.flink.runtime.util.clock.SystemClock;
import org.apache.flink.runtime.webmonitor.WebMonitorUtils;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
Expand All @@ -123,7 +122,6 @@
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -387,8 +385,8 @@ public CompletableFuture<Acknowledge> suspend(final Exception cause, final Time
* Suspend the job and shutdown all other services including rpc.
*/
@Override
public void postStop() throws Exception {
log.info("Stopping the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
public CompletableFuture<Void> postStop() {
log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID());

// disconnect from all registered TaskExecutors
final Set<ResourceID> taskManagerResourceIds = new HashSet<>(registeredTaskManagers.keySet());
Expand All @@ -407,28 +405,8 @@ public void postStop() throws Exception {

// shut down will internally release all registered slots
slotPool.shutDown();
CompletableFuture<Void> terminationFuture = slotPool.getTerminationFuture();

Exception exception = null;

// wait for the slot pool shut down
try {
terminationFuture.get(rpcTimeout.toMilliseconds(), TimeUnit.MILLISECONDS);
} catch (Exception e) {
exception = e;
}

try {
super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
throw exception;
}

log.info("Stopped the JobMaster for job " + jobGraph.getName() + '(' + jobGraph.getJobID() + ").");
return slotPool.getTerminationFuture();
}

//----------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws E
}

@Override
public void postStop() throws Exception {
public CompletableFuture<Void> postStop() {
// cancel all pending allocations
Set<AllocationID> allocationIds = pendingRequests.keySetB();

Expand All @@ -214,7 +214,7 @@ public void postStop() throws Exception {

clear();

super.postStop();
return CompletableFuture.completedFuture(null);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ public void start() throws Exception {
}

@Override
public void postStop() throws Exception {
public CompletableFuture<Void> postStop() {
Exception exception = null;

taskManagerHeartbeatManager.stop();
Expand All @@ -240,14 +240,11 @@ public void postStop() throws Exception {

clearState();

try {
super.postStop();
} catch (Exception e) {
exception = ExceptionUtils.firstOrSuppressed(e, exception);
}

if (exception != null) {
ExceptionUtils.rethrowException(exception, "Error while shutting the ResourceManager down.");
return FutureUtils.completedExceptionally(
new FlinkException("Could not properly shut down the ResourceManager.", exception));
} else {
return CompletableFuture.completedFuture(null);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
*
* @param <F> type of the fencing token
*/
public class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {
public abstract class FencedRpcEndpoint<F extends Serializable> extends RpcEndpoint {

private volatile F fencingToken;
private volatile MainThreadExecutor fencedMainThreadExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,9 +141,10 @@ protected final void stop() {
*
* <p>IMPORTANT: This method should never be called directly by the user.
*
* @throws Exception if an error occurs. The exception is returned as result of the termination future.
* @return Future which is completed once all post stop actions are completed. If an error
* occurs this future is completed exceptionally
*/
public void postStop() throws Exception {}
public abstract CompletableFuture<Void> postStop();

/**
* Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.rpc.akka;

import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.MainThreadValidatorUtil;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
Expand Down Expand Up @@ -90,12 +91,11 @@ public void postStop() throws Exception {
mainThreadValidator.enterMainThread();

try {
Throwable shutdownThrowable = null;

CompletableFuture<Void> postStopFuture;
try {
rpcEndpoint.postStop();
postStopFuture = rpcEndpoint.postStop();
} catch (Throwable throwable) {
shutdownThrowable = throwable;
postStopFuture = FutureUtils.completedExceptionally(throwable);
}

super.postStop();
Expand All @@ -105,11 +105,14 @@ public void postStop() throws Exception {
// future.
// Complete the termination future so that others know that we've stopped.

if (shutdownThrowable != null) {
terminationFuture.completeExceptionally(shutdownThrowable);
} else {
terminationFuture.complete(null);
}
postStopFuture.whenComplete(
(Void value, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
});
} finally {
mainThreadValidator.exitMainThread();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,7 +254,7 @@ public void start() throws Exception {
* Called to shut down the TaskManager. The method closes all TaskManager services.
*/
@Override
public void postStop() throws Exception {
public CompletableFuture<Void> postStop() {
log.info("Stopping TaskManager {}.", getAddress());

Throwable throwable = null;
Expand All @@ -281,17 +281,11 @@ public void postStop() throws Exception {
throwable = ExceptionUtils.firstOrSuppressed(t, throwable);
}

try {
super.postStop();
} catch (Throwable e) {
throwable = ExceptionUtils.firstOrSuppressed(e, throwable);
}

if (throwable != null) {
ExceptionUtils.rethrowException(throwable, "Error while shutting the TaskExecutor down.");
return FutureUtils.completedExceptionally(new FlinkException("Error while shutting the TaskExecutor down.", throwable));
} else {
return CompletableFuture.completedFuture(null);
}

log.info("Stopped TaskManager {}.", getAddress());
}

// ======================================================================
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ public void anotherCall() {
public boolean hasConcurrentAccess() {
return concurrentAccess;
}

@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
}

public interface FencedTestGateway extends FencedRpcGateway<UUID> {
Expand Down Expand Up @@ -384,5 +389,10 @@ public CompletableFuture<Acknowledge> setNewFencingToken(UUID fencingToken, Time

return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,11 @@ protected FencedTestingEndpoint(RpcService rpcService, String value) {
this(rpcService, value, null);
}

@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}

protected FencedTestingEndpoint(RpcService rpcService, String value, UUID initialFencingToken) {
super(rpcService);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,11 @@ protected BaseEndpoint(RpcService rpcService, int foobarValue) {
public CompletableFuture<Integer> foobar() {
return CompletableFuture.completedFuture(foobarValue);
}

@Override
public CompletableFuture<Void> postStop() {
return CompletableFuture.completedFuture(null);
}
}

public static class ExtendedEndpoint extends BaseEndpoint implements ExtendedGateway, DifferentGateway {
Expand Down
Loading

0 comments on commit d9b28e8

Please sign in to comment.