Skip to content

Commit

Permalink
[FLINK-17308] Pass ScheduledExecutorService to WebMonitorEndpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Apr 24, 2020
1 parent 5cfa396 commit bf0cfa6
Show file tree
Hide file tree
Showing 8 changed files with 18 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* REST endpoint for the {@link Dispatcher} component.
Expand All @@ -58,7 +58,7 @@ public DispatcherRestEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* Abstract class which implements the creation of the {@link DispatcherResourceManagerComponent} components.
Expand Down Expand Up @@ -137,7 +137,7 @@ public DispatcherResourceManagerComponent create(
10,
Time.milliseconds(50L));

final ExecutorService executor = WebMonitorEndpoint.createExecutorService(
final ScheduledExecutorService executor = WebMonitorEndpoint.createExecutorService(
configuration.getInteger(RestOptions.SERVER_NUM_THREADS),
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* REST endpoint for the {@link JobClusterEntrypoint}.
Expand All @@ -46,7 +46,7 @@ public MiniDispatcherRestEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* {@link RestEndpointFactory} which creates a {@link MiniDispatcherRestEndpoint}.
Expand All @@ -45,7 +45,7 @@ public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* {@link WebMonitorEndpoint} factory.
Expand All @@ -43,7 +43,7 @@ WebMonitorEndpoint<T> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;

/**
* {@link RestEndpointFactory} which creates a {@link DispatcherRestEndpoint}.
Expand All @@ -44,7 +44,7 @@ public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
Expand All @@ -160,7 +160,7 @@ public class WebMonitorEndpoint<T extends RestfulGateway> extends RestServerEndp
protected final RestHandlerConfiguration restConfiguration;
private final GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever;
private final TransientBlobService transientBlobService;
protected final ExecutorService executor;
protected final ScheduledExecutorService executor;

private final ExecutionGraphCache executionGraphCache;
private final CheckpointStatsCache checkpointStatsCache;
Expand All @@ -182,7 +182,7 @@ public WebMonitorEndpoint(
RestHandlerConfiguration restConfiguration,
GatewayRetriever<ResourceManagerGateway> resourceManagerRetriever,
TransientBlobService transientBlobService,
ExecutorService executor,
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws IOException {
Expand Down Expand Up @@ -799,7 +799,7 @@ public Collection<ArchivedJson> archiveJsonWithPath(AccessExecutionGraph graph)
return archivedJson;
}

public static ExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
public static ScheduledExecutorService createExecutorService(int numThreads, int threadPriority, String componentName) {
if (threadPriority < Thread.MIN_PRIORITY || threadPriority > Thread.MAX_PRIORITY) {
throw new IllegalArgumentException(
String.format(
Expand All @@ -809,7 +809,7 @@ public static ExecutorService createExecutorService(int numThreads, int threadPr
threadPriority));
}

return Executors.newFixedThreadPool(
return Executors.newScheduledThreadPool(
numThreads,
new ExecutorThreadFactory.Builder()
.setThreadPriority(threadPriority)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ public DocumentingDispatcherRestEndpoint() throws IOException {
handlerConfig,
resourceManagerGatewayRetriever,
NoOpTransientBlobService.INSTANCE,
Executors.newFixedThreadPool(1),
Executors.newScheduledThreadPool(1),
VoidMetricFetcher.INSTANCE,
NoOpElectionService.INSTANCE,
NoOpFatalErrorHandler.INSTANCE);
Expand Down

0 comments on commit bf0cfa6

Please sign in to comment.