From dc95c397e18ce3362df3a433414b00d7cf53541d Mon Sep 17 00:00:00 2001 From: Till Rohrmann Date: Wed, 10 Oct 2018 10:33:54 +0200 Subject: [PATCH] [hotfix] Add AkkaUtils#terminateActorSystem --- .../apache/flink/util/function/FunctionUtils.java | 13 +++++++++++++ .../org/apache/flink/runtime/akka/AkkaUtils.scala | 14 +++++++++++++- 2 files changed, 26 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java index 83846a62b64af..b777308043f5f 100644 --- a/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java +++ b/flink-core/src/main/java/org/apache/flink/util/function/FunctionUtils.java @@ -34,6 +34,8 @@ private FunctionUtils() { private static final Function NULL_FN = ignored -> null; + private static final Consumer IGNORE_FN = ignored -> {}; + /** * Function which returns {@code null} (type: Void). * @@ -45,6 +47,17 @@ public static Function nullFn() { return (Function) NULL_FN; } + /** + * Consumer which ignores the input. + * + * @param type of the input + * @return Ignoring {@link Consumer} + */ + @SuppressWarnings("unchecked") + public static Consumer ignoreFn() { + return (Consumer) IGNORE_FN; + } + /** * Convert at {@link FunctionWithException} into a {@link Function}. * diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala index 9ce1865204fe7..524ef73f88c97 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/akka/AkkaUtils.scala @@ -20,15 +20,17 @@ package org.apache.flink.runtime.akka import java.io.IOException import java.net._ -import java.util.concurrent.{Callable, TimeUnit} +import java.util.concurrent.{Callable, CompletableFuture, TimeUnit} import akka.actor._ import akka.pattern.{ask => akkaAsk} import com.typesafe.config.{Config, ConfigFactory} import org.apache.flink.api.common.time.Time import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions} +import org.apache.flink.runtime.concurrent.FutureUtils import org.apache.flink.runtime.net.SSLUtils import org.apache.flink.util.NetUtils +import org.apache.flink.util.function.FunctionUtils import org.jboss.netty.channel.ChannelException import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory} import org.slf4j.{Logger, LoggerFactory} @@ -852,5 +854,15 @@ object AkkaUtils { case f => f } } + + /** + * Terminates the given [[ActorSystem]] and returns its termination future. + * + * @param actorSystem to terminate + * @return Termination future + */ + def terminateActorSystem(actorSystem: ActorSystem): CompletableFuture[Void] = { + FutureUtils.toJava(actorSystem.terminate).thenAccept(FunctionUtils.ignoreFn()) + } }