Skip to content

Commit

Permalink
[FLINK-11382][metrics] Disable MetricFetcher if interval is configure…
Browse files Browse the repository at this point in the history
…d to 0
  • Loading branch information
leesf authored and zentol committed Jan 31, 2019
1 parent 343d84b commit 11b9a2b
Show file tree
Hide file tree
Showing 3 changed files with 39 additions and 18 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.rest.handler.legacy.backpressure.StackTraceSampleCoordinator;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rest.handler.router.Router;
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
import org.apache.flink.runtime.webmonitor.retriever.LeaderGatewayRetriever;
Expand Down Expand Up @@ -194,12 +195,17 @@ public WebRuntimeMonitor(
} else {
sslFactory = null;
}
metricFetcher = new MetricFetcherImpl<>(
retriever,
queryServiceRetriever,
scheduledExecutor,
timeout,
MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL.defaultValue());

final long updateInterval =
config.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: new MetricFetcherImpl<>(
retriever,
queryServiceRetriever,
scheduledExecutor,
timeout,
updateInterval);

Router router = new Router();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.blob.BlobServer;
import org.apache.flink.runtime.clusterframework.types.ResourceID;
Expand All @@ -43,7 +44,9 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway;
import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
import org.apache.flink.runtime.rest.RestEndpointFactory;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.webmonitor.RestfulGateway;
Expand Down Expand Up @@ -135,17 +138,22 @@ public DispatcherResourceManagerComponent<T> create(
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");

final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor);

webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
configuration,
dispatcherGatewayRetriever,
resourceManagerGatewayRetriever,
blobServer,
executor,
MetricFetcherImpl.fromConfiguration(
configuration,
metricQueryServiceRetriever,
dispatcherGatewayRetriever,
executor),
metricFetcher,
highAvailabilityServices.getWebMonitorLeaderElectionService(),
fatalErrorHandler);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.MetricOptions;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -66,7 +67,9 @@
import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner;
import org.apache.flink.runtime.rest.RestServerEndpointConfiguration;
import org.apache.flink.runtime.rest.handler.RestHandlerConfiguration;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcher;
import org.apache.flink.runtime.rest.handler.legacy.metrics.MetricFetcherImpl;
import org.apache.flink.runtime.rest.handler.legacy.metrics.VoidMetricFetcher;
import org.apache.flink.runtime.rpc.FatalErrorHandler;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
Expand Down Expand Up @@ -360,6 +363,16 @@ public void start() throws Exception {
configuration.getInteger(RestOptions.SERVER_THREAD_PRIORITY),
"DispatcherRestEndpoint");

final long updateInterval = configuration.getLong(MetricOptions.METRIC_FETCHER_UPDATE_INTERVAL);
final MetricFetcher metricFetcher = updateInterval == 0
? VoidMetricFetcher.INSTANCE
: MetricFetcherImpl.fromConfiguration(configuration,
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Time.milliseconds(
configuration.getLong(WebOptions.TIMEOUT))),
dispatcherGatewayRetriever, executor);

this.dispatcherRestEndpoint = new DispatcherRestEndpoint(
RestServerEndpointConfiguration.fromConfiguration(configuration),
dispatcherGatewayRetriever,
Expand All @@ -368,13 +381,7 @@ public void start() throws Exception {
resourceManagerGatewayRetriever,
blobServer.getTransientBlobService(),
executor,
MetricFetcherImpl.fromConfiguration(
configuration,
new AkkaQueryServiceRetriever(
metricQueryServiceActorSystem,
Time.milliseconds(configuration.getLong(WebOptions.TIMEOUT))),
dispatcherGatewayRetriever,
executor),
metricFetcher,
haServices.getWebMonitorLeaderElectionService(),
new ShutDownFatalErrorHandler());

Expand Down

0 comments on commit 11b9a2b

Please sign in to comment.