Skip to content

Commit

Permalink
[FLINK-10282][rest] Separate REST and Dispatcher RPC thread pools
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Oct 10, 2018
1 parent 773e473 commit 0e1a250
Show file tree
Hide file tree
Showing 11 changed files with 49 additions and 21 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/rest_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -57,5 +57,10 @@
<td style="word-wrap: break-word;">104857600</td>
<td>The maximum content length in bytes that the server will handle.</td>
</tr>
<tr>
<td><h5>rest.server.numThreads</h5></td>
<td style="word-wrap: break-word;">4</td>
<td>The number of threads for the asynchronous processing of requests.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -117,4 +117,8 @@ public class RestOptions {
.defaultValue(104_857_600)
.withDescription("The maximum content length in bytes that the client will handle.");

public static final ConfigOption<Integer> SERVER_NUM_THREADS =
key("rest.server.numThreads")
.defaultValue(4)
.withDescription("The number of threads for the asynchronous processing of requests.");
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.dispatcher.DispatcherRestEndpoint;
import org.apache.flink.runtime.leaderelection.LeaderContender;
Expand Down Expand Up @@ -69,7 +68,7 @@
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

import static org.apache.flink.docs.util.Utils.escapeCharacters;
Expand Down Expand Up @@ -323,7 +322,6 @@ private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEnd
private static final Configuration config;
private static final RestServerEndpointConfiguration restConfig;
private static final RestHandlerConfiguration handlerConfig;
private static final Executor executor;
private static final GatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever;
private static final GatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever;
private static final MetricQueryServiceRetriever metricQueryServiceRetriever;
Expand All @@ -339,7 +337,6 @@ private static class DocumentingDispatcherRestEndpoint extends DispatcherRestEnd
throw new RuntimeException("Implementation error. RestServerEndpointConfiguration#fromConfiguration failed for default configuration.");
}
handlerConfig = RestHandlerConfiguration.fromConfiguration(config);
executor = Executors.directExecutor();

dispatcherGatewayRetriever = () -> null;
resourceManagerGatewayRetriever = () -> null;
Expand All @@ -354,7 +351,7 @@ private DocumentingDispatcherRestEndpoint() throws IOException {
handlerConfig,
resourceManagerGatewayRetriever,
NoOpTransientBlobService.INSTANCE,
executor,
Executors.newFixedThreadPool(1),
metricQueryServiceRetriever,
NoOpElectionService.INSTANCE,
NoOpFatalErrorHandler.INSTANCE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* REST endpoint for the {@link Dispatcher} component.
Expand All @@ -59,7 +59,7 @@ public DispatcherRestEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand Down Expand Up @@ -138,7 +139,9 @@ public DispatcherResourceManagerComponent<T> create(
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
rpcService.getExecutor(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(actorSystem, timeout),
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;

import java.io.IOException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* REST endpoint for the {@link JobClusterEntrypoint}.
Expand All @@ -46,7 +46,7 @@ public MiniDispatcherRestEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobCacheService;
Expand Down Expand Up @@ -70,6 +71,7 @@
import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
import org.apache.flink.runtime.taskexecutor.TaskExecutor;
import org.apache.flink.runtime.taskexecutor.TaskManagerRunner;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.impl.AkkaQueryServiceRetriever;
import org.apache.flink.runtime.webmonitor.retriever.impl.RpcGatewayRetriever;
import org.apache.flink.util.AutoCloseableAsync;
Expand Down Expand Up @@ -341,7 +343,9 @@ public void start() throws Exception {
RestHandlerConfiguration.fromConfiguration(configuration),
resourceManagerGatewayRetriever,
blobServer.getTransientBlobService(),
commonRpcService.getExecutor(),
WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS, 1),
"DispatcherRestEndpoint"),
new AkkaQueryServiceRetriever(
actorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}.
Expand All @@ -45,7 +45,7 @@ public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* {@link WebMonitorEndpoint} factory.
Expand All @@ -43,7 +43,7 @@ WebMonitorEndpoint<T> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;

import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;

/**
* {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}.
Expand All @@ -44,7 +44,7 @@ public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,13 @@
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagerStdoutFileHeaders;
import org.apache.flink.runtime.rest.messages.taskmanager.TaskManagersHeaders;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.ExecutorUtils;
import org.apache.flink.util.FileUtils;
import org.apache.flink.util.Preconditions;

Expand All @@ -134,7 +136,9 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/**
* Rest endpoint which serves the web frontend REST calls.
Expand All @@ -148,7 +152,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
protected final RestHandlerConfiguration restConfiguration;
private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
private final TransientBlobService transientBlobService;
protected final Executor executor;
protected final ExecutorService executor;

private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
Expand All @@ -170,7 +174,7 @@ public WebMonitorEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
Executor executor,
ExecutorService executor,
MetricQueryServiceRetriever metricQueryServiceRetriever,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down Expand Up @@ -715,7 +719,9 @@ public void startInternal() throws Exception {
protected CompletableFuture<Void> shutDownInternal() {
executionGraphCache.close();

final CompletableFuture<Void> shutdownFuture = super.shutDownInternal();
final CompletableFuture<Void> shutdownFuture = FutureUtils.runAfterwards(
super.shutDownInternal(),
() -> ExecutorUtils.gracefulShutdown(10, TimeUnit.SECONDS, executor));

final File webUiDir = restConfiguration.getWebUiDir();

Expand Down Expand Up @@ -776,4 +782,13 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
}
return archivedJson;
}

public static ExecutorService createExecutorService(int numThreads, String componentName) {
return Executors.newFixedThreadPool(
numThreads,
new ExecutorThreadFactory.Builder()
.setThreadPriority(Thread.MIN_PRIORITY)
.setPoolName("Flink-" + componentName)
.build());
}
}

0 comments on commit 0e1a250

Please sign in to comment.