Skip to content

Commit

Permalink
[FLINK-11851] Introduce dedicated io executor for ClusterEntrypoint a…
Browse files Browse the repository at this point in the history
…nd MiniCluster

The io executor is responsible for running io operations like discarding checkpoints.
By using the io executor, we don't risk that the RpcService is blocked by blocking
io operations.

This closes apache#7924.
  • Loading branch information
tillrohrmann committed Mar 7, 2019
1 parent f2b0e49 commit 3c7ed14
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 3 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/cluster_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,10 @@
<td style="word-wrap: break-word;">30000</td>
<td>The pause made after the registration attempt was refused in milliseconds.</td>
</tr>
<tr>
<td><h5>cluster.services.shutdown-timeout</h5></td>
<td style="word-wrap: break-word;">30000</td>
<td>The shutdown timeout for cluster services like executors in milliseconds.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,9 @@ public class ClusterOptions {
.key("cluster.registration.refused-registration-delay")
.defaultValue(30000L)
.withDescription("The pause made after the registration attempt was refused in milliseconds.");

public static final ConfigOption<Long> CLUSTER_SERVICES_SHUTDOWN_TIMEOUT = ConfigOptions
.key("cluster.services.shutdown-timeout")
.defaultValue(30000L)
.withDescription("The shutdown timeout for cluster services like executors in milliseconds.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.entrypoint;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -53,10 +54,13 @@
import org.apache.flink.runtime.security.SecurityConfiguration;
import org.apache.flink.runtime.security.SecurityContext;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.ZooKeeperUtils;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.ShutdownHookUtil;
Expand All @@ -80,6 +84,8 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -129,6 +135,9 @@ public abstract class ClusterEntrypoint implements AutoCloseableAsync, FatalErro
@GuardedBy("lock")
private RpcService commonRpcService;

@GuardedBy("lock")
private ExecutorService ioExecutor;

@GuardedBy("lock")
private ActorSystem metricQueryServiceActorSystem;

Expand Down Expand Up @@ -258,7 +267,10 @@ protected void initializeServices(Configuration configuration) throws Exception
configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());

haServices = createHaServices(configuration, commonRpcService.getExecutor());
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("cluster-io"));
haServices = createHaServices(configuration, ioExecutor);
blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
heartbeatServices = createHeartbeatServices(configuration);
Expand Down Expand Up @@ -324,6 +336,8 @@ public CompletableFuture<Void> closeAsync() {
}

protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
final long shutdownTimeout = configuration.getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);

synchronized (lock) {
Throwable exception = null;

Expand Down Expand Up @@ -373,6 +387,10 @@ protected CompletableFuture<Void> stopClusterServices(boolean cleanupHaData) {
terminationFutures.add(AkkaUtils.terminateActorSystem(metricQueryServiceActorSystem));
}

if (ioExecutor != null) {
terminationFutures.add(ExecutorUtils.nonBlockingShutdown(shutdownTimeout, TimeUnit.MILLISECONDS, ioExecutor));
}

if (commonRpcService != null) {
terminationFutures.add(commonRpcService.stopService());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.io.FileOutputFormat;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -65,12 +66,15 @@
import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.webmonitor.retriever.LeaderRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.function.FunctionUtils;

Expand All @@ -96,6 +100,9 @@
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -130,6 +137,9 @@ public class MiniCluster implements JobExecutorService, AutoCloseableAsync {
@GuardedBy("lock")
private RpcService commonRpcService;

@GuardedBy("lock")
private ExecutorService ioExecutor;

@GuardedBy("lock")
private final Collection<RpcService> rpcServices;

Expand Down Expand Up @@ -288,7 +298,10 @@ public void start() throws Exception {
LOG);
metricRegistry.startQueryService(metricQueryServiceActorSystem, null);

haServices = createHighAvailabilityServices(configuration, commonRpcService.getExecutor());
ioExecutor = Executors.newFixedThreadPool(
Hardware.getNumberCPUCores(),
new ExecutorThreadFactory("mini-cluster-io"));
haServices = createHighAvailabilityServices(configuration, ioExecutor);

blobServer = new BlobServer(configuration, haServices.createBlobStore());
blobServer.start();
Expand Down Expand Up @@ -410,6 +423,7 @@ public CompletableFuture<Void> closeAsync() {
if (running) {
LOG.info("Shutting down Flink Mini Cluster");
try {
final long shutdownTimeoutMillis = miniClusterConfiguration.getConfiguration().getLong(ClusterOptions.CLUSTER_SERVICES_SHUTDOWN_TIMEOUT);
final int numComponents = 2 + miniClusterConfiguration.getNumTaskManagers();
final Collection<CompletableFuture<Void>> componentTerminationFutures = new ArrayList<>(numComponents);

Expand All @@ -431,7 +445,11 @@ public CompletableFuture<Void> closeAsync() {
rpcServicesTerminationFuture,
this::terminateMiniClusterServices);

remainingServicesTerminationFuture.whenComplete(
final CompletableFuture<Void> executorsTerminationFuture = FutureUtils.runAfterwards(
remainingServicesTerminationFuture,
() -> terminateExecutors(shutdownTimeoutMillis));

executorsTerminationFuture.whenComplete(
(Void ignored, Throwable throwable) -> {
if (throwable != null) {
terminationFuture.completeExceptionally(ExceptionUtils.stripCompletionException(throwable));
Expand Down Expand Up @@ -842,6 +860,16 @@ private CompletionStage<Void> terminateRpcServices() {
}
}

private CompletableFuture<Void> terminateExecutors(long executorShutdownTimeoutMillis) {
synchronized (lock) {
if (ioExecutor != null) {
return ExecutorUtils.nonBlockingShutdown(executorShutdownTimeoutMillis, TimeUnit.MILLISECONDS, ioExecutor);
} else {
return CompletableFuture.completedFuture(null);
}
}
}

/**
* Internal factory for {@link RpcService}.
*/
Expand Down

0 comments on commit 3c7ed14

Please sign in to comment.