Skip to content

Commit

Permalink
[FLINK-8392] [rpc] Let termination future be completed by AkkaRpcActo…
Browse files Browse the repository at this point in the history
…r#postStop

Revert the changes introduced by FLINK-7754. An RpcEndpoint's termination future is now
completed from the AkkaRpcActor#postStop method.

This closes apache#5266.
  • Loading branch information
tillrohrmann committed Jan 10, 2018
1 parent 6033de0 commit 51a2787
Show file tree
Hide file tree
Showing 7 changed files with 40 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,8 @@

import akka.actor.ActorRef;

import java.util.concurrent.CompletableFuture;

/**
* Interface for Akka based rpc gateways
* Interface for Akka based rpc gateways.
*/
interface AkkaBasedEndpoint extends RpcGateway {

Expand All @@ -35,11 +33,4 @@ interface AkkaBasedEndpoint extends RpcGateway {
* @return the {@link ActorRef} of the underlying RPC actor
*/
ActorRef getActorRef();

/**
* Returns the internal termination future.
*
* @return Internal termination future
*/
CompletableFuture<Void> getInternalTerminationFuture();
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,23 +18,24 @@

package org.apache.flink.runtime.rpc.akka;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.concurrent.FutureUtils;
import org.apache.flink.runtime.rpc.FencedRpcGateway;
import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.RpcServer;
import org.apache.flink.runtime.rpc.RpcTimeout;
import org.apache.flink.runtime.rpc.StartStoppable;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.messages.CallAsync;
import org.apache.flink.runtime.rpc.messages.LocalRpcInvocation;
import org.apache.flink.runtime.rpc.akka.messages.Processing;
import org.apache.flink.runtime.rpc.messages.RemoteRpcInvocation;
import org.apache.flink.runtime.rpc.messages.RpcInvocation;
import org.apache.flink.runtime.rpc.messages.RunAsync;
import org.apache.flink.util.Preconditions;

import akka.actor.ActorRef;
import akka.pattern.Patterns;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -49,8 +50,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeoutException;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Invocation handler to be used with an {@link AkkaRpcActor}. The invocation handler wraps the
Expand Down Expand Up @@ -85,18 +86,13 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
@Nullable
private final CompletableFuture<Boolean> terminationFuture;

// null if gateway; otherwise non-null
@Nullable
private final CompletableFuture<Void> internalTerminationFuture;

AkkaInvocationHandler(
String address,
String hostname,
ActorRef rpcEndpoint,
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Boolean> terminationFuture,
@Nullable CompletableFuture<Void> internalTerminationFuture) {
@Nullable CompletableFuture<Boolean> terminationFuture) {

this.address = Preconditions.checkNotNull(address);
this.hostname = Preconditions.checkNotNull(hostname);
Expand All @@ -105,7 +101,6 @@ class AkkaInvocationHandler implements InvocationHandler, AkkaBasedEndpoint, Rpc
this.timeout = Preconditions.checkNotNull(timeout);
this.maximumFramesize = maximumFramesize;
this.terminationFuture = terminationFuture;
this.internalTerminationFuture = internalTerminationFuture;
}

@Override
Expand Down Expand Up @@ -159,7 +154,7 @@ public void scheduleRunAsync(Runnable runnable, long delayMillis) {

@Override
public <V> CompletableFuture<V> callAsync(Callable<V> callable, Time callTimeout) {
if(isLocal) {
if (isLocal) {
@SuppressWarnings("unchecked")
CompletableFuture<V> resultFuture = (CompletableFuture<V>) ask(new CallAsync(callable), callTimeout);

Expand Down Expand Up @@ -208,7 +203,7 @@ private Object invokeRpc(Method method, Object[] args) throws Exception {
tell(rpcInvocation);

result = null;
} else if (Objects.equals(returnType,CompletableFuture.class)) {
} else if (Objects.equals(returnType, CompletableFuture.class)) {
// execute an asynchronous call
result = ask(rpcInvocation, futureTimeout);
} else {
Expand Down Expand Up @@ -298,7 +293,7 @@ private static Time extractRpcTimeout(Annotation[][] parameterAnnotations, Objec
}

/**
* Checks whether any of the annotations is of type {@link RpcTimeout}
* Checks whether any of the annotations is of type {@link RpcTimeout}.
*
* @param annotations Array of annotations
* @return True if {@link RpcTimeout} was found; otherwise false
Expand Down Expand Up @@ -349,9 +344,4 @@ public String getHostname() {
public CompletableFuture<Boolean> getTerminationFuture() {
return terminationFuture;
}

@Override
public CompletableFuture<Void> getInternalTerminationFuture() {
return internalTerminationFuture;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,22 +53,22 @@
/**
* Akka rpc actor which receives {@link LocalRpcInvocation}, {@link RunAsync} and {@link CallAsync}
* {@link Processing} messages.
* <p>
* The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
*
* <p>The {@link LocalRpcInvocation} designates a rpc and is dispatched to the given {@link RpcEndpoint}
* instance.
* <p>
* The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
*
* <p>The {@link RunAsync} and {@link CallAsync} messages contain executable code which is executed
* in the context of the actor thread.
* <p>
* The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
*
* <p>The {@link Processing} message controls the processing behaviour of the akka rpc actor. A
* {@link Processing#START} starts processing incoming messages. A {@link Processing#STOP} message
* stops processing messages. All messages which arrive when the processing is stopped, will be
* discarded.
*
* @param <T> Type of the {@link RpcEndpoint}
*/
class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {

protected final Logger log = LoggerFactory.getLogger(getClass());

/** the endpoint to invoke the methods on. */
Expand All @@ -77,12 +77,12 @@ class AkkaRpcActor<T extends RpcEndpoint & RpcGateway> extends UntypedActor {
/** the helper that tracks whether calls come from the main thread. */
private final MainThreadValidatorUtil mainThreadValidator;

private final CompletableFuture<Void> internalTerminationFuture;
private final CompletableFuture<Boolean> terminationFuture;

AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Void> internalTerminationFuture) {
AkkaRpcActor(final T rpcEndpoint, final CompletableFuture<Boolean> terminationFuture) {
this.rpcEndpoint = checkNotNull(rpcEndpoint, "rpc endpoint");
this.mainThreadValidator = new MainThreadValidatorUtil(rpcEndpoint);
this.internalTerminationFuture = checkNotNull(internalTerminationFuture);
this.terminationFuture = checkNotNull(terminationFuture);
}

@Override
Expand All @@ -106,9 +106,9 @@ public void postStop() throws Exception {
// Complete the termination future so that others know that we've stopped.

if (shutdownThrowable != null) {
internalTerminationFuture.completeExceptionally(shutdownThrowable);
terminationFuture.completeExceptionally(shutdownThrowable);
} else {
internalTerminationFuture.complete(null);
terminationFuture.complete(null);
}
} finally {
mainThreadValidator.exitMainThread();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,15 @@
import java.io.Serializable;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;

import scala.Option;
Expand Down Expand Up @@ -165,7 +162,6 @@ public <C extends RpcGateway> CompletableFuture<C> connect(
actorRef,
timeout,
maximumFramesize,
null,
null);
});
}
Expand All @@ -186,7 +182,6 @@ public <F extends Serializable, C extends FencedRpcGateway<F>> CompletableFuture
timeout,
maximumFramesize,
null,
null,
() -> fencingToken);
});
}
Expand All @@ -196,13 +191,12 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
checkNotNull(rpcEndpoint, "rpc endpoint");

CompletableFuture<Boolean> terminationFuture = new CompletableFuture<>();
CompletableFuture<Void> internalTerminationFuture = new CompletableFuture<>();
final Props akkaRpcActorProps;

if (rpcEndpoint instanceof FencedRpcEndpoint) {
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
akkaRpcActorProps = Props.create(FencedAkkaRpcActor.class, rpcEndpoint, terminationFuture);
} else {
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, internalTerminationFuture);
akkaRpcActorProps = Props.create(AkkaRpcActor.class, rpcEndpoint, terminationFuture);
}

ActorRef actorRef;
Expand Down Expand Up @@ -240,7 +234,6 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
timeout,
maximumFramesize,
terminationFuture,
internalTerminationFuture,
((FencedRpcEndpoint<?>) rpcEndpoint)::getFencingToken);

implementedRpcGateways.add(FencedMainThreadExecutable.class);
Expand All @@ -251,8 +244,7 @@ public <C extends RpcEndpoint & RpcGateway> RpcServer startServer(C rpcEndpoint)
actorRef,
timeout,
maximumFramesize,
terminationFuture,
internalTerminationFuture);
terminationFuture);
}

// Rather than using the System ClassLoader directly, we derive the ClassLoader
Expand Down Expand Up @@ -280,7 +272,6 @@ public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F
timeout,
maximumFramesize,
null,
null,
() -> fencingToken);

// Rather than using the System ClassLoader directly, we derive the ClassLoader
Expand All @@ -300,43 +291,19 @@ public <F extends Serializable> RpcServer fenceRpcServer(RpcServer rpcServer, F
@Override
public void stopServer(RpcServer selfGateway) {
if (selfGateway instanceof AkkaBasedEndpoint) {
AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
final AkkaBasedEndpoint akkaClient = (AkkaBasedEndpoint) selfGateway;
final RpcEndpoint rpcEndpoint;

boolean fromThisService;
synchronized (lock) {
if (stopped) {
return;
} else {
fromThisService = actors.remove(akkaClient.getActorRef()) != null;
rpcEndpoint = actors.remove(akkaClient.getActorRef());
}
}

if (fromThisService) {
ActorRef selfActorRef = akkaClient.getActorRef();
LOG.info("Trigger shut down of RPC endpoint {}.", selfGateway.getAddress());

CompletableFuture<Boolean> akkaTerminationFuture = FutureUtils.toJava(
Patterns.gracefulStop(
selfActorRef,
FutureUtils.toFiniteDuration(timeout),
Kill.getInstance()));

akkaTerminationFuture
.thenCombine(
akkaClient.getInternalTerminationFuture(),
(Boolean terminated, Void ignored) -> true)
.whenComplete(
(Boolean terminated, Throwable throwable) -> {
if (throwable != null) {
LOG.debug("Graceful RPC endpoint shutdown failed. Shutting endpoint down hard now.", throwable);

actorSystem.stop(selfActorRef);
selfGateway.getTerminationFuture().completeExceptionally(throwable);
} else {
LOG.info("RPC endpoint {} has been shut down.", selfGateway.getAddress());
selfGateway.getTerminationFuture().complete(null);
}
});
if (rpcEndpoint != null) {
akkaClient.getActorRef().tell(Kill.getInstance(), ActorRef.noSender());
} else {
LOG.debug("RPC endpoint {} already stopped or from different RPC service", selfGateway.getAddress());
}
Expand All @@ -347,44 +314,20 @@ public void stopServer(RpcServer selfGateway) {
public void stopService() {
LOG.info("Stopping Akka RPC service.");

final List<RpcEndpoint> actorsToTerminate;

synchronized (lock) {
if (stopped) {
return;
}

stopped = true;

actorSystem.shutdown();

actorsToTerminate = new ArrayList<>(actors.values());

actors.clear();
}

actorSystem.shutdown();
actorSystem.awaitTermination();

// complete the termination futures of all actors
for (RpcEndpoint rpcEndpoint : actorsToTerminate) {
final CompletableFuture<Boolean> terminationFuture = rpcEndpoint.getTerminationFuture();

AkkaBasedEndpoint akkaBasedEndpoint = rpcEndpoint.getSelfGateway(AkkaBasedEndpoint.class);

CompletableFuture<Void> internalTerminationFuture = akkaBasedEndpoint.getInternalTerminationFuture();

internalTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(throwable);
} else {
terminationFuture.complete(true);
}
});

// make sure that if the internal termination futures haven't completed yet, then they time out
internalTerminationFuture.completeExceptionally(
new TimeoutException("The RpcEndpoint " + rpcEndpoint.getAddress() + " did not terminate in time."));
synchronized (lock) {
actors.clear();
}

LOG.info("Stopped Akka RPC service.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,8 @@ public FencedAkkaInvocationHandler(
Time timeout,
long maximumFramesize,
@Nullable CompletableFuture<Boolean> terminationFuture,
@Nullable CompletableFuture<Void> internalTerminationFuture,
Supplier<F> fencingTokenSupplier) {
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture, internalTerminationFuture);
super(address, hostname, rpcEndpoint, timeout, maximumFramesize, terminationFuture);

this.fencingTokenSupplier = Preconditions.checkNotNull(fencingTokenSupplier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
package org.apache.flink.runtime.rpc.akka;

import org.apache.flink.runtime.rpc.FencedRpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.akka.exceptions.AkkaUnknownMessageException;
import org.apache.flink.runtime.rpc.exceptions.FencingTokenException;
import org.apache.flink.runtime.rpc.RpcGateway;
import org.apache.flink.runtime.rpc.messages.FencedMessage;
import org.apache.flink.runtime.rpc.messages.UnfencedMessage;

Expand All @@ -38,8 +38,8 @@
*/
public class FencedAkkaRpcActor<F extends Serializable, T extends FencedRpcEndpoint<F> & RpcGateway> extends AkkaRpcActor<T> {

public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Void> internalTerminationFuture) {
super(rpcEndpoint, internalTerminationFuture);
public FencedAkkaRpcActor(T rpcEndpoint, CompletableFuture<Boolean> terminationFuture) {
super(rpcEndpoint, terminationFuture);
}

@Override
Expand Down
Loading

0 comments on commit 51a2787

Please sign in to comment.