Skip to content

Commit

Permalink
[FLINK-10253] Add ActorSystemExecutorConfiguration to configure Actor…
Browse files Browse the repository at this point in the history
…System's executor

Add MetricUtilsTest#testStartMetricActorSystemRespectsThreadPriority

This closes apache#6839.
  • Loading branch information
tillrohrmann committed Oct 17, 2018
1 parent 3932433 commit f81297a
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 89 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,12 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.AkkaOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -99,7 +99,7 @@ public static ActorSystem startActorSystem(
listeningAddress,
portRangeDefinition,
logger,
ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
}

/**
Expand All @@ -109,7 +109,7 @@ public static ActorSystem startActorSystem(
* @param listeningAddress The address to listen at.
* @param portRangeDefinition The port range to choose a port from.
* @param logger The logger to output log information.
* @param executorMode The executor mode of Akka actor system.
* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
* @return The ActorSystem which has been started
* @throws Exception Thrown when actor system cannot be started in specified port range
*/
Expand All @@ -118,14 +118,14 @@ public static ActorSystem startActorSystem(
String listeningAddress,
String portRangeDefinition,
Logger logger,
@Nonnull ActorSystemExecutorMode executorMode) throws Exception {
@Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
return startActorSystem(
configuration,
AkkaUtils.getFlinkActorSystemName(),
listeningAddress,
portRangeDefinition,
logger,
executorMode);
actorSystemExecutorConfiguration);
}

/**
Expand All @@ -136,7 +136,7 @@ public static ActorSystem startActorSystem(
* @param listeningAddress The address to listen at.
* @param portRangeDefinition The port range to choose a port from.
* @param logger The logger to output log information.
* @param executorMode The executor mode of Akka actor system.
* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
* @return The ActorSystem which has been started
* @throws Exception Thrown when actor system cannot be started in specified port range
*/
Expand All @@ -146,7 +146,7 @@ public static ActorSystem startActorSystem(
String listeningAddress,
String portRangeDefinition,
Logger logger,
@Nonnull ActorSystemExecutorMode executorMode) throws Exception {
@Nonnull ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {

// parse port range definition and create port iterator
Iterator<Integer> portsIterator;
Expand Down Expand Up @@ -178,7 +178,7 @@ public static ActorSystem startActorSystem(
listeningAddress,
port,
logger,
executorMode);
actorSystemExecutorConfiguration);
}
catch (Exception e) {
// we can continue to try if this contains a netty channel exception
Expand Down Expand Up @@ -210,7 +210,12 @@ public static ActorSystem startActorSystem(
String listeningAddress,
int listeningPort,
Logger logger) throws Exception {
return startActorSystem(configuration, listeningAddress, listeningPort, logger, ActorSystemExecutorMode.FORK_JOIN_EXECUTOR);
return startActorSystem(
configuration,
listeningAddress,
listeningPort,
logger,
ForkJoinExecutorConfiguration.fromConfiguration(configuration));
}

/**
Expand All @@ -219,7 +224,7 @@ public static ActorSystem startActorSystem(
* @param listeningAddress The address to listen at.
* @param listeningPort The port to listen at.
* @param logger the logger to output log information.
* @param executorMode The executor mode of Akka actor system.
* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
* @return The ActorSystem which has been started.
* @throws Exception
*/
Expand All @@ -228,14 +233,14 @@ public static ActorSystem startActorSystem(
String listeningAddress,
int listeningPort,
Logger logger,
ActorSystemExecutorMode executorMode) throws Exception {
ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {
return startActorSystem(
configuration,
AkkaUtils.getFlinkActorSystemName(),
listeningAddress,
listeningPort,
logger,
executorMode);
actorSystemExecutorConfiguration);
}

/**
Expand All @@ -245,7 +250,7 @@ public static ActorSystem startActorSystem(
* @param listeningAddress The address to listen at.
* @param listeningPort The port to listen at.
* @param logger the logger to output log information.
* @param executorMode The executor mode of Akka actor system.
* @param actorSystemExecutorConfiguration configuration for the ActorSystem's underlying executor
* @return The ActorSystem which has been started.
* @throws Exception
*/
Expand All @@ -255,7 +260,7 @@ public static ActorSystem startActorSystem(
String listeningAddress,
int listeningPort,
Logger logger,
ActorSystemExecutorMode executorMode) throws Exception {
ActorSystemExecutorConfiguration actorSystemExecutorConfiguration) throws Exception {

String hostPortUrl = NetUtils.unresolvedHostAndPortToNormalizedString(listeningAddress, listeningPort);
logger.info("Trying to start actor system at {}", hostPortUrl);
Expand All @@ -264,8 +269,7 @@ public static ActorSystem startActorSystem(
Config akkaConfig = AkkaUtils.getAkkaConfig(
configuration,
new Some<>(new Tuple2<>(listeningAddress, listeningPort)),
getExecutorConfigByExecutorMode(configuration, executorMode)
);
actorSystemExecutorConfiguration.getAkkaConfig());

logger.debug("Using akka configuration\n {}", akkaConfig);

Expand All @@ -286,18 +290,6 @@ public static ActorSystem startActorSystem(
}
}

private static Config getExecutorConfigByExecutorMode(Configuration configuration, ActorSystemExecutorMode executorMode) {
switch (executorMode) {
case FORK_JOIN_EXECUTOR:
return AkkaUtils.getForkJoinExecutorConfig(configuration);
case FIXED_THREAD_POOL_EXECUTOR:
return AkkaUtils.getThreadPoolExecutorConfig(
configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY));
default:
throw new IllegalArgumentException(String.format("Unknown ActorSystemExecutorMode %s.", executorMode));
}
}

/**
* Starts the web frontend.
*
Expand Down Expand Up @@ -627,12 +619,102 @@ public static Configuration cloneConfiguration(Configuration configuration) {
}

/**
* Options to specify which executor to use in an {@link ActorSystem}.
* Configuration interface for {@link ActorSystem} underlying executor.
*/
interface ActorSystemExecutorConfiguration {

/**
* Create the executor {@link Config} for the respective executor.
*
* @return Akka config for the respective executor
*/
Config getAkkaConfig();
}

/**
* Configuration for a fork join executor.
*/
public enum ActorSystemExecutorMode {
/** Used by default, use dispatcher with fork-join-executor. **/
FORK_JOIN_EXECUTOR,
/** Use dispatcher with fixed thread pool executor. **/
FIXED_THREAD_POOL_EXECUTOR
public static class ForkJoinExecutorConfiguration implements ActorSystemExecutorConfiguration {

private final double parallelismFactor;

private final int minParallelism;

private final int maxParallelism;

public ForkJoinExecutorConfiguration(double parallelismFactor, int minParallelism, int maxParallelism) {
this.parallelismFactor = parallelismFactor;
this.minParallelism = minParallelism;
this.maxParallelism = maxParallelism;
}

public double getParallelismFactor() {
return parallelismFactor;
}

public int getMinParallelism() {
return minParallelism;
}

public int getMaxParallelism() {
return maxParallelism;
}

@Override
public Config getAkkaConfig() {
return AkkaUtils.getForkJoinExecutorConfig(this);
}

public static ForkJoinExecutorConfiguration fromConfiguration(final Configuration configuration) {
final double parallelismFactor = configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR);
final int minParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN);
final int maxParallelism = configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX);

return new ForkJoinExecutorConfiguration(parallelismFactor, minParallelism, maxParallelism);
}
}

/**
* Configuration for a fixed thread pool executor.
*/
public static class FixedThreadPoolExecutorConfiguration implements ActorSystemExecutorConfiguration {

private final int minNumThreads;

private final int maxNumThreads;

private final int threadPriority;

public FixedThreadPoolExecutorConfiguration(int minNumThreads, int maxNumThreads, int threadPriority) {
if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
String.format(
"The thread priority must be within (%s, %s) but it was %s.",
Thread.MIN_PRIORITY,
Thread.MAX_PRIORITY,
threadPriority));
}

this.minNumThreads = minNumThreads;
this.maxNumThreads = maxNumThreads;
this.threadPriority = threadPriority;
}

public int getMinNumThreads() {
return minNumThreads;
}

public int getMaxNumThreads() {
return maxNumThreads;
}

public int getThreadPriority() {
return threadPriority;
}

@Override
public Config getAkkaConfig() {
return AkkaUtils.getThreadPoolExecutorConfig(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@

import scala.concurrent.duration.FiniteDuration;

import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FORK_JOIN_EXECUTOR;

/**
* Base class for the Flink cluster entry points.
*
Expand Down Expand Up @@ -297,7 +295,7 @@ protected RpcService createRpcService(
Configuration configuration,
String bindAddress,
String portRange) throws Exception {
ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG, FORK_JOIN_EXECUTOR);
ActorSystem actorSystem = BootstrapTools.startActorSystem(configuration, bindAddress, portRange, LOG);
FiniteDuration duration = AkkaUtils.getTimeout(configuration);
return new AkkaRpcService(actorSystem, Time.of(duration.length(), duration.unit()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,6 @@
import java.util.List;
import java.util.Optional;

import static org.apache.flink.runtime.clusterframework.BootstrapTools.ActorSystemExecutorMode.FIXED_THREAD_POOL_EXECUTOR;
import static org.apache.flink.runtime.metrics.util.SystemResourcesMetricsInitializer.instantiateSystemMetrics;

/**
Expand Down Expand Up @@ -123,13 +122,14 @@ public static void instantiateStatusMetrics(

public static ActorSystem startMetricsActorSystem(Configuration configuration, String hostname, Logger logger) throws Exception {
final String portRange = configuration.getString(MetricOptions.QUERY_SERVICE_PORT);
final int threadPriority = configuration.getInteger(MetricOptions.QUERY_SERVICE_THREAD_PRIORITY);
return BootstrapTools.startActorSystem(
configuration,
METRICS_ACTOR_SYSTEM_NAME,
hostname,
portRange,
logger,
FIXED_THREAD_POOL_EXECUTOR);
new BootstrapTools.FixedThreadPoolExecutorConfiguration(1, 1, threadPriority));
}

private static void instantiateNetworkMetrics(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import akka.pattern.{ask => akkaAsk}
import com.typesafe.config.{Config, ConfigFactory}
import org.apache.flink.api.common.time.Time
import org.apache.flink.configuration._
import org.apache.flink.runtime.clusterframework.BootstrapTools.{FixedThreadPoolExecutorConfiguration, ForkJoinExecutorConfiguration}
import org.apache.flink.runtime.concurrent.FutureUtils
import org.apache.flink.runtime.net.SSLUtils
import org.apache.flink.util.NetUtils
Expand Down Expand Up @@ -187,7 +188,10 @@ object AkkaUtils {
@throws(classOf[UnknownHostException])
def getAkkaConfig(configuration: Configuration,
externalAddress: Option[(String, Int)]): Config = {
getAkkaConfig(configuration, externalAddress, getForkJoinExecutorConfig(configuration))
getAkkaConfig(
configuration,
externalAddress,
getForkJoinExecutorConfig(ForkJoinExecutorConfiguration.fromConfiguration(configuration)))
}

/**
Expand Down Expand Up @@ -291,13 +295,10 @@ object AkkaUtils {
ConfigFactory.parseString(config)
}

def getThreadPoolExecutorConfig(threadPriority: Int): Config = {
if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalConfigurationException("The config : " +
MetricOptions.QUERY_SERVICE_THREAD_PRIORITY.key() + "'s value must between "
+ Thread.MIN_PRIORITY + " and " + Thread.MAX_PRIORITY +
", but the value is " + threadPriority)
}
def getThreadPoolExecutorConfig(configuration: FixedThreadPoolExecutorConfiguration): Config = {
val threadPriority = configuration.getThreadPriority
val minNumThreads = configuration.getMinNumThreads
val maxNumThreads = configuration.getMaxNumThreads

val configString = s"""
|akka {
Expand All @@ -307,9 +308,8 @@ object AkkaUtils {
| executor = "thread-pool-executor"
| thread-priority = $threadPriority
| thread-pool-executor {
| core-pool-size-min = 2
| core-pool-size-factor = 2.0
| core-pool-size-max = 4
| core-pool-size-min = $minNumThreads
| core-pool-size-max = $maxNumThreads
| }
| }
| }
Expand All @@ -320,15 +320,12 @@ object AkkaUtils {
ConfigFactory.parseString(configString)
}

def getForkJoinExecutorConfig(configuration: Configuration): Config = {
val forkJoinExecutorParallelismFactor =
configuration.getDouble(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_FACTOR)
def getForkJoinExecutorConfig(configuration: ForkJoinExecutorConfiguration): Config = {
val forkJoinExecutorParallelismFactor = configuration.getParallelismFactor

val forkJoinExecutorParallelismMin =
configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MIN)
val forkJoinExecutorParallelismMin = configuration.getMinParallelism

val forkJoinExecutorParallelismMax =
configuration.getInteger(AkkaOptions.FORK_JOIN_EXECUTOR_PARALLELISM_MAX)
val forkJoinExecutorParallelismMax = configuration.getMaxParallelism

val configString = s"""
|akka {
Expand Down
Loading

0 comments on commit f81297a

Please sign in to comment.