Skip to content

Commit

Permalink
[FLINK-7668] Add ExecutionGraphCache for ExecutionGraph based REST ha…
Browse files Browse the repository at this point in the history
…ndlers

The ExecutionGraphCache replaces the ExecutionGraphHolder. Unlike the latter, the former
does not expect the AccessExecutionGraph to be the true ExecutionGraph. Instead, it assumes
it to be the ArchivedExecutionGraph. Therefore, it invalidates the cache entries after
a given time-to-live period. This triggers requesting the AccessExecutionGraph again
and, thus, updates the ExecutionGraph information for the ExecutionGraph based REST
handlers.

In order to avoid memory leaks, the WebRuntimeMonitor now starts a periodic cleanup task
which triggers ExecutionGraphCache.cleanup. This method releases all cache entries which
have exceeded their time to live. The cleanup interval is currently set to
20 * refreshInterval of the web gui (the time to live itself is 10 * refreshInterval).

This closes apache#4728.
  • Loading branch information
tillrohrmann committed Oct 2, 2017
1 parent c3472b9 commit aae417f
Show file tree
Hide file tree
Showing 62 changed files with 840 additions and 330 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.clusterframework.BootstrapTools;
import org.apache.flink.runtime.clusterframework.ContainerSpecification;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
import org.apache.flink.runtime.jobmanager.JobManager;
Expand Down Expand Up @@ -294,7 +295,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie
new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
webMonitorTimeout,
futureExecutor,
new ScheduledExecutorServiceAdapter(futureExecutor),
LOG);
if (webMonitor != null) {
final URL webMonitorURL = new URL(webMonitor.getRestAddress());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.blob.BlobView;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.jobmanager.MemoryArchivist;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
Expand All @@ -34,7 +35,7 @@
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler;
import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler;
import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler;
import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers;
Expand Down Expand Up @@ -93,7 +94,7 @@
import java.io.IOException;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

Expand Down Expand Up @@ -143,6 +144,10 @@ public class WebRuntimeMonitor implements WebMonitor {

private final WebMonitorConfig cfg;

private final ExecutionGraphCache executionGraphCache;

private final ScheduledFuture<?> executionGraphCleanupTask;

private AtomicBoolean cleanedUp = new AtomicBoolean();


Expand All @@ -155,7 +160,7 @@ public WebRuntimeMonitor(
LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
Executor executor) throws IOException, InterruptedException {
ScheduledExecutor scheduledExecutor) throws IOException, InterruptedException {

this.leaderRetrievalService = checkNotNull(leaderRetrievalService);
this.retriever = Preconditions.checkNotNull(jobManagerRetriever);
Expand Down Expand Up @@ -193,11 +198,23 @@ public WebRuntimeMonitor(
this.uploadDir = null;
}

ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout);
final long timeToLive = cfg.getRefreshInterval() * 10L;

this.executionGraphCache = new ExecutionGraphCache(
timeout,
Time.milliseconds(timeToLive));

final long cleanupInterval = timeToLive * 2L;

this.executionGraphCleanupTask = scheduledExecutor.scheduleWithFixedDelay(
executionGraphCache::cleanup,
cleanupInterval,
cleanupInterval,
TimeUnit.MILLISECONDS);

// - Back pressure stats ----------------------------------------------

stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000);
stackTraceSamples = new StackTraceSampleCoordinator(scheduledExecutor, 60000);

// Back pressure stats tracker config
int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL);
Expand Down Expand Up @@ -228,55 +245,55 @@ public WebRuntimeMonitor(
} else {
serverSSLContext = null;
}
metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout);
metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout);

String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY);

JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir);
JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, defaultSavepointDir);
RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler());
RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler());

Router router = new Router();
// config how to interact with this web server
get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval()));
get(router, new DashboardConfigHandler(scheduledExecutor, cfg.getRefreshInterval()));

// the overview - how many task managers, slots, free slots, ...
get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT));
get(router, new ClusterOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));

// job manager configuration
get(router, new ClusterConfigHandler(executor, config));
get(router, new ClusterConfigHandler(scheduledExecutor, config));

// overview over jobs
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true));
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false));
get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true));

get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher));

get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtasksTimesHandler(currentGraphs, executor));
get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher));
get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor));
get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval));
get(router, new JobVertexMetricsHandler(executor, metricFetcher));
get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor));
get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher));
get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor));

get(router, new JobPlanHandler(currentGraphs, executor));
get(router, new JobConfigHandler(currentGraphs, executor));
get(router, new JobExceptionsHandler(currentGraphs, executor));
get(router, new JobAccumulatorsHandler(currentGraphs, executor));
get(router, new JobMetricsHandler(executor, metricFetcher));

get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true));
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false));
get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, false, true));

get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT));

get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));

get(router, new JobVertexDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
get(router, new SubtasksTimesHandler(executionGraphCache, scheduledExecutor));
get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher));
get(router, new JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor));
get(router, new JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, backPressureStatsTracker, refreshInterval));
get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher));
get(router, new SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor));
get(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
get(router, new SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher));
get(router, new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphCache, scheduledExecutor));

get(router, new JobPlanHandler(executionGraphCache, scheduledExecutor));
get(router, new JobConfigHandler(executionGraphCache, scheduledExecutor));
get(router, new JobExceptionsHandler(executionGraphCache, scheduledExecutor));
get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor));
get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher));

get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher));
get(router,
new TaskManagerLogHandler(
retriever,
executor,
scheduledExecutor,
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.LOG,
Expand All @@ -285,13 +302,13 @@ public WebRuntimeMonitor(
get(router,
new TaskManagerLogHandler(
retriever,
executor,
scheduledExecutor,
localRestAddress,
timeout,
TaskManagerLogHandler.FileMode.STDOUT,
config,
blobView));
get(router, new TaskManagerMetricsHandler(executor, metricFetcher));
get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher));

router
// log and stdout
Expand All @@ -305,48 +322,48 @@ public WebRuntimeMonitor(
.GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") :
new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile));

get(router, new JobManagerMetricsHandler(executor, metricFetcher));
get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher));

// Cancel a job via GET (for proper integration with YARN this has to be performed via GET)
get(router, new JobCancellationHandler(executor, timeout));
get(router, new JobCancellationHandler(scheduledExecutor, timeout));
// DELETE is the preferred way of canceling a job (Rest-conform)
delete(router, new JobCancellationHandler(executor, timeout));
delete(router, new JobCancellationHandler(scheduledExecutor, timeout));

get(router, triggerHandler);
get(router, inProgressHandler);

// stop a job via GET (for proper integration with YARN this has to be performed via GET)
get(router, new JobStoppingHandler(executor, timeout));
get(router, new JobStoppingHandler(scheduledExecutor, timeout));
// DELETE is the preferred way of stopping a job (Rest-conform)
delete(router, new JobStoppingHandler(executor, timeout));
delete(router, new JobStoppingHandler(scheduledExecutor, timeout));

int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries);

// Register the checkpoint stats handlers
get(router, new CheckpointStatsHandler(currentGraphs, executor));
get(router, new CheckpointConfigHandler(currentGraphs, executor));
get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache));
get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache));
get(router, new CheckpointStatsHandler(executionGraphCache, scheduledExecutor));
get(router, new CheckpointConfigHandler(executionGraphCache, scheduledExecutor));
get(router, new CheckpointStatsDetailsHandler(executionGraphCache, scheduledExecutor, cache));
get(router, new CheckpointStatsDetailsSubtasksHandler(executionGraphCache, scheduledExecutor, cache));

if (webSubmitAllow) {
// fetch the list of uploaded jars.
get(router, new JarListHandler(executor, uploadDir));
get(router, new JarListHandler(scheduledExecutor, uploadDir));

// get plan for an uploaded jar
get(router, new JarPlanHandler(executor, uploadDir));
get(router, new JarPlanHandler(scheduledExecutor, uploadDir));

// run a jar
post(router, new JarRunHandler(executor, uploadDir, timeout, config));
post(router, new JarRunHandler(scheduledExecutor, uploadDir, timeout, config));

// upload a jar
post(router, new JarUploadHandler(executor, uploadDir));
post(router, new JarUploadHandler(scheduledExecutor, uploadDir));

// delete an uploaded jar from submission interface
delete(router, new JarDeleteHandler(executor, uploadDir));
delete(router, new JarDeleteHandler(scheduledExecutor, uploadDir));
} else {
// send an Access Denied message
JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor);
JarAccessDeniedHandler jad = new JarAccessDeniedHandler(scheduledExecutor);
get(router, jad);
post(router, jad);
delete(router, jad);
Expand Down Expand Up @@ -447,6 +464,11 @@ public void run() {
@Override
public void stop() throws Exception {
synchronized (startupShutdownLock) {

executionGraphCleanupTask.cancel(false);

executionGraphCache.close();

leaderRetrievalService.stop();

netty.shutdown();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ public void testRedirectToLeader() throws Exception {
jobManagerRetrievers[i],
new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT),
TIMEOUT,
TestingUtils.defaultExecutor());
TestingUtils.defaultScheduledExecutor());
}

ActorRef[] jobManager = new ActorRef[2];
Expand Down Expand Up @@ -323,7 +323,7 @@ public void testLeaderNotAvailable() throws Exception {
new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)),
new AkkaQueryServiceRetriever(actorSystem, TIMEOUT),
TIMEOUT,
TestingUtils.defaultExecutor());
TestingUtils.defaultScheduledExecutor());

webRuntimeMonitor.start();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview;
import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails;
Expand Down Expand Up @@ -226,7 +227,7 @@ public CompletableFuture<MultipleJobsDetails> requestJobDetails(boolean includeR
}

@Override
public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId, Time timeout) {
public CompletableFuture<AccessExecutionGraph> requestJob(JobID jobId, Time timeout) {
CompletableFuture<JobManagerMessages.JobResponse> jobResponseFuture = FutureUtils.toJava(
jobManagerGateway
.ask(new JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout))
Expand All @@ -235,9 +236,9 @@ public CompletableFuture<Optional<AccessExecutionGraph>> requestJob(JobID jobId,
return jobResponseFuture.thenApply(
(JobManagerMessages.JobResponse jobResponse) -> {
if (jobResponse instanceof JobManagerMessages.JobFound) {
return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph());
return ((JobManagerMessages.JobFound) jobResponse).executionGraph();
} else {
return Optional.empty();
throw new CompletionException(new FlinkJobNotFoundException(jobId));
}
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,14 @@

package org.apache.flink.runtime.clusterframework;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang3.StringUtils;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.WebOptions;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;
import org.apache.flink.runtime.webmonitor.WebMonitor;
Expand All @@ -40,10 +34,13 @@
import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever;
import org.apache.flink.util.NetUtils;

import akka.actor.ActorSystem;
import com.typesafe.config.Config;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;

import org.slf4j.LoggerFactory;
import scala.concurrent.duration.FiniteDuration;

import java.io.File;
import java.io.FileWriter;
Expand All @@ -55,7 +52,8 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.Executor;

import scala.concurrent.duration.FiniteDuration;

/**
* Tools for starting JobManager and TaskManager processes, including the
Expand Down Expand Up @@ -178,7 +176,7 @@ public static ActorSystem startActorSystem(
* @param jobManagerRetriever to retrieve the leading JobManagerGateway
* @param queryServiceRetriever to resolve a query service
* @param timeout for asynchronous operations
* @param executor to run asynchronous operations
* @param scheduledExecutor to run asynchronous operations
* @param logger Logger for log output
* @return WebMonitor instance.
* @throws Exception
Expand All @@ -189,7 +187,7 @@ public static WebMonitor startWebMonitorIfConfigured(
LeaderGatewayRetriever<JobManagerGateway> jobManagerRetriever,
MetricQueryServiceRetriever queryServiceRetriever,
Time timeout,
Executor executor,
ScheduledExecutor scheduledExecutor,
Logger logger) throws Exception {

if (config.getInteger(WebOptions.PORT, 0) >= 0) {
Expand All @@ -203,7 +201,7 @@ public static WebMonitor startWebMonitorIfConfigured(
jobManagerRetriever,
queryServiceRetriever,
timeout,
executor);
scheduledExecutor);

// start the web monitor
if (monitor != null) {
Expand Down
Loading

0 comments on commit aae417f

Please sign in to comment.