Skip to content

Commit

Permalink
[FLINK-17308] Pass in ExecutionGraphCache to WebMonitorEndpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Apr 24, 2020
1 parent bf0cfa6 commit 3635b54
Show file tree
Hide file tree
Showing 7 changed files with 21 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
import org.apache.flink.runtime.rest.handler.job.JobSubmitHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.WebMonitorEndpoint;
Expand Down Expand Up @@ -61,6 +62,7 @@ public DispatcherRestEndpoint(
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler) throws IOException {

super(
Expand All @@ -73,6 +75,7 @@ public DispatcherRestEndpoint(
executor,
metricFetcher,
leaderElectionService,
executionGraphCache,
fatalErrorHandler);

webSubmissionExtension = WebMonitorExtension.empty();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
Expand All @@ -49,6 +50,7 @@ public MiniDispatcherRestEndpoint(
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler) throws IOException {
super(
endpointConfiguration,
Expand All @@ -60,6 +62,7 @@ public MiniDispatcherRestEndpoint(
executor,
metricFetcher,
leaderElectionService,
executionGraphCache,
fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ public WebMonitorEndpoint<RestfulGateway> createRestEndpoint(
executor,
metricFetcher,
leaderElectionService,
RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,14 @@

package org.apache.flink.runtime.rest;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.blob.TransientBlobService;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
Expand All @@ -47,4 +50,10 @@ WebMonitorEndpoint<T> createRestEndpoint(
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
FatalErrorHandler fatalErrorHandler) throws Exception;

static ExecutionGraphCache createExecutionGraphCache(RestHandlerConfiguration restConfiguration) {
return new ExecutionGraphCache(
restConfiguration.getTimeout(),
Time.milliseconds(restConfiguration.getRefreshInterval()));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
executor,
metricFetcher,
leaderElectionService,
RestEndpointFactory.createExecutionGraphCache(restHandlerConfiguration),
fatalErrorHandler);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ public WebMonitorEndpoint(
ScheduledExecutorService executor,
MetricFetcher metricFetcher,
LeaderElectionService leaderElectionService,
ExecutionGraphCache executionGraphCache,
FatalErrorHandler fatalErrorHandler) throws IOException {
super(endpointConfiguration);
this.leaderRetriever = Preconditions.checkNotNull(leaderRetriever);
Expand All @@ -194,9 +195,7 @@ public WebMonitorEndpoint(
this.transientBlobService = Preconditions.checkNotNull(transientBlobService);
this.executor = Preconditions.checkNotNull(executor);

this.executionGraphCache = new ExecutionGraphCache(
restConfiguration.getTimeout(),
Time.milliseconds(restConfiguration.getRefreshInterval()));
this.executionGraphCache = executionGraphCache;

this.checkpointStatsCache = new CheckpointStatsCache(
restConfiguration.getMaxCheckpointStatisticCacheEntries());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerSpecification;
Expand Down Expand Up @@ -85,6 +86,7 @@ public DocumentingDispatcherRestEndpoint() throws IOException {
Executors.newScheduledThreadPool(1),
VoidMetricFetcher.INSTANCE,
NoOpElectionService.INSTANCE,
RestEndpointFactory.createExecutionGraphCache(handlerConfig),
NoOpFatalErrorHandler.INSTANCE);
}

Expand Down

0 comments on commit 3635b54

Please sign in to comment.