Skip to content

Commit

Permalink
[hotfix] Add AkkaUtils#terminateActorSystem
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Oct 10, 2018
1 parent 78aeac0 commit dc95c39
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ private FunctionUtils() {

private static final Function<Object, Void> NULL_FN = ignored -> null;

private static final Consumer<Object> IGNORE_FN = ignored -> {};

/**
* Function which returns {@code null} (type: Void).
*
Expand All @@ -45,6 +47,17 @@ public static <T> Function<T, Void> nullFn() {
return (Function<T, Void>) NULL_FN;
}

/**
* Consumer which ignores the input.
*
* @param <T> type of the input
* @return Ignoring {@link Consumer}
*/
@SuppressWarnings("unchecked")
public static <T> Consumer<T> ignoreFn() {
return (Consumer<T>) IGNORE_FN;
}

/**
* Convert at {@link FunctionWithException} into a {@link Function}.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,17 @@ package org.apache.flink.runtime.akka

import java.io.IOException
import java.net._
import java.util.concurrent.{Callable, TimeUnit}
import java.util.concurrent.{Callable, CompletableFuture, TimeUnit}

import akka.actor._
import akka.pattern.{ask => akkaAsk}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration.{AkkaOptions, Configuration, IllegalConfigurationException, SecurityOptions}
import org.apache.flink.runtime.concurrent.FutureUtils
import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils
import org.apache.flink.util.function.FunctionUtils
import org.jboss.netty.channel.ChannelException
import org.jboss.netty.logging.{InternalLoggerFactory, Slf4JLoggerFactory}
import org.slf4j.{Logger, LoggerFactory}
Expand Down Expand Up @@ -852,5 +854,15 @@ object AkkaUtils {
case f => f
}
}

/**
* Terminates the given [[ActorSystem]] and returns its termination future.
*
* @param actorSystem to terminate
* @return Termination future
*/
def terminateActorSystem(actorSystem: ActorSystem): CompletableFuture[Void] = {
FutureUtils.toJava(actorSystem.terminate).thenAccept(FunctionUtils.ignoreFn())
}
}

0 comments on commit dc95c39

Please sign in to comment.