Skip to content

Commit

Permalink
[FLINK-11551][rpc] Allow RpcEndpoint to execute asynchronous stop ope…
Browse files Browse the repository at this point in the history
…rations

Replace RpcEndpoin#postStop with RpcEndpoint#onStop method which can execute asynchronous
operations in the main thread executor. onStop returns a future which will terminate the
RpcEndpoint once it is completed.

The new stop semantics allow to execute complex shut down logic which requires asynchronous
operations.

This closes apache#7665.
  • Loading branch information
tillrohrmann committed Feb 12, 2019
1 parent 00c3730 commit 3d20007
Show file tree
Hide file tree
Showing 23 changed files with 449 additions and 181 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -400,8 +400,8 @@ private CompletableFuture<Void> stopSupportingActorsAsync() {
}

@Override
public CompletableFuture<Void> postStop() {
return stopSupportingActorsAsync().thenCompose((ignored) -> super.postStop());
public CompletableFuture<Void> onStop() {
return stopSupportingActorsAsync().thenCompose((ignored) -> super.onStop());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,7 @@ public Dispatcher(
//------------------------------------------------------

@Override
public CompletableFuture<Void> postStop() {
public CompletableFuture<Void> onStop() {
log.info("Stopping dispatcher {}.", getAddress());

final CompletableFuture<Void> allJobManagerRunnersTerminationFuture = terminateJobManagerRunnersAndGetTerminationFuture();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -347,7 +347,7 @@ public CompletableFuture<Acknowledge> suspend(final Exception cause) {
* Suspend the job and shutdown all other services including rpc.
*/
@Override
public CompletableFuture<Void> postStop() {
public CompletableFuture<Void> onStop() {
log.info("Stopping the JobMaster for job {}({}).", jobGraph.getName(), jobGraph.getJobID());

// disconnect from all registered TaskExecutors
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ public void start(JobMasterId jobMasterId, String newJobManagerAddress) throws E
}

@Override
public CompletableFuture<Void> postStop() {
public CompletableFuture<Void> onStop() {
log.info("Stopping SlotPool.");
// cancel all pending allocations
Set<AllocationID> allocationIds = pendingRequests.keySetB();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ public void start() throws Exception {
}

@Override
public CompletableFuture<Void> postStop() {
public CompletableFuture<Void> onStop() {
Exception exception = null;

taskManagerHeartbeatManager.stop();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,9 @@ protected final void stop() {
* @return Future which is completed once all post stop actions are completed. If an error
* occurs this future is completed exceptionally
*/
public abstract CompletableFuture<Void> postStop();
public CompletableFuture<Void> onStop() {
return CompletableFuture.completedFuture(null);
}

/**
* Triggers the shut down of the rpc endpoint. The shut down is executed asynchronously.
Expand All @@ -159,6 +161,11 @@ public final void shutDown() {
rpcService.stopServer(rpcServer);
}

public final CompletableFuture<Void> terminate() {
rpcService.stopServer(rpcServer);
return getTerminationFuture();
}

// ------------------------------------------------------------------------
// Basic RPC endpoint properties
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
Expand Down Expand Up @@ -172,12 +171,12 @@ public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout

@Override
public void start() {
rpcEndpoint.tell(Processing.START, ActorRef.noSender());
rpcEndpoint.tell(ControlMessages.START, ActorRef.noSender());
}

@Override
public void stop() {
rpcEndpoint.tell(Processing.STOP, ActorRef.noSender());
rpcEndpoint.tell(ControlMessages.STOP, ActorRef.noSender());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaHandshakeException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcInvalidStateException;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.exceptions.RpcConnectionException;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.HandshakeSuccessMessage;
Expand All @@ -38,12 +38,16 @@
import org.apache.flink.util.SerializedValue;

import akka.actor.ActorRef;
import akka.actor.Kill;
import akka.actor.Status;
import akka.actor.UntypedActor;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
Expand All @@ -59,16 +63,16 @@

/**
* Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
* {@link Processing} messages.
* {@link ControlMessages} messages.
*
* <p>The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
* instance.
*
* <p>The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
* in the context of the actor thread.
*
* <p>The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
* {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message
* <p>The {@link ControlMessages} message controls the processing behaviour of the akka rpc actor. A
* {@link ControlMessages#START} starts processing incoming messages. A {@link ControlMessages#STOP} message
* stops processing messages. All messages which arrive when the processing is stopped, will be
* discarded.
*
Expand All @@ -90,8 +94,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {

private final long maximumFramesize;

@Nonnull
private State state;

@Nullable
private CompletableFuture<Void> rpcEndpointTerminationFuture;

AkkaRpcActor(
final T rpcEndpoint,
final CompletableFuture<Boolean> terminationFuture,
Expand All @@ -104,50 +112,39 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
this.terminationFuture = checkNotNull(terminationFuture);
this.version = version;
this.maximumFramesize = maximumFramesize;
this.state = State.STOPPED;
this.state = StoppedState.INSTANCE;
this.rpcEndpointTerminationFuture = null;
}

@Override
public void postStop() throws Exception {
mainThreadValidator.enterMainThread();
super.postStop();

try {
CompletableFuture<Void> postStopFuture;
try {
postStopFuture = rpcEndpoint.postStop();
} catch (Throwable throwable) {
postStopFuture = FutureUtils.completedExceptionally(throwable);
}

super.postStop();

// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
// we would complete the future and let the actor system restart the actor with a completed
// future.
// Complete the termination future so that others know that we've stopped.

postStopFuture.whenComplete(
if (rpcEndpointTerminationFuture != null && rpcEndpointTerminationFuture.isDone()) {
rpcEndpointTerminationFuture.whenComplete(
(Void value, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(null);
}
});
} finally {
mainThreadValidator.exitMainThread();
} else {
terminationFuture.completeExceptionally(
new AkkaRpcException(
String.format("RpcEndpoint %s has not been properly stopped.", rpcEndpoint.getEndpointId())));
}

state = state.finishTermination();
}

@Override
public void onReceive(final Object message) {
if (message instanceof RemoteHandshakeMessage) {
handleHandshakeMessage((RemoteHandshakeMessage) message);
} else if (message.equals(Processing.START)) {
state = State.STARTED;
} else if (message.equals(Processing.STOP)) {
state = State.STOPPED;
} else if (state == State.STARTED) {
} else if (message instanceof ControlMessages) {
handleControlMessage(((ControlMessages) message));
} else if (state.isRunning()) {
mainThreadValidator.enterMainThread();

try {
Expand All @@ -165,6 +162,28 @@ public void onReceive(final Object message) {
}
}

private void handleControlMessage(ControlMessages controlMessage) {
switch (controlMessage) {
case START:
state = state.start();
break;
case STOP:
state = state.stop();
break;
case TERMINATE:
state.terminate(this);
break;
default:
handleUnknownControlMessage(controlMessage);
}
}

private void handleUnknownControlMessage(ControlMessages controlMessage) {
final String message = String.format("Received unknown control message %s. Dropping this message!", controlMessage);
log.warn(message);
sendErrorIfSender(new AkkaUnknownMessageException(message));
}

protected void handleRpcMessage(Object message) {
if (message instanceof RunAsync) {
handleRunAsync((RunAsync) message);
Expand Down Expand Up @@ -438,8 +457,115 @@ protected Object envelopeSelfMessage(Object message) {
return message;
}

enum State {
STARTED,
STOPPED
// ---------------------------------------------------------------------------
// Internal state machine
// ---------------------------------------------------------------------------

interface State {
default State start() {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StartedState.INSTANCE));
}

default State stop() {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(StoppedState.INSTANCE));
}

default State terminate(AkkaRpcActor<?> akkaRpcActor) {
throw new AkkaRpcInvalidStateException(invalidStateTransitionMessage(TerminatingState.INSTANCE));
}

default State finishTermination() {
return TerminatedState.INSTANCE;
}

default boolean isRunning() {
return false;
}

default String invalidStateTransitionMessage(State targetState) {
return String.format("AkkaRpcActor is currently in state %s and cannot go into state %s.", this, targetState);
}
}

@SuppressWarnings("Singleton")
enum StartedState implements State {
INSTANCE;

@Override
public State start() {
return INSTANCE;
}

@Override
public State stop() {
return StoppedState.INSTANCE;
}

@Override
public State terminate(AkkaRpcActor<?> akkaRpcActor) {
akkaRpcActor.mainThreadValidator.enterMainThread();

try {
akkaRpcActor.rpcEndpointTerminationFuture = akkaRpcActor.rpcEndpoint.onStop();
} catch (Throwable t) {
akkaRpcActor.rpcEndpointTerminationFuture = FutureUtils.completedExceptionally(
new AkkaRpcException(
String.format("Failure while stopping RpcEndpoint %s.", akkaRpcActor.rpcEndpoint.getEndpointId()),
t));
} finally {
akkaRpcActor.mainThreadValidator.exitMainThread();
}

// IMPORTANT: This only works if we don't use a restarting supervisor strategy. Otherwise
// we would complete the future and let the actor system restart the actor with a completed
// future.
// Complete the termination future so that others know that we've stopped.

akkaRpcActor.rpcEndpointTerminationFuture.whenComplete((ignored, throwable) -> akkaRpcActor.getSelf().tell(Kill.getInstance(), ActorRef.noSender()));

return TerminatingState.INSTANCE;
}

@Override
public boolean isRunning() {
return true;
}
}

@SuppressWarnings("Singleton")
enum StoppedState implements State {
INSTANCE;

@Override
public State start() {
return StartedState.INSTANCE;
}

@Override
public State stop() {
return INSTANCE;
}

@Override
public State terminate(AkkaRpcActor<?> akkaRpcActor) {
akkaRpcActor.rpcEndpointTerminationFuture = CompletableFuture.completedFuture(null);
akkaRpcActor.getSelf().tell(Kill.getInstance(), ActorRef.noSender());

return TerminatingState.INSTANCE;
}
}

@SuppressWarnings("Singleton")
enum TerminatingState implements State {
INSTANCE;

@Override
public boolean isRunning() {
return true;
}
}

enum TerminatedState implements State {
INSTANCE
}
}
Loading

0 comments on commit 3d20007

Please sign in to comment.