Skip to content

Commit

Permalink
[FLINK-17248] Introduce configuration for pool size of io-executor fo…
Browse files Browse the repository at this point in the history
…r ClusterEntrypoint and MiniCluster

This closes apache#11957.
  • Loading branch information
Myasuka authored and tillrohrmann committed May 16, 2020
1 parent f967bcb commit 367c613
Show file tree
Hide file tree
Showing 6 changed files with 41 additions and 4 deletions.
6 changes: 6 additions & 0 deletions docs/_includes/generated/cluster_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@
<td>Boolean</td>
<td>Enable the slot spread out allocation strategy. This strategy tries to spread out the slots evenly across all available <span markdown="span">`TaskExecutors`</span>.</td>
</tr>
<tr>
<td><h5>cluster.io-executor.pool-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The pool size of io executor for cluster entry-point and mini cluster. It's undefined by default and will use the number of CPU cores (hardware contexts) that the cluster entry-point JVM has access to.</td>
</tr>
<tr>
<td><h5>cluster.registration.error-delay</h5></td>
<td style="word-wrap: break-word;">10000</td>
Expand Down
6 changes: 6 additions & 0 deletions docs/_includes/generated/expert_fault_tolerance_section.html
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,12 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>cluster.io-executor.pool-size</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>Integer</td>
<td>The pool size of io executor for cluster entry-point and mini cluster. It's undefined by default and will use the number of CPU cores (hardware contexts) that the cluster entry-point JVM has access to.</td>
</tr>
<tr>
<td><h5>cluster.registration.error-delay</h5></td>
<td style="word-wrap: break-word;">10000</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,14 @@ public class ClusterOptions {
.defaultValue(30000L)
.withDescription("The shutdown timeout for cluster services like executors in milliseconds.");

/**
 * Pool size of the io executor used by the cluster entry-point and the mini cluster.
 *
 * <p>The option is declared without a default value; as stated in its description, when it
 * is left unset the number of CPU cores (hardware contexts) available to the cluster
 * entry-point JVM is used instead.
 */
@Documentation.Section(Documentation.Sections.EXPERT_FAULT_TOLERANCE)
public static final ConfigOption<Integer> CLUSTER_IO_EXECUTOR_POOL_SIZE = ConfigOptions
.key("cluster.io-executor.pool-size")
.intType()
.noDefaultValue()
.withDescription("The pool size of io executor for cluster entry-point and mini cluster. " +
"It's undefined by default and will use the number of CPU cores (hardware contexts) that the cluster entry-point JVM has access to.");

@Documentation.Section(Documentation.Sections.EXPERT_SCHEDULING)
public static final ConfigOption<Boolean> EVENLY_SPREAD_OUT_SLOTS_STRATEGY = ConfigOptions
.key("cluster.evenly-spread-out-slots")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.security.contexts.SecurityContext;
import org.apache.flink.runtime.util.ClusterEntrypointUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcMetricQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
Expand Down Expand Up @@ -261,7 +261,7 @@ protected void initializeServices(Configuration configuration, PluginManager plu
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
blobServer = new BlobServer(configuration, haServices.createBlobStore());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,8 @@
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceUtils;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.ClusterEntrypointUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
Expand Down Expand Up @@ -311,7 +311,7 @@ public void start() throws Exception {
ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));

ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
ClusterEntrypointUtils.getPoolSize(configuration),
new ExecutorThreadFactory("mini-cluster-io"));
haServices = createHighAvailabilityServices(configuration, ioExecutor);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@

package org.apache.flink.runtime.util;

import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -59,4 +62,18 @@ private static File deriveFlinkHomeDirectoryFromLibDirectory() {
return new File(libDirectory).getParentFile();
}
}

/**
 * Gets and verifies the io-executor pool size based on the configuration.
 *
 * <p>Reads {@link ClusterOptions#CLUSTER_IO_EXECUTOR_POOL_SIZE} from the given configuration,
 * passing {@link Hardware#getNumberCPUCores()} as the value to use when the option is not set.
 *
 * @param config The configuration to read.
 * @return The legal (strictly positive) io-executor pool size.
 * @throws IllegalArgumentException if the resolved pool size is not greater than zero.
 */
public static int getPoolSize(Configuration config) {
	final int poolSize = config.getInteger(ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE, Hardware.getNumberCPUCores());
	// Use the template overload so the error message is only formatted when the check fails,
	// instead of eagerly running String.format on every (normally successful) call.
	Preconditions.checkArgument(
		poolSize > 0,
		"Illegal pool size (%s) of io-executor, please re-configure '%s'.",
		poolSize,
		ClusterOptions.CLUSTER_IO_EXECUTOR_POOL_SIZE.key());
	return poolSize;
}
}

0 comments on commit 367c613

Please sign in to comment.