diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java index 6abdb2893042c..f465464b371d1 100755 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/MesosApplicationMasterRunner.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.clusterframework.BootstrapTools; import org.apache.flink.runtime.clusterframework.ContainerSpecification; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.jobmanager.JobManager; @@ -294,7 +295,7 @@ protected int runPrivileged(Configuration config, Configuration dynamicPropertie new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout), webMonitorTimeout, - futureExecutor, + new ScheduledExecutorServiceAdapter(futureExecutor), LOG); if (webMonitor != null) { final URL webMonitorURL = new URL(webMonitor.getRestAddress()); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java index 1af6ab641eb47..a37ce2de8225c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitor.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.CoreOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.blob.BlobView; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.jobmanager.MemoryArchivist; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; @@ -34,7 +35,7 @@ import org.apache.flink.runtime.rest.handler.legacy.CurrentJobIdsHandler; import org.apache.flink.runtime.rest.handler.legacy.CurrentJobsOverviewHandler; import org.apache.flink.runtime.rest.handler.legacy.DashboardConfigHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JobAccumulatorsHandler; import org.apache.flink.runtime.rest.handler.legacy.JobCancellationHandler; import org.apache.flink.runtime.rest.handler.legacy.JobCancellationWithSavepointHandlers; @@ -93,7 +94,7 @@ import java.io.IOException; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -143,6 +144,10 @@ public class WebRuntimeMonitor implements WebMonitor { private final WebMonitorConfig cfg; + private final ExecutionGraphCache executionGraphCache; + + private final ScheduledFuture executionGraphCleanupTask; + private AtomicBoolean cleanedUp = new AtomicBoolean(); @@ -155,7 +160,7 @@ public WebRuntimeMonitor( 
LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor) throws IOException, InterruptedException { + ScheduledExecutor scheduledExecutor) throws IOException, InterruptedException { this.leaderRetrievalService = checkNotNull(leaderRetrievalService); this.retriever = Preconditions.checkNotNull(jobManagerRetriever); @@ -193,11 +198,23 @@ public WebRuntimeMonitor( this.uploadDir = null; } - ExecutionGraphHolder currentGraphs = new ExecutionGraphHolder(timeout); + final long timeToLive = cfg.getRefreshInterval() * 10L; + + this.executionGraphCache = new ExecutionGraphCache( + timeout, + Time.milliseconds(timeToLive)); + + final long cleanupInterval = timeToLive * 2L; + + this.executionGraphCleanupTask = scheduledExecutor.scheduleWithFixedDelay( + executionGraphCache::cleanup, + cleanupInterval, + cleanupInterval, + TimeUnit.MILLISECONDS); // - Back pressure stats ---------------------------------------------- - stackTraceSamples = new StackTraceSampleCoordinator(executor, 60000); + stackTraceSamples = new StackTraceSampleCoordinator(scheduledExecutor, 60000); // Back pressure stats tracker config int cleanUpInterval = config.getInteger(WebOptions.BACKPRESSURE_CLEANUP_INTERVAL); @@ -228,55 +245,55 @@ public WebRuntimeMonitor( } else { serverSSLContext = null; } - metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, executor, timeout); + metricFetcher = new MetricFetcher(retriever, queryServiceRetriever, scheduledExecutor, timeout); String defaultSavepointDir = config.getString(CoreOptions.SAVEPOINT_DIRECTORY); - JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(currentGraphs, executor, defaultSavepointDir); + JobCancellationWithSavepointHandlers cancelWithSavepoint = new JobCancellationWithSavepointHandlers(executionGraphCache, scheduledExecutor, defaultSavepointDir); RuntimeMonitorHandler triggerHandler = handler(cancelWithSavepoint.getTriggerHandler()); RuntimeMonitorHandler inProgressHandler = handler(cancelWithSavepoint.getInProgressHandler()); Router router = new Router(); // config how to interact with this web server - get(router, new DashboardConfigHandler(executor, cfg.getRefreshInterval())); + get(router, new DashboardConfigHandler(scheduledExecutor, cfg.getRefreshInterval())); // the overview - how many task managers, slots, free slots, ... 
- get(router, new ClusterOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT)); + get(router, new ClusterOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT)); // job manager configuration - get(router, new ClusterConfigHandler(executor, config)); + get(router, new ClusterConfigHandler(scheduledExecutor, config)); // overview over jobs - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, true)); - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, true, false)); - get(router, new CurrentJobsOverviewHandler(executor, DEFAULT_REQUEST_TIMEOUT, false, true)); - - get(router, new CurrentJobIdsHandler(executor, DEFAULT_REQUEST_TIMEOUT)); - - get(router, new JobDetailsHandler(currentGraphs, executor, metricFetcher)); - - get(router, new JobVertexDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtasksTimesHandler(currentGraphs, executor)); - get(router, new JobVertexTaskManagersHandler(currentGraphs, executor, metricFetcher)); - get(router, new JobVertexAccumulatorsHandler(currentGraphs, executor)); - get(router, new JobVertexBackPressureHandler(currentGraphs, executor, backPressureStatsTracker, refreshInterval)); - get(router, new JobVertexMetricsHandler(executor, metricFetcher)); - get(router, new SubtasksAllAccumulatorsHandler(currentGraphs, executor)); - get(router, new SubtaskCurrentAttemptDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtaskExecutionAttemptDetailsHandler(currentGraphs, executor, metricFetcher)); - get(router, new SubtaskExecutionAttemptAccumulatorsHandler(currentGraphs, executor)); - - get(router, new JobPlanHandler(currentGraphs, executor)); - get(router, new JobConfigHandler(currentGraphs, executor)); - get(router, new JobExceptionsHandler(currentGraphs, executor)); - get(router, new JobAccumulatorsHandler(currentGraphs, executor)); - get(router, new JobMetricsHandler(executor, metricFetcher)); - - get(router, new TaskManagersHandler(executor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, true)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, true, false)); + get(router, new CurrentJobsOverviewHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, false, true)); + + get(router, new CurrentJobIdsHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT)); + + get(router, new JobDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + + get(router, new JobVertexDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new SubtasksTimesHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobVertexTaskManagersHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new JobVertexAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobVertexBackPressureHandler(executionGraphCache, scheduledExecutor, backPressureStatsTracker, refreshInterval)); + get(router, new JobVertexMetricsHandler(scheduledExecutor, metricFetcher)); + get(router, new SubtasksAllAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new SubtaskCurrentAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new SubtaskExecutionAttemptDetailsHandler(executionGraphCache, scheduledExecutor, metricFetcher)); + get(router, new SubtaskExecutionAttemptAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + 
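To make the lifecycle wiring in the WebRuntimeMonitor constructor above easier to follow: the cache's time-to-live is derived from the web UI refresh interval (ten refresh periods), and a periodic cleanup task is scheduled with a fixed delay of twice that TTL. Below is a minimal, self-contained sketch of the same arithmetic using a plain java.util.concurrent.ScheduledExecutorService rather than Flink's ScheduledExecutor; the 3-second refresh interval and all class/variable names are assumptions for illustration only, not code from this change.

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

public final class CleanupSchedulingSketch {

    public static void main(String[] args) {
        // assumption for illustration: the web UI refresh interval is 3 seconds
        final long refreshIntervalMillis = 3_000L;

        // cache entries live for ten refresh intervals ...
        final long timeToLive = refreshIntervalMillis * 10L;
        // ... and expired entries are swept with a fixed delay of two TTL periods
        final long cleanupInterval = timeToLive * 2L;

        final ScheduledExecutorService scheduledExecutor = Executors.newSingleThreadScheduledExecutor();

        final ScheduledFuture<?> cleanupTask = scheduledExecutor.scheduleWithFixedDelay(
            () -> System.out.println("evicting cache entries older than " + timeToLive + " ms"),
            cleanupInterval,
            cleanupInterval,
            TimeUnit.MILLISECONDS);

        // on shutdown the monitor cancels the task before closing the cache (see stop() further down)
        cleanupTask.cancel(false);
        scheduledExecutor.shutdown();
    }
}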
+ get(router, new JobPlanHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobConfigHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobExceptionsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobAccumulatorsHandler(executionGraphCache, scheduledExecutor)); + get(router, new JobMetricsHandler(scheduledExecutor, metricFetcher)); + + get(router, new TaskManagersHandler(scheduledExecutor, DEFAULT_REQUEST_TIMEOUT, metricFetcher)); get(router, new TaskManagerLogHandler( retriever, - executor, + scheduledExecutor, localRestAddress, timeout, TaskManagerLogHandler.FileMode.LOG, @@ -285,13 +302,13 @@ public WebRuntimeMonitor( get(router, new TaskManagerLogHandler( retriever, - executor, + scheduledExecutor, localRestAddress, timeout, TaskManagerLogHandler.FileMode.STDOUT, config, blobView)); - get(router, new TaskManagerMetricsHandler(executor, metricFetcher)); + get(router, new TaskManagerMetricsHandler(scheduledExecutor, metricFetcher)); router // log and stdout @@ -305,48 +322,48 @@ public WebRuntimeMonitor( .GET("/jobmanager/stdout", logFiles.stdOutFile == null ? new ConstantTextHandler("(stdout file unavailable)") : new StaticFileServerHandler<>(retriever, localRestAddress, timeout, logFiles.stdOutFile)); - get(router, new JobManagerMetricsHandler(executor, metricFetcher)); + get(router, new JobManagerMetricsHandler(scheduledExecutor, metricFetcher)); // Cancel a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobCancellationHandler(executor, timeout)); + get(router, new JobCancellationHandler(scheduledExecutor, timeout)); // DELETE is the preferred way of canceling a job (Rest-conform) - delete(router, new JobCancellationHandler(executor, timeout)); + delete(router, new JobCancellationHandler(scheduledExecutor, timeout)); get(router, triggerHandler); get(router, inProgressHandler); // stop a job via GET (for proper integration with YARN this has to be performed via GET) - get(router, new JobStoppingHandler(executor, timeout)); + get(router, new JobStoppingHandler(scheduledExecutor, timeout)); // DELETE is the preferred way of stopping a job (Rest-conform) - delete(router, new JobStoppingHandler(executor, timeout)); + delete(router, new JobStoppingHandler(scheduledExecutor, timeout)); int maxCachedEntries = config.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE); CheckpointStatsCache cache = new CheckpointStatsCache(maxCachedEntries); // Register the checkpoint stats handlers - get(router, new CheckpointStatsHandler(currentGraphs, executor)); - get(router, new CheckpointConfigHandler(currentGraphs, executor)); - get(router, new CheckpointStatsDetailsHandler(currentGraphs, executor, cache)); - get(router, new CheckpointStatsDetailsSubtasksHandler(currentGraphs, executor, cache)); + get(router, new CheckpointStatsHandler(executionGraphCache, scheduledExecutor)); + get(router, new CheckpointConfigHandler(executionGraphCache, scheduledExecutor)); + get(router, new CheckpointStatsDetailsHandler(executionGraphCache, scheduledExecutor, cache)); + get(router, new CheckpointStatsDetailsSubtasksHandler(executionGraphCache, scheduledExecutor, cache)); if (webSubmitAllow) { // fetch the list of uploaded jars. 
- get(router, new JarListHandler(executor, uploadDir)); + get(router, new JarListHandler(scheduledExecutor, uploadDir)); // get plan for an uploaded jar - get(router, new JarPlanHandler(executor, uploadDir)); + get(router, new JarPlanHandler(scheduledExecutor, uploadDir)); // run a jar - post(router, new JarRunHandler(executor, uploadDir, timeout, config)); + post(router, new JarRunHandler(scheduledExecutor, uploadDir, timeout, config)); // upload a jar - post(router, new JarUploadHandler(executor, uploadDir)); + post(router, new JarUploadHandler(scheduledExecutor, uploadDir)); // delete an uploaded jar from submission interface - delete(router, new JarDeleteHandler(executor, uploadDir)); + delete(router, new JarDeleteHandler(scheduledExecutor, uploadDir)); } else { // send an Access Denied message - JarAccessDeniedHandler jad = new JarAccessDeniedHandler(executor); + JarAccessDeniedHandler jad = new JarAccessDeniedHandler(scheduledExecutor); get(router, jad); post(router, jad); delete(router, jad); @@ -447,6 +464,11 @@ public void run() { @Override public void stop() throws Exception { synchronized (startupShutdownLock) { + + executionGraphCleanupTask.cancel(false); + + executionGraphCache.close(); + leaderRetrievalService.stop(); netty.shutdown(); diff --git a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java index e67c5ce6eda9b..95b5811a8a76a 100644 --- a/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java +++ b/flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/WebRuntimeMonitorITCase.java @@ -179,7 +179,7 @@ public void testRedirectToLeader() throws Exception { jobManagerRetrievers[i], new AkkaQueryServiceRetriever(jobManagerSystem[i], TIMEOUT), TIMEOUT, - TestingUtils.defaultExecutor()); + TestingUtils.defaultScheduledExecutor()); } ActorRef[] jobManager = new ActorRef[2]; @@ -323,7 +323,7 @@ public void testLeaderNotAvailable() throws Exception { new AkkaJobManagerRetriever(actorSystem, TIMEOUT, 0, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, TIMEOUT), TIMEOUT, - TestingUtils.defaultExecutor()); + TestingUtils.defaultScheduledExecutor()); webRuntimeMonitor.start(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java index 18025dd10b9c7..2a2c414a603c8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/AkkaJobManagerGateway.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; @@ -226,7 +227,7 @@ public CompletableFuture requestJobDetails(boolean includeR } @Override - public CompletableFuture> requestJob(JobID jobId, Time timeout) { + public CompletableFuture requestJob(JobID jobId, Time timeout) { CompletableFuture jobResponseFuture = FutureUtils.toJava( jobManagerGateway .ask(new 
JobManagerMessages.RequestJob(jobId), FutureUtils.toFiniteDuration(timeout)) @@ -235,9 +236,9 @@ public CompletableFuture> requestJob(JobID jobId, return jobResponseFuture.thenApply( (JobManagerMessages.JobResponse jobResponse) -> { if (jobResponse instanceof JobManagerMessages.JobFound) { - return Optional.of(((JobManagerMessages.JobFound) jobResponse).executionGraph()); + return ((JobManagerMessages.JobFound) jobResponse).executionGraph(); } else { - return Optional.empty(); + throw new CompletionException(new FlinkJobNotFoundException(jobId)); } }); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java index 62cddb00b8f82..0a56963b1dd01 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/BootstrapTools.java @@ -18,13 +18,6 @@ package org.apache.flink.runtime.clusterframework; -import akka.actor.ActorSystem; -import com.typesafe.config.Config; - -import org.apache.commons.cli.CommandLine; -import org.apache.commons.cli.Option; -import org.apache.commons.lang3.StringUtils; - import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -32,6 +25,7 @@ import org.apache.flink.configuration.JobManagerOptions; import org.apache.flink.configuration.WebOptions; import org.apache.flink.runtime.akka.AkkaUtils; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.webmonitor.WebMonitor; @@ -40,10 +34,13 @@ import org.apache.flink.runtime.webmonitor.retriever.MetricQueryServiceRetriever; import org.apache.flink.util.NetUtils; +import akka.actor.ActorSystem; +import com.typesafe.config.Config; +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; - import org.slf4j.LoggerFactory; -import scala.concurrent.duration.FiniteDuration; import java.io.File; import java.io.FileWriter; @@ -55,7 +52,8 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import java.util.concurrent.Executor; + +import scala.concurrent.duration.FiniteDuration; /** * Tools for starting JobManager and TaskManager processes, including the @@ -178,7 +176,7 @@ public static ActorSystem startActorSystem( * @param jobManagerRetriever to retrieve the leading JobManagerGateway * @param queryServiceRetriever to resolve a query service * @param timeout for asynchronous operations - * @param executor to run asynchronous operations + * @param scheduledExecutor to run asynchronous operations * @param logger Logger for log output * @return WebMonitor instance. 
* @throws Exception @@ -189,7 +187,7 @@ public static WebMonitor startWebMonitorIfConfigured( LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor, + ScheduledExecutor scheduledExecutor, Logger logger) throws Exception { if (config.getInteger(WebOptions.PORT, 0) >= 0) { @@ -203,7 +201,7 @@ public static WebMonitor startWebMonitorIfConfigured( jobManagerRetriever, queryServiceRetriever, timeout, - executor); + scheduledExecutor); // start the web monitor if (monitor != null) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java index 0396d520e61f7..4d89dc8781e58 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.client.JobSubmissionException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.RunningJobsRegistry; @@ -284,7 +285,7 @@ public CompletableFuture requestStatusOverview(Time timeout) { } @Override - public CompletableFuture requestJobDetails(Time timeout) { + public CompletableFuture requestJobDetails(boolean includeRunning, boolean includeFinished, Time timeout) { final int numberJobsRunning = jobManagerRunners.size(); ArrayList> individualJobDetails = new ArrayList<>(numberJobsRunning); @@ -300,6 +301,17 @@ public CompletableFuture requestJobDetails(Time timeout) { new MultipleJobsDetails(jobDetails, null)); } + @Override + public CompletableFuture requestJob(JobID jobId, Time timeout) { + final JobManagerRunner jobManagerRunner = jobManagerRunners.get(jobId); + + if (jobManagerRunner == null) { + return FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)); + } else { + return jobManagerRunner.getJobManagerGateway().requestArchivedExecutionGraph(timeout); + } + } + /** * Cleans up the job related data from the dispatcher. If cleanupHA is true, then * the data will also be removed from HA. 
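For context on the new Dispatcher#requestJob contract added above: the returned future completes exceptionally with FlinkJobNotFoundException when the dispatcher has no JobManagerRunner for the given job. The following is a hedged sketch of how a caller might unwrap that failure; it is typed against RestfulGateway (on which this change has the cache invoke requestJob), and the describeJob method name and the "<unknown job>" fallback are illustrative assumptions, not part of this change.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.webmonitor.RestfulGateway;

public final class RequestJobSketch {

    /** Returns the job's state name, or a placeholder if the gateway does not know the job. */
    static CompletableFuture<String> describeJob(RestfulGateway gateway, JobID jobId, Time timeout) {
        return gateway.requestJob(jobId, timeout)
            .handle((AccessExecutionGraph graph, Throwable failure) -> {
                if (failure == null) {
                    return graph.getState().name();
                }
                // async chains wrap the real cause in a CompletionException
                Throwable cause = failure instanceof CompletionException ? failure.getCause() : failure;
                if (cause instanceof FlinkJobNotFoundException) {
                    return "<unknown job>";
                }
                throw new CompletionException(cause);
            });
    }
}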
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java index 46b0cd9705a51..1d0a9dda75300 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/DispatcherGateway.java @@ -22,8 +22,6 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.webmonitor.RestfulGateway; @@ -80,7 +78,4 @@ CompletableFuture> listJobs( * @param timeout of the operation * @return Future {@link StatusOverview} containing the cluster information */ - CompletableFuture requestStatusOverview(@RpcTimeout Time timeout); - - CompletableFuture requestJobDetails(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java index 0ff88848f0570..782d6d005dd8c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobManagerGateway.java @@ -21,15 +21,12 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.akka.ListeningBehaviour; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.instance.Instance; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.messages.webmonitor.JobsWithIDsOverview; -import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; -import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.webmonitor.RestfulGateway; import javax.annotation.Nullable; @@ -124,37 +121,6 @@ public interface JobManagerGateway extends RestfulGateway { */ CompletableFuture> requestTaskManagerInstances(Time timeout); - /** - * Requests job details currently being executed by the JobManager. - * - * @param includeRunning true if running jobs shall be included, otherwise false - * @param includeFinished true if finished jobs shall be included, otherwise false - * @param timeout for the asynchronous operation - * @return Future containing the job details - */ - CompletableFuture requestJobDetails( - boolean includeRunning, - boolean includeFinished, - Time timeout); - - /** - * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then - * {@link Optional#empty()} is returned. - * - * @param jobId identifying the job whose AccessExecutionGraph is requested - * @param timeout for the asynchronous operation - * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link Optional#empty()} - */ - CompletableFuture> requestJob(JobID jobId, Time timeout); - - /** - * Requests the status overview from the JobManager. 
- * - * @param timeout for the asynchronous operation - * @return Future containing the status overview - */ - CompletableFuture requestStatusOverview(Time timeout); - /** * Requests the job overview from the JobManager. * diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 0e534366c6ba0..7efcc0b30b2ec 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -40,6 +40,7 @@ import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionGraph; @@ -744,6 +745,11 @@ public CompletableFuture requestJobDetails(Time timeout) { return CompletableFuture.supplyAsync(() -> WebMonitorUtils.createDetailsForJob(executionGraph), executor); } + @Override + public CompletableFuture requestArchivedExecutionGraph(Time timeout) { + return CompletableFuture.completedFuture(executionGraph.archive()); + } + //---------------------------------------------------------------------------------------------- // Internal methods //---------------------------------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index f3ca5be48717f..0628976cf270b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -23,6 +23,8 @@ import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.execution.ExecutionState; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -232,5 +234,19 @@ CompletableFuture registerTaskManager( */ void heartbeatFromResourceManager(final ResourceID resourceID); + /** + * Request the details of the executed job. + * + * @param timeout for the rpc call + * @return Future details of the executed job + */ CompletableFuture requestJobDetails(@RpcTimeout Time timeout); + + /** + * Request the {@link ArchivedExecutionGraph} of the currently executed job. 
+ * + * @param timeout for the rpc call + * @return Future archived execution graph derived from the currently executed job + */ + CompletableFuture requestArchivedExecutionGraph(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java index 95686ac122158..f606071370ef6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/FlinkJobNotFoundException.java @@ -22,13 +22,13 @@ import org.apache.flink.util.FlinkException; /** - * Exception which is returned if a Flink job could not be found. + * Exception indicating that we could not find a Flink job with the given job ID. */ public class FlinkJobNotFoundException extends FlinkException { - private static final long serialVersionUID = -7803390762010615384L; + private static final long serialVersionUID = 2294698055059659025L; public FlinkJobNotFoundException(JobID jobId) { - super("Could not find Flink job (" + jobId + ")."); + super("Could not find Flink job (" + jobId + ')'); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java index 88932b5c5b79a..7fbde15fb5c1c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractExecutionGraphRequestHandler.java @@ -27,7 +27,6 @@ import org.apache.flink.util.Preconditions; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -38,11 +37,11 @@ */ public abstract class AbstractExecutionGraphRequestHandler extends AbstractJsonRequestHandler { - private final ExecutionGraphHolder executionGraphHolder; + private final ExecutionGraphCache executionGraphCache; - public AbstractExecutionGraphRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractExecutionGraphRequestHandler(ExecutionGraphCache executionGraphCache, Executor executor) { super(executor); - this.executionGraphHolder = Preconditions.checkNotNull(executionGraphHolder); + this.executionGraphCache = Preconditions.checkNotNull(executionGraphCache); } @Override @@ -63,16 +62,15 @@ public CompletableFuture handleJsonRequest( return FutureUtils.completedExceptionally(new FlinkException("Invalid JobID string '" + jidString + "'", e)); } - final CompletableFuture> graphFuture = executionGraphHolder.getExecutionGraph(jid, jobManagerGateway); + final CompletableFuture graphFuture = executionGraphCache.getExecutionGraph(jid, jobManagerGateway); - return graphFuture.thenComposeAsync( - (Optional optGraph) -> { - if (optGraph.isPresent()) { - return handleRequest(optGraph.get(), pathParams); - } else { - throw new CompletionException(new NotFoundException("Could not find job with jobId " + jid + '.')); - } - }, executor); + return graphFuture + .exceptionally( + throwable -> { + throw new CompletionException(new NotFoundException("Could not find job " + jid + '.')); + }) + .thenComposeAsync( + (AccessExecutionGraph executionGraph) -> 
handleRequest(executionGraph, pathParams)); } public abstract CompletableFuture handleRequest(AccessExecutionGraph graph, Map params); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java index e2e4484b288c3..70606e4229ebf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractJobVertexRequestHandler.java @@ -32,7 +32,7 @@ */ public abstract class AbstractJobVertexRequestHandler extends AbstractExecutionGraphRequestHandler { - public AbstractJobVertexRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractJobVertexRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java index ec277d8c311ea..9a225f4e9fc96 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskAttemptRequestHandler.java @@ -35,7 +35,7 @@ */ public abstract class AbstractSubtaskAttemptRequestHandler extends AbstractSubtaskRequestHandler { - public AbstractSubtaskAttemptRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractSubtaskAttemptRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java index d69038a0a9d33..b1797b02a38f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/AbstractSubtaskRequestHandler.java @@ -34,7 +34,7 @@ */ public abstract class AbstractSubtaskRequestHandler extends AbstractJobVertexRequestHandler { - public AbstractSubtaskRequestHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public AbstractSubtaskRequestHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java index 669ef32906dbc..a6640a9370906 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/CurrentJobsOverviewHandler.java @@ -75,7 +75,7 @@ public CurrentJobsOverviewHandler( @Override public CompletableFuture handleRequest(HandlerRequest request, DispatcherGateway gateway) { - return gateway.requestJobDetails(timeout); + return gateway.requestJobDetails(true, true, timeout); } @Override diff 
--git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java new file mode 100644 index 0000000000000..f63b042b3b45c --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCache.java @@ -0,0 +1,187 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.webmonitor.RestfulGateway; +import org.apache.flink.util.Preconditions; + +import java.io.Closeable; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Cache for {@link AccessExecutionGraph} which are obtained from the Flink cluster. Every cache entry + * has an associated time to live after which a new request will trigger the reloading of the + * {@link AccessExecutionGraph} from the cluster. + */ +public class ExecutionGraphCache implements Closeable { + + private final Time timeout; + + private final Time timeToLive; + + private final ConcurrentHashMap cachedExecutionGraphs; + + private volatile boolean running = true; + + public ExecutionGraphCache( + Time timeout, + Time timeToLive) { + this.timeout = checkNotNull(timeout); + this.timeToLive = checkNotNull(timeToLive); + + cachedExecutionGraphs = new ConcurrentHashMap<>(4); + } + + @Override + public void close() { + running = false; + + // clear all cached AccessExecutionGraphs + cachedExecutionGraphs.clear(); + } + + /** + * Gets the number of cache entries. + */ + public int size() { + return cachedExecutionGraphs.size(); + } + + /** + * Gets the {@link AccessExecutionGraph} for the given {@link JobID} and caches it. The + * {@link AccessExecutionGraph} will be requested again after the refresh interval has passed + * or if the graph could not be retrieved from the given gateway. 
+ * + * @param jobId identifying the {@link AccessExecutionGraph} to get + * @param restfulGateway to request the {@link AccessExecutionGraph} from + * @return Future containing the requested {@link AccessExecutionGraph} + */ + public CompletableFuture<AccessExecutionGraph> getExecutionGraph(JobID jobId, RestfulGateway restfulGateway) { + + Preconditions.checkState(running, "ExecutionGraphCache is no longer running"); + + while (true) { + final ExecutionGraphEntry oldEntry = cachedExecutionGraphs.get(jobId); + + final long currentTime = System.currentTimeMillis(); + + if (oldEntry != null) { + if (currentTime < oldEntry.getTTL()) { + if (oldEntry.getExecutionGraphFuture().isDone() && !oldEntry.getExecutionGraphFuture().isCompletedExceptionally()) { + + // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph + try { + if (oldEntry.getExecutionGraphFuture().get().getState() != JobStatus.SUSPENDED) { + return oldEntry.getExecutionGraphFuture(); + } + // send a new request to get the ExecutionGraph from the new leader + } catch (InterruptedException | ExecutionException e) { + throw new RuntimeException("Could not retrieve ExecutionGraph from the orderly completed future. This should never happen.", e); + } + } else if (!oldEntry.getExecutionGraphFuture().isDone()) { + return oldEntry.getExecutionGraphFuture(); + } + // otherwise it must be completed exceptionally + } + } + + final ExecutionGraphEntry newEntry = new ExecutionGraphEntry(currentTime + timeToLive.toMilliseconds()); + + final boolean successfulUpdate; + + if (oldEntry == null) { + successfulUpdate = cachedExecutionGraphs.putIfAbsent(jobId, newEntry) == null; + } else { + successfulUpdate = cachedExecutionGraphs.replace(jobId, oldEntry, newEntry); + // cancel potentially outstanding futures + oldEntry.getExecutionGraphFuture().cancel(false); + } + + if (successfulUpdate) { + final CompletableFuture<AccessExecutionGraph> executionGraphFuture = restfulGateway.requestJob(jobId, timeout); + + executionGraphFuture.whenComplete( + (AccessExecutionGraph executionGraph, Throwable throwable) -> { + if (throwable != null) { + newEntry.getExecutionGraphFuture().completeExceptionally(throwable); + + // remove exceptionally completed entry because it doesn't help + cachedExecutionGraphs.remove(jobId, newEntry); + } else { + newEntry.getExecutionGraphFuture().complete(executionGraph); + + // TODO: Remove once we no longer request the actual ExecutionGraph from the JobManager but only the ArchivedExecutionGraph + if (executionGraph.getState() == JobStatus.SUSPENDED) { + // remove the entry in case of suspension --> triggers new request when accessed next time + cachedExecutionGraphs.remove(jobId, newEntry); + } + } + }); + + if (!running) { + // delete newly added entry in case of a concurrent stopping operation + cachedExecutionGraphs.remove(jobId, newEntry); + } + + return newEntry.getExecutionGraphFuture(); + } + } + } + + /** + * Perform the cleanup of outdated {@link ExecutionGraphEntry} instances. + */ + public void cleanup() { + long currentTime = System.currentTimeMillis(); + + // remove entries which have exceeded their time to live + cachedExecutionGraphs.values().removeIf( + (ExecutionGraphEntry entry) -> currentTime >= entry.getTTL()); + } + + /** + * Wrapper containing the current execution graph and its time to live (TTL).
+ */ + private static final class ExecutionGraphEntry { + private final long ttl; + + private final CompletableFuture executionGraphFuture; + + ExecutionGraphEntry(long ttl) { + this.ttl = ttl; + this.executionGraphFuture = new CompletableFuture<>(); + } + + public long getTTL() { + return ttl; + } + + public CompletableFuture getExecutionGraphFuture() { + return executionGraphFuture; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java deleted file mode 100644 index 8a47e509b19be..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphHolder.java +++ /dev/null @@ -1,82 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.rest.handler.legacy; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; -import org.apache.flink.runtime.executiongraph.ExecutionGraph; -import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.jobmaster.JobManagerGateway; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Optional; -import java.util.WeakHashMap; -import java.util.concurrent.CompletableFuture; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** - * Gateway to obtaining an {@link ExecutionGraph} from a source, like JobManager or Archive. - * - *
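With the ExecutionGraphCache above taking over from the removed ExecutionGraphHolder, a handler-side caller ends up with code along the following lines. This is a hedged sketch only: the jobName helper is hypothetical and merely mirrors how AbstractExecutionGraphRequestHandler composes on the cache's future earlier in this diff.

import java.util.concurrent.CompletableFuture;

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
import org.apache.flink.runtime.webmonitor.RestfulGateway;

final class ExecutionGraphCacheUsageSketch {

    /** Resolves the job name via the cache; entries are served from memory until their TTL expires. */
    static CompletableFuture<String> jobName(ExecutionGraphCache cache, JobID jobId, RestfulGateway gateway) {
        // The returned future completes exceptionally if the graph cannot be fetched
        // (for example when the job is unknown), so callers can map that to a "not found" response.
        return cache.getExecutionGraph(jobId, gateway)
            .thenApply(AccessExecutionGraph::getJobName);
    }
}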

The holder will cache the ExecutionGraph behind a weak reference, which will be cleared - * at some point once no one else is pointing to the ExecutionGraph. - * Note that while the holder runs in the same JVM as the JobManager or Archive, the reference should - * stay valid. - */ -public class ExecutionGraphHolder { - - private static final Logger LOG = LoggerFactory.getLogger(ExecutionGraphHolder.class); - - private final Time timeout; - - private final WeakHashMap cache = new WeakHashMap<>(); - - public ExecutionGraphHolder(Time timeout) { - this.timeout = checkNotNull(timeout); - } - - /** - * Retrieves the execution graph with {@link JobID} jid wrapped in {@link Optional} or - * {@link Optional#empty()} if it cannot be found. - * - * @param jid jobID of the execution graph to be retrieved - * @return Optional ExecutionGraph if it has been retrievable, empty if there has been no ExecutionGraph - */ - public CompletableFuture> getExecutionGraph(JobID jid, JobManagerGateway jobManagerGateway) { - AccessExecutionGraph cached = cache.get(jid); - if (cached != null) { - if (cached.getState() == JobStatus.SUSPENDED) { - cache.remove(jid); - } else { - return CompletableFuture.completedFuture(Optional.of(cached)); - } - } - - CompletableFuture> executionGraphFuture = jobManagerGateway.requestJob(jid, timeout); - - executionGraphFuture.thenAcceptAsync( - optExecutionGraph -> - optExecutionGraph.ifPresent(executionGraph -> cache.put(jid, executionGraph))); - - return executionGraphFuture; - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java index 68810eb7e738a..b2a2488806782 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandler.java @@ -42,7 +42,7 @@ public class JobAccumulatorsHandler extends AbstractExecutionGraphRequestHandler private static final String JOB_ACCUMULATORS_REST_PATH = "/jobs/:jobid/accumulators"; - public JobAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java index 2750c338739d8..d31af4cb4218e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlers.java @@ -48,7 +48,6 @@ import java.util.ArrayDeque; import java.util.HashMap; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.Executor; @@ -91,13 +90,13 @@ public class JobCancellationWithSavepointHandlers { private final String defaultSavepointDirectory; public JobCancellationWithSavepointHandlers( - ExecutionGraphHolder currentGraphs, + ExecutionGraphCache currentGraphs, Executor executor) { this(currentGraphs, executor, null); } public JobCancellationWithSavepointHandlers( - 
ExecutionGraphHolder currentGraphs, + ExecutionGraphCache currentGraphs, Executor executor, @Nullable String defaultSavepointDirectory) { @@ -124,12 +123,12 @@ public InProgressHandler getInProgressHandler() { class TriggerHandler implements RequestHandler { /** Current execution graphs. */ - private final ExecutionGraphHolder currentGraphs; + private final ExecutionGraphCache currentGraphs; /** Execution context for futures. */ private final Executor executor; - public TriggerHandler(ExecutionGraphHolder currentGraphs, Executor executor) { + public TriggerHandler(ExecutionGraphCache currentGraphs, Executor executor) { this.currentGraphs = checkNotNull(currentGraphs); this.executor = checkNotNull(executor); } @@ -148,39 +147,40 @@ public CompletableFuture handleRequest( if (jobManagerGateway != null) { JobID jobId = JobID.fromHexString(pathParams.get("jobid")); - final CompletableFuture> graphFuture; + final CompletableFuture graphFuture; graphFuture = currentGraphs.getExecutionGraph(jobId, jobManagerGateway); - return graphFuture.thenApplyAsync( - (Optional optGraph) -> { - final AccessExecutionGraph graph = optGraph.orElseThrow( - () -> new CompletionException( - new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.'))); - - CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration(); - if (jobCheckpointingConfiguration == null) { - throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job.")); - } + return graphFuture.handleAsync( + (AccessExecutionGraph graph, Throwable throwable) -> { + if (throwable != null) { + throw new CompletionException(new NotFoundException("Could not find ExecutionGraph with jobId " + jobId + '.')); + } else { + CheckpointCoordinatorConfiguration jobCheckpointingConfiguration = graph.getCheckpointCoordinatorConfiguration(); + if (jobCheckpointingConfiguration == null) { + throw new CompletionException(new FlinkException("Cannot find checkpoint coordinator configuration for job.")); + } - String targetDirectory = pathParams.get("targetDirectory"); - if (targetDirectory == null) { - if (defaultSavepointDirectory == null) { - throw new IllegalStateException("No savepoint directory configured. " + - "You can either specify a directory when triggering this savepoint or " + - "configure a cluster-wide default via key '" + - CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); - } else { - targetDirectory = defaultSavepointDirectory; + String targetDirectory = pathParams.get("targetDirectory"); + if (targetDirectory == null) { + if (defaultSavepointDirectory == null) { + throw new IllegalStateException("No savepoint directory configured. 
" + + "You can either specify a directory when triggering this savepoint or " + + "configure a cluster-wide default via key '" + + CoreOptions.SAVEPOINT_DIRECTORY.key() + "'."); + } else { + targetDirectory = defaultSavepointDirectory; + } } - } - try { - return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout()); - } catch (IOException e) { - throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e)); + try { + return handleNewRequest(jobManagerGateway, jobId, targetDirectory, jobCheckpointingConfiguration.getCheckpointTimeout()); + } catch (IOException e) { + throw new CompletionException(new FlinkException("Could not cancel job with savepoint.", e)); + } } - }, executor); + }, + executor); } else { return FutureUtils.completedExceptionally(new Exception("No connection to the leading JobManager.")); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java index 787217f5d4c4c..2d404961324bc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandler.java @@ -42,8 +42,8 @@ public class JobConfigHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_CONFIG_REST_PATH = "/jobs/:jobid/config"; - public JobConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { - super(executionGraphHolder, executor); + public JobConfigHandler(ExecutionGraphCache executionGraphCache, Executor executor) { + super(executionGraphCache, executor); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java index b9f812b715a0e..31b14781a4c0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandler.java @@ -60,7 +60,7 @@ public class JobDetailsHandler extends AbstractExecutionGraphRequestHandler { private final MetricFetcher fetcher; - public JobDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java index 566631e4a1b19..62ee85c2ff11b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandler.java @@ -48,7 +48,7 @@ public class JobExceptionsHandler extends AbstractExecutionGraphRequestHandler { static final int MAX_NUMBER_EXCEPTION_TO_REPORT = 20; - public JobExceptionsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobExceptionsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java index d9db1ff5480e1..ed8c702d8f9b6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandler.java @@ -36,7 +36,7 @@ public class JobPlanHandler extends AbstractExecutionGraphRequestHandler { private static final String JOB_PLAN_REST_PATH = "/jobs/:jobid/plan"; - public JobPlanHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobPlanHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java index d448027316c0b..90b1f8c048154 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandler.java @@ -44,7 +44,7 @@ public class JobVertexAccumulatorsHandler extends AbstractJobVertexRequestHandle private static final String JOB_VERTEX_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/accumulators"; - public JobVertexAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public JobVertexAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java index 59bfc0ba48005..fb79f46073097 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandler.java @@ -52,7 +52,7 @@ public class JobVertexBackPressureHandler extends AbstractJobVertexRequestHandle private final int refreshInterval; public JobVertexBackPressureHandler( - ExecutionGraphHolder executionGraphHolder, + ExecutionGraphCache executionGraphHolder, Executor executor, BackPressureStatsTracker backPressureStatsTracker, int refreshInterval) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java index bfa7020ef578d..2ef5faac2b7c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandler.java @@ -53,7 +53,7 @@ public class JobVertexDetailsHandler extends AbstractJobVertexRequestHandler { private final MetricFetcher fetcher; - public JobVertexDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobVertexDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java index 985ea1ea013fd..d2d5985b335c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandler.java @@ -55,7 +55,7 @@ public class JobVertexTaskManagersHandler extends AbstractJobVertexRequestHandle private final MetricFetcher fetcher; - public JobVertexTaskManagersHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public JobVertexTaskManagersHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java index ff4fb465df881..2abdeaf3c3e9c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandler.java @@ -32,7 +32,7 @@ public class SubtaskCurrentAttemptDetailsHandler extends SubtaskExecutionAttempt public static final String SUBTASK_CURRENT_ATTEMPT_DETAILS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum"; - public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public SubtaskCurrentAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor, fetcher); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java index 1570896611e0a..3749776349ef6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandler.java @@ -47,7 +47,7 @@ public class SubtaskExecutionAttemptAccumulatorsHandler extends AbstractSubtaskA private static final String SUBTASK_ATTEMPT_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators"; - public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtaskExecutionAttemptAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java index b0b22eef8621a..5aa83122b8531 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java +++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandler.java @@ -55,7 +55,7 @@ public class SubtaskExecutionAttemptDetailsHandler extends AbstractSubtaskAttemp private final MetricFetcher fetcher; - public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, MetricFetcher fetcher) { + public SubtaskExecutionAttemptDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, MetricFetcher fetcher) { super(executionGraphHolder, executor); this.fetcher = fetcher; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java index 10d9e02cc111a..d1b607ab0b1f8 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandler.java @@ -46,7 +46,7 @@ public class SubtasksAllAccumulatorsHandler extends AbstractJobVertexRequestHand private static final String SUBTASKS_ALL_ACCUMULATORS_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasks/accumulators"; - public SubtasksAllAccumulatorsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtasksAllAccumulatorsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java index bf1d87e6a0df4..a968ab6f2ed78 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandler.java @@ -47,7 +47,7 @@ public class SubtasksTimesHandler extends AbstractJobVertexRequestHandler { private static final String SUBTASK_TIMES_REST_PATH = "/jobs/:jobid/vertices/:vertexid/subtasktimes"; - public SubtasksTimesHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public SubtasksTimesHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java index 6ab6676301d5c..69a59f5256322 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandler.java @@ -22,7 +22,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import 
org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -46,7 +46,7 @@ public class CheckpointConfigHandler extends AbstractExecutionGraphRequestHandle private static final String CHECKPOINT_CONFIG_REST_PATH = "/jobs/:jobid/checkpoints/config"; - public CheckpointConfigHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public CheckpointConfigHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java index b61c5d0c7575c..e27797189c09c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandler.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.checkpoint.TaskStateStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -54,7 +54,7 @@ public class CheckpointStatsDetailsHandler extends AbstractExecutionGraphRequest private final CheckpointStatsCache cache; - public CheckpointStatsDetailsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + public CheckpointStatsDetailsHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) { super(executionGraphHolder, executor); this.cache = cache; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java index 22a8db2ade986..5420cf4c22d3a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsSubtasksHandler.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.jobmaster.JobManagerGateway; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; import org.apache.flink.runtime.rest.handler.legacy.AbstractJobVertexRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -59,7 +59,7 @@ public class CheckpointStatsDetailsSubtasksHandler extends AbstractExecutionGrap private final CheckpointStatsCache cache; - public CheckpointStatsDetailsSubtasksHandler(ExecutionGraphHolder executionGraphHolder, Executor executor, CheckpointStatsCache cache) { + public 
CheckpointStatsDetailsSubtasksHandler(ExecutionGraphCache executionGraphHolder, Executor executor, CheckpointStatsCache cache) { super(executionGraphHolder, executor); this.cache = checkNotNull(cache); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java index bea94f2c7d152..bbfcd8a962c5c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandler.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.rest.handler.legacy.AbstractExecutionGraphRequestHandler; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.rest.handler.legacy.JsonFactory; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -55,7 +55,7 @@ public class CheckpointStatsHandler extends AbstractExecutionGraphRequestHandler private static final String CHECKPOINT_STATS_REST_PATH = "/jobs/:jobid/checkpoints"; - public CheckpointStatsHandler(ExecutionGraphHolder executionGraphHolder, Executor executor) { + public CheckpointStatsHandler(ExecutionGraphCache executionGraphHolder, Executor executor) { super(executionGraphHolder, executor); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java index a5d52e5ff2ebc..d5d194daf883f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java @@ -18,9 +18,15 @@ package org.apache.flink.runtime.webmonitor; +import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.runtime.messages.webmonitor.MultipleJobsDetails; +import org.apache.flink.runtime.messages.webmonitor.StatusOverview; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.RpcTimeout; import java.util.concurrent.CompletableFuture; @@ -38,5 +44,36 @@ public interface RestfulGateway extends RpcGateway { * @param timeout for this operation * @return Future REST endpoint address */ - CompletableFuture requestRestAddress(Time timeout); + CompletableFuture requestRestAddress(@RpcTimeout Time timeout); + + /** + * Requests the AccessExecutionGraph for the given jobId. If there is no such graph, then + * the future is completed with a {@link FlinkJobNotFoundException}. 
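As an illustration (not part of this patch), a caller of the new requestJob method could handle the FlinkJobNotFoundException completion roughly as sketched below. The helper class, its method name and the null fallback are assumptions made for this example; only requestJob(jobId, timeout) and its exceptional completion are taken from the interface above.

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.messages.FlinkJobNotFoundException;
import org.apache.flink.runtime.webmonitor.RestfulGateway;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class RequestJobUsageSketch {

	// Requests the AccessExecutionGraph for the given job and maps an unknown job to a
	// null result; every other failure is passed on unchanged to the caller.
	static CompletableFuture<AccessExecutionGraph> requestJobOrNull(
			RestfulGateway gateway,
			JobID jobId,
			Time timeout) {

		return gateway.requestJob(jobId, timeout)
			.exceptionally(throwable -> {
				final Throwable cause =
					throwable instanceof CompletionException ? throwable.getCause() : throwable;

				if (cause instanceof FlinkJobNotFoundException) {
					return null; // the job is not (or no longer) known to the cluster
				}

				throw new CompletionException(cause);
			});
	}
}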
+ * + * @param jobId identifying the job whose AccessExecutionGraph is requested + * @param timeout for the asynchronous operation + * @return Future containing the AccessExecutionGraph for the given jobId, otherwise {@link FlinkJobNotFoundException} + */ + CompletableFuture requestJob(JobID jobId, @RpcTimeout Time timeout); + + /** + * Requests job details currently being executed on the Flink cluster. + * + * @param includeRunning true if running jobs shall be included, otherwise false + * @param includeFinished true if finished jobs shall be included, otherwise false + * @param timeout for the asynchronous operation + * @return Future containing the job details + */ + CompletableFuture requestJobDetails( + boolean includeRunning, + boolean includeFinished, + @RpcTimeout Time timeout); + + /** + * Requests the cluster status overview. + * + * @param timeout for the asynchronous operation + * @return Future containing the status overview + */ + CompletableFuture requestStatusOverview(@RpcTimeout Time timeout); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java index 0accab755763a..1730bc8cd5268 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/WebMonitorUtils.java @@ -23,6 +23,7 @@ import org.apache.flink.configuration.WebOptions; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.blob.BlobView; +import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.executiongraph.AccessExecutionJobVertex; @@ -55,7 +56,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; /** * Utilities for the web runtime monitor. This class contains for example methods to build @@ -131,7 +131,7 @@ private static File resolveFileLocation(String logFilePath) { * @param jobManagerRetriever which retrieves the currently leading JobManager * @param queryServiceRetriever which retrieves the query service * @param timeout for asynchronous operations - * @param executor to run asynchronous operations + * @param scheduledExecutor to run asynchronous operations */ public static WebMonitor startWebRuntimeMonitor( Configuration config, @@ -139,7 +139,7 @@ public static WebMonitor startWebRuntimeMonitor( LeaderGatewayRetriever jobManagerRetriever, MetricQueryServiceRetriever queryServiceRetriever, Time timeout, - Executor executor) { + ScheduledExecutor scheduledExecutor) { // try to load and instantiate the class try { String classname = "org.apache.flink.runtime.webmonitor.WebRuntimeMonitor"; @@ -152,7 +152,7 @@ public static WebMonitor startWebRuntimeMonitor( LeaderGatewayRetriever.class, MetricQueryServiceRetriever.class, Time.class, - Executor.class); + ScheduledExecutor.class); return constructor.newInstance( config, highAvailabilityServices.getJobManagerLeaderRetriever(HighAvailabilityServices.DEFAULT_JOB_ID), @@ -160,7 +160,7 @@ public static WebMonitor startWebRuntimeMonitor( jobManagerRetriever, queryServiceRetriever, timeout, - executor); + scheduledExecutor); } catch (ClassNotFoundException e) { LOG.error("Could not load web runtime monitor. 
" + "Probably reason: flink-runtime-web is not in the classpath"); diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 0445f19c29450..1c0d223b14670 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -46,7 +46,7 @@ import org.apache.flink.runtime.clusterframework.FlinkResourceManager import org.apache.flink.runtime.clusterframework.messages._ import org.apache.flink.runtime.clusterframework.standalone.StandaloneResourceManager import org.apache.flink.runtime.clusterframework.types.ResourceID -import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.SuppressRestartsException import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders.ResolveOrder import org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, LibraryCacheManager} @@ -2232,7 +2232,7 @@ object JobManager { new AkkaJobManagerRetriever(jobManagerSystem, timeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(jobManagerSystem, timeout), timeout, - futureExecutor) + new ScheduledExecutorServiceAdapter(futureExecutor)) Option(webServer) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala index cedf60776dd9c..5692863785313 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/minicluster/FlinkMiniCluster.scala @@ -32,7 +32,7 @@ import org.apache.flink.configuration._ import org.apache.flink.core.fs.Path import org.apache.flink.runtime.akka.{AkkaJobManagerGateway, AkkaUtils} import org.apache.flink.runtime.client.{JobClient, JobExecutionException} -import org.apache.flink.runtime.concurrent.{FutureUtils, Executors => FlinkExecutors} +import org.apache.flink.runtime.concurrent.{FutureUtils, ScheduledExecutorServiceAdapter, Executors => FlinkExecutors} import org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders import org.apache.flink.runtime.highavailability.{HighAvailabilityServices, HighAvailabilityServicesUtils} import org.apache.flink.runtime.instance.{ActorGateway, AkkaActorGateway} @@ -406,7 +406,7 @@ abstract class FlinkMiniCluster( new AkkaJobManagerRetriever(actorSystem, flinkTimeout, 10, Time.milliseconds(50L)), new AkkaQueryServiceRetriever(actorSystem, flinkTimeout), flinkTimeout, - actorSystem.dispatcher) + new ScheduledExecutorServiceAdapter(futureExecutor)) ) webServer.foreach(_.start()) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java new file mode 100644 index 0000000000000..1a8ea84de2b6e --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/ExecutionGraphCacheTest.java @@ -0,0 +1,353 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.rest.handler.legacy; + +import org.apache.flink.api.common.ArchivedExecutionConfig; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.concurrent.FutureUtils; +import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; +import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ErrorInfo; +import org.apache.flink.runtime.executiongraph.ExecutionGraph; +import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.jobmaster.JobManagerGateway; +import org.apache.flink.runtime.messages.FlinkJobNotFoundException; +import org.apache.flink.util.FlinkException; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for the {@link ExecutionGraphCache}. + */ +public class ExecutionGraphCacheTest extends TestLogger { + + /** + * Tests that we can cache AccessExecutionGraphs over multiple accesses. 
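The caching contract exercised by this first test can be pictured with a small sketch (an illustrative assumption; the actual ExecutionGraphCache implementation is not included in this excerpt): an entry is created at most once per JobID and reused for later lookups, so the gateway sees a single request.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

class CachedLookupSketch {

	private final ConcurrentHashMap<JobID, CompletableFuture<AccessExecutionGraph>> entries =
		new ConcurrentHashMap<>();

	// The first lookup for a JobID invokes the loader (for example a JobManager request);
	// later lookups, including concurrent ones, reuse the stored future, so the backend is
	// asked only once per cached job.
	CompletableFuture<AccessExecutionGraph> get(
			JobID jobId,
			Function<JobID, CompletableFuture<AccessExecutionGraph>> loader) {
		return entries.computeIfAbsent(jobId, loader);
	}
}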
+ */ + @Test + public void testExecutionGraphCaching() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture accessExecutionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture.get()); + + CompletableFuture accessExecutionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, accessExecutionGraphFuture2.get()); + + // verify that we only issued a single request to the gateway + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } + } + + /** + * Tests that an AccessExecutionGraph is invalidated after its TTL expired. + */ + @Test + public void testExecutionGraphEntryInvalidation() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.milliseconds(1L); + final JobID jobId = new JobID(); + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture.get()); + + // sleep for the TTL + Thread.sleep(timeToLive.toMilliseconds()); + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + + verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + } + } + + + /** + * Tests that a failure in requesting an AccessExecutionGraph from the gateway, will not create + * a cache entry --> another cache request will trigger a new gateway request. 
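One way to obtain this behaviour, sketched here purely for illustration (the real ExecutionGraphCache may implement it differently), is to evict the entry as soon as its future completes exceptionally, so the next lookup issues a fresh gateway request. The requestJob(JobID, Time) call mirrors the JobManagerGateway mock used in these tests.

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.jobmaster.JobManagerGateway;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class NoCachingOfFailuresSketch {

	private final ConcurrentHashMap<JobID, CompletableFuture<AccessExecutionGraph>> entries =
		new ConcurrentHashMap<>();

	CompletableFuture<AccessExecutionGraph> getExecutionGraph(
			JobID jobId,
			JobManagerGateway gateway,
			Time timeout) {

		final CompletableFuture<AccessExecutionGraph> future =
			entries.computeIfAbsent(jobId, id -> gateway.requestJob(id, timeout));

		// Drop the entry again if the gateway request failed (conditionally, in case it was
		// already replaced), so that the next lookup triggers a new request.
		future.whenComplete((graph, failure) -> {
			if (failure != null) {
				entries.remove(jobId, future);
			}
		});

		return future;
	}
}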
+ */ + @Test + public void testImmediateCacheInvalidationAfterFailure() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with a JobNotFoundException and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + FutureUtils.completedExceptionally(new FlinkJobNotFoundException(jobId)), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + try { + executionGraphFuture.get(); + + fail("The execution graph future should have been completed exceptionally."); + } catch (ExecutionException ee) { + assertTrue(ee.getCause() instanceof FlinkException); + } + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + } + } + + /** + * Tests that cache entries are cleaned up when their TTL has expired upon + * calling {@link ExecutionGraphCache#cleanup()}. + */ + @Test + public void testCacheEntryCleanup() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.milliseconds(1L); + final JobID jobId1 = new JobID(); + final JobID jobId2 = new JobID(); + final AccessExecutionGraph accessExecutionGraph1 = mock(AccessExecutionGraph.class); + final AccessExecutionGraph accessExecutionGraph2 = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId1), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph1)); + when(jobManagerGateway.requestJob(eq(jobId2), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph2)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + + CompletableFuture executionGraph1Future = executionGraphCache.getExecutionGraph(jobId1, jobManagerGateway); + + CompletableFuture executionGraph2Future = executionGraphCache.getExecutionGraph(jobId2, jobManagerGateway); + + assertEquals(accessExecutionGraph1, executionGraph1Future.get()); + + assertEquals(accessExecutionGraph2, executionGraph2Future.get()); + + verify(jobManagerGateway, times(1)).requestJob(eq(jobId1), any(Time.class)); + verify(jobManagerGateway, times(1)).requestJob(eq(jobId2), any(Time.class)); + + Thread.sleep(timeToLive.toMilliseconds()); + + executionGraphCache.cleanup(); + + assertTrue(executionGraphCache.size() == 0); + } + } + + /** + * Tests that concurrent accesses only trigger a single AccessExecutionGraph request. 
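The single-request guarantee for concurrent lookups already follows from the computeIfAbsent idiom in the sketch further above. For the cleanup() behaviour verified by testCacheEntryCleanup, a time-to-live can be attached to each entry and expired entries dropped on demand; again, this is an illustrative assumption, not the actual ExecutionGraphCache code.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

class ExpiredEntryCleanupSketch {

	// A cached graph future together with the point in time until which it may be served.
	static final class Entry {
		final CompletableFuture<AccessExecutionGraph> graphFuture;
		final long validUntilMillis;

		Entry(CompletableFuture<AccessExecutionGraph> graphFuture, long validUntilMillis) {
			this.graphFuture = graphFuture;
			this.validUntilMillis = validUntilMillis;
		}
	}

	private final ConcurrentHashMap<JobID, Entry> entries = new ConcurrentHashMap<>();

	// Removes every entry whose time-to-live has expired, so size() no longer counts stale
	// graphs. A caller can invoke this periodically to bound the cache's footprint.
	void cleanup() {
		final long now = System.currentTimeMillis();
		entries.values().removeIf(entry -> entry.validUntilMillis <= now);
	}

	int size() {
		return entries.size();
	}
}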
+ */ + @Test + public void testConcurrentAccess() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn(CompletableFuture.completedFuture(accessExecutionGraph)); + + final int numConcurrentAccesses = 10; + + final ArrayList> executionGraphFutures = new ArrayList<>(numConcurrentAccesses); + + final ExecutorService executor = java.util.concurrent.Executors.newFixedThreadPool(numConcurrentAccesses); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + for (int i = 0; i < numConcurrentAccesses; i++) { + CompletableFuture executionGraphFuture = CompletableFuture + .supplyAsync( + () -> executionGraphCache.getExecutionGraph(jobId, jobManagerGateway), + executor) + .thenCompose(Function.identity()); + + executionGraphFutures.add(executionGraphFuture); + } + + final CompletableFuture> allExecutionGraphFutures = FutureUtils.combineAll(executionGraphFutures); + + Collection allExecutionGraphs = allExecutionGraphFutures.get(); + + for (AccessExecutionGraph executionGraph : allExecutionGraphs) { + assertEquals(accessExecutionGraph, executionGraph); + } + + verify(jobManagerGateway, times(1)).requestJob(eq(jobId), any(Time.class)); + } finally { + Executors.gracefulShutdown(5000L, TimeUnit.MILLISECONDS, executor); + } + } + + /** + * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} is in + * state {@link JobStatus#SUSPENDED}. + * + *
<p>
This test can be removed once we no longer request the actual {@link ExecutionGraph} from the + * {@link JobManager}. + */ + @Test + public void testCacheInvalidationIfSuspended() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final AccessExecutionGraph suspendedExecutionGraph = mock(AccessExecutionGraph.class); + when(suspendedExecutionGraph.getState()).thenReturn(JobStatus.SUSPENDED); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with a suspended ExecutionGraph and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(suspendedExecutionGraph), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(suspendedExecutionGraph, executionGraphFuture.get()); + + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + } + } + + /** + * Tests that a cache entry is invalidated if the retrieved {@link AccessExecutionGraph} changes its + * state to {@link JobStatus#SUSPENDED}. + * + *
<p>
This test can be removed once we no longer request the actual {@link ExecutionGraph} from the + * {@link JobManager}. + */ + @Test + public void testCacheInvalidationIfSwitchToSuspended() throws Exception { + final Time timeout = Time.milliseconds(100L); + final Time timeToLive = Time.hours(1L); + final JobID jobId = new JobID(); + + final AccessExecutionGraph accessExecutionGraph = mock(AccessExecutionGraph.class); + + final SuspendableAccessExecutionGraph toBeSuspendedExecutionGraph = new SuspendableAccessExecutionGraph(jobId); + + final JobManagerGateway jobManagerGateway = mock(JobManagerGateway.class); + // let's first answer with a JobNotFoundException and then only with the correct result + when(jobManagerGateway.requestJob(eq(jobId), any(Time.class))).thenReturn( + CompletableFuture.completedFuture(toBeSuspendedExecutionGraph), + CompletableFuture.completedFuture(accessExecutionGraph)); + + try (ExecutionGraphCache executionGraphCache = new ExecutionGraphCache(timeout, timeToLive)) { + CompletableFuture executionGraphFuture = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(toBeSuspendedExecutionGraph, executionGraphFuture.get()); + + toBeSuspendedExecutionGraph.setJobStatus(JobStatus.SUSPENDED); + + // retrieve the same job from the cache again --> this should return it and invalidate the cache entry + CompletableFuture executionGraphFuture2 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture2.get()); + + CompletableFuture executionGraphFuture3 = executionGraphCache.getExecutionGraph(jobId, jobManagerGateway); + + assertEquals(accessExecutionGraph, executionGraphFuture3.get()); + + verify(jobManagerGateway, times(2)).requestJob(eq(jobId), any(Time.class)); + } + } + + private static final class SuspendableAccessExecutionGraph extends ArchivedExecutionGraph { + + private static final long serialVersionUID = -6796543726305778101L; + + private JobStatus jobStatus; + + public SuspendableAccessExecutionGraph(JobID jobId) { + super( + jobId, + "ExecutionGraphCacheTest", + Collections.emptyMap(), + Collections.emptyList(), + new long[0], + JobStatus.RUNNING, + new ErrorInfo(new FlinkException("Test"), 42L), + "", + new StringifiedAccumulatorResult[0], + Collections.emptyMap(), + new ArchivedExecutionConfig(new ExecutionConfig()), + false, + null, + null); + + jobStatus = super.getState(); + } + + @Override + public JobStatus getState() { + return jobStatus; + } + + public void setJobStatus(JobStatus jobStatus) { + this.jobStatus = jobStatus; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java index e1736c113fbe8..0a502e4d2aafd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobAccumulatorsHandlerTest.java @@ -54,7 +54,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobAccumulatorsHandler handler = new JobAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); 
Assert.assertEquals("/jobs/:jobid/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java index 8bb11411ad36e..18a52f5958663 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobCancellationWithSavepointHandlersTest.java @@ -44,7 +44,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -66,7 +65,7 @@ public class JobCancellationWithSavepointHandlersTest extends TestLogger { @Test public void testGetPaths() { - JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphHolder.class), executor); + JobCancellationWithSavepointHandlers handler = new JobCancellationWithSavepointHandlers(mock(ExecutionGraphCache.class), executor); JobCancellationWithSavepointHandlers.TriggerHandler triggerHandler = handler.getTriggerHandler(); String[] triggerPaths = triggerHandler.getPaths(); @@ -89,9 +88,9 @@ public void testGetPaths() { public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { long timeout = 128288238L; JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, @@ -123,9 +122,9 @@ public void testAskTimeoutEqualsCheckpointTimeout() throws Exception { public void testSavepointDirectoryConfiguration() throws Exception { long timeout = 128288238L; JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, @@ -176,9 +175,9 @@ public void testSavepointDirectoryConfiguration() throws Exception { @Test public void testTriggerNewRequest() throws Exception { JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new 
CheckpointCoordinatorConfiguration( 1L, @@ -308,9 +307,9 @@ public void testTriggerNewRequest() throws Exception { @Test public void testFailedCancellation() throws Exception { JobID jobId = new JobID(); - ExecutionGraphHolder holder = mock(ExecutionGraphHolder.class); + ExecutionGraphCache holder = mock(ExecutionGraphCache.class); ExecutionGraph graph = mock(ExecutionGraph.class); - when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(Optional.of(graph))); + when(holder.getExecutionGraph(eq(jobId), any(JobManagerGateway.class))).thenReturn(CompletableFuture.completedFuture(graph)); when(graph.getCheckpointCoordinatorConfiguration()).thenReturn( new CheckpointCoordinatorConfiguration( 1L, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java index 567df8cd6e66b..efe9dc8191b6c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobConfigHandlerTest.java @@ -55,7 +55,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobConfigHandler handler = new JobConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/config", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java index afd743e4c779a..028bf0dd7965f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobDetailsHandlerTest.java @@ -68,7 +68,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobDetailsHandler handler = new JobDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(2, paths.length); List pathsList = Lists.newArrayList(paths); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java index 6a20696ff7224..d20a156bc8bc4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobExceptionsHandlerTest.java @@ -58,7 +58,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobExceptionsHandler handler = new JobExceptionsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/exceptions", 
paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java index 6d3b213768a3e..29e0819c61dea 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobPlanHandlerTest.java @@ -51,7 +51,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobPlanHandler handler = new JobPlanHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/plan", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java index feffe60b67ea2..d04345d0b915c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexAccumulatorsHandlerTest.java @@ -57,7 +57,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + JobVertexAccumulatorsHandler handler = new JobVertexAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java index bd6817f84fcd6..e02ca35d4fa70 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexBackPressureHandlerTest.java @@ -45,7 +45,7 @@ public class JobVertexBackPressureHandlerTest { @Test public void testGetPaths() { - JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0); + JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), mock(BackPressureStatsTracker.class), 0); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/backpressure", paths[0]); @@ -61,7 +61,7 @@ public void testResponseNoStatsAvailable() throws Exception { .thenReturn(Optional.empty()); JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 9999); @@ -95,7 +95,7 @@ public void testResponseStatsAvailable() throws Exception { .thenReturn(Optional.of(stats)); JobVertexBackPressureHandler handler = new 
JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 9999); @@ -157,7 +157,7 @@ public void testResponsePassedRefreshInterval() throws Exception { .thenReturn(Optional.of(stats)); JobVertexBackPressureHandler handler = new JobVertexBackPressureHandler( - mock(ExecutionGraphHolder.class), + mock(ExecutionGraphCache.class), Executors.directExecutor(), statsTracker, 0); // <----- refresh interval should fire immediately diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java index 5af1d53fd35a0..c2d260333f54b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexDetailsHandlerTest.java @@ -59,7 +59,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobVertexDetailsHandler handler = new JobVertexDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java index 2a027fd4a4a25..e3f3382c278af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/JobVertexTaskManagersHandlerTest.java @@ -62,7 +62,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + JobVertexTaskManagersHandler handler = new JobVertexTaskManagersHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/taskmanagers", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java index 9e0d54973e219..8082e3a1ae97c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskCurrentAttemptDetailsHandlerTest.java @@ -32,7 +32,7 @@ public class SubtaskCurrentAttemptDetailsHandlerTest extends TestLogger { @Test public void testGetPaths() { - SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + SubtaskCurrentAttemptDetailsHandler handler = new SubtaskCurrentAttemptDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] 
paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java index 49e54c0d5cf58..894d6596c4eb0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptAccumulatorsHandlerTest.java @@ -65,7 +65,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtaskExecutionAttemptAccumulatorsHandler handler = new SubtaskExecutionAttemptAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt/accumulators", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java index e1fe8b5d9050c..32ba09c1d84a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtaskExecutionAttemptDetailsHandlerTest.java @@ -74,7 +74,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), null); + SubtaskExecutionAttemptDetailsHandler handler = new SubtaskExecutionAttemptDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), null); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/:subtasknum/attempts/:attempt", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java index 1478f00343caf..e55776d7a0132 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksAllAccumulatorsHandlerTest.java @@ -59,7 +59,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtasksAllAccumulatorsHandler handler = new SubtasksAllAccumulatorsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasks/accumulators", 
paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java index 3783b84b8c793..9f4a7dabeea82 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/SubtasksTimesHandlerTest.java @@ -60,7 +60,7 @@ public void testArchiver() throws Exception { @Test public void testGetPaths() { - SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + SubtasksTimesHandler handler = new SubtasksTimesHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/vertices/:vertexid/subtasktimes", paths[0]); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java index 7050fa66e8c88..0e12e0fdbc931 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointConfigHandlerTest.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.executiongraph.AccessExecutionGraph; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.ExternalizedCheckpointSettings; -import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder; +import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache; import org.apache.flink.runtime.webmonitor.history.ArchivedJson; import org.apache.flink.runtime.webmonitor.history.JsonArchivist; @@ -79,7 +79,7 @@ public void testArchiver() throws IOException { @Test public void testGetPaths() { - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String[] paths = handler.getPaths(); Assert.assertEquals(1, paths.length); Assert.assertEquals("/jobs/:jobid/checkpoints/config", paths[0]); @@ -95,7 +95,7 @@ public void testSimpleConfig() throws Exception { AccessExecutionGraph graph = graphAndSettings.graph; CheckpointCoordinatorConfiguration chkConfig = graphAndSettings.jobCheckpointingConfiguration; - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(graph, Collections.emptyMap()).get(); ObjectMapper mapper = new ObjectMapper(); @@ -121,7 +121,7 @@ public void testAtLeastOnce() throws Exception { AccessExecutionGraph graph = graphAndSettings.graph; - CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor()); + CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor()); String json = handler.handleRequest(graph, 
 		ObjectMapper mapper = new ObjectMapper();
@@ -140,7 +140,7 @@ public void testEnabledExternalizedCheckpointSettings() throws Exception {
 		AccessExecutionGraph graph = graphAndSettings.graph;
 		ExternalizedCheckpointSettings externalizedSettings = graphAndSettings.externalizedSettings;
-		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		CheckpointConfigHandler handler = new CheckpointConfigHandler(mock(ExecutionGraphCache.class), Executors.directExecutor());
 		String json = handler.handleRequest(graph, Collections.emptyMap()).get();
 		ObjectMapper mapper = new ObjectMapper();
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
index e614608850d14..1eac20b337e2f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsDetailsHandlerTest.java
@@ -31,7 +31,7 @@
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -102,7 +102,7 @@ public void testArchiver() throws IOException {
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid", paths[0]);
@@ -114,7 +114,7 @@ public void testGetPaths() {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
 		String json = handler.handleRequest(graph, params).get();
@@ -128,7 +128,7 @@ public void testIllegalCheckpointId() throws Exception {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String json = handler.handleRequest(graph, Collections.emptyMap()).get();
 		assertEquals("{}", json);
@@ -148,7 +148,7 @@ public void testCheckpointNotFound() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		String json = handler.handleRequest(graph, params).get();
@@ -319,7 +319,7 @@ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		String json = handler.handleRequest(graph, params).get();
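For reference, a minimal, self-contained sketch of the test pattern the hunks above migrate: a legacy checkpoint handler is built against a mocked ExecutionGraphCache (instead of the removed ExecutionGraphHolder) and answers with an empty JSON object when no checkpoint id is supplied. The class name is illustrative and not part of the patch; it is placed in the same package as the tests shown here so the handler APIs are accessible.

package org.apache.flink.runtime.rest.handler.legacy.checkpoints;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;

import org.junit.Test;

import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;

public class CheckpointStatsDetailsHandlerSketch {

	@Test
	public void returnsEmptyJsonWithoutCheckpointId() throws Exception {
		// The handler is now wired against an ExecutionGraphCache mock rather than
		// the removed ExecutionGraphHolder; the rest of the pattern is unchanged.
		CheckpointStatsDetailsHandler handler = new CheckpointStatsDetailsHandler(
			mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));

		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);

		// Without a "checkpointid" parameter the handler replies with an empty JSON object.
		String json = handler.handleRequest(graph, Collections.emptyMap()).get();
		assertEquals("{}", json);
	}
}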
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
index 4be7840a536b3..4bb385c14298e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsHandlerTest.java
@@ -33,7 +33,7 @@
 import org.apache.flink.runtime.checkpoint.RestoredCheckpointStats;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -87,7 +87,7 @@ public void testArchiver() throws IOException {
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor());
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
@@ -100,7 +100,7 @@ public void testGetPaths() {
 	public void testCheckpointStatsRequest() throws Exception {
 		TestCheckpointStats testCheckpointStats = createTestCheckpointStats();
-		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor());
+		CheckpointStatsHandler handler = new CheckpointStatsHandler(mock(ExecutionGraphCache.class), Executors.directExecutor());
 		String json = handler.handleRequest(testCheckpointStats.graph, Collections.emptyMap()).get();
 		ObjectMapper mapper = new ObjectMapper();
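The path-registration checks follow the same scheme; a hedged, standalone version of that pattern for CheckpointStatsHandler (class name illustrative, placed in the tests' package, assuming the usual JUnit and Mockito imports these tests already use):

package org.apache.flink.runtime.rest.handler.legacy.checkpoints;

import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;

import org.junit.Assert;
import org.junit.Test;

import static org.mockito.Mockito.mock;

public class CheckpointStatsHandlerPathsSketch {

	@Test
	public void advertisesCheckpointStatsPath() {
		// Only the constructor argument changes: a mocked ExecutionGraphCache
		// replaces the former ExecutionGraphHolder mock.
		CheckpointStatsHandler handler = new CheckpointStatsHandler(
			mock(ExecutionGraphCache.class), Executors.directExecutor());

		String[] paths = handler.getPaths();
		Assert.assertEquals(1, paths.length);
		Assert.assertEquals("/jobs/:jobid/checkpoints", paths[0]);
	}
}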
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
index 4d9b3948de23b..b352baec98963 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/legacy/checkpoints/CheckpointStatsSubtaskDetailsHandlerTest.java
@@ -30,7 +30,7 @@
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
-import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphHolder;
+import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
 import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
 import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
@@ -101,7 +101,7 @@ public void testArchiver() throws Exception {
 	@Test
 	public void testGetPaths() {
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String[] paths = handler.getPaths();
 		Assert.assertEquals(1, paths.length);
 		Assert.assertEquals("/jobs/:jobid/checkpoints/details/:checkpointid/subtasks/:vertexid", paths[0]);
@@ -150,7 +150,7 @@ public void testSubtaskRequestNoSummary() throws Exception {
 	@Test
 	public void testIllegalCheckpointId() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "illegal checkpoint");
 		String json = handler.handleRequest(graph, params).get();
@@ -164,7 +164,7 @@ public void testIllegalCheckpointId() throws Exception {
 	@Test
 	public void testNoCheckpointIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		String json = handler.handleRequest(graph, Collections.emptyMap()).get();
 		assertEquals("{}", json);
@@ -184,7 +184,7 @@ public void testCheckpointNotFound() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
@@ -200,7 +200,7 @@ public void testCheckpointNotFound() throws Exception {
 	@Test
 	public void testIllegalJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
 		params.put("vertexid", "illegal vertex id");
@@ -215,7 +215,7 @@ public void testIllegalJobVertexIdParam() throws Exception {
 	@Test
 	public void testNoJobVertexIdParam() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "1");
 		String json = handler.handleRequest(graph, params).get();
@@ -239,7 +239,7 @@ public void testJobVertexNotFound() throws Exception {
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
@@ -260,7 +260,7 @@ private static JsonNode triggerRequest(AbstractCheckpointStats checkpoint) throws
 		AccessExecutionGraph graph = mock(AccessExecutionGraph.class);
 		when(graph.getCheckpointStatsSnapshot()).thenReturn(snapshot);
-		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphHolder.class), Executors.directExecutor(), new CheckpointStatsCache(0));
+		CheckpointStatsDetailsSubtasksHandler handler = new CheckpointStatsDetailsSubtasksHandler(mock(ExecutionGraphCache.class), Executors.directExecutor(), new CheckpointStatsCache(0));
 		Map<String, String> params = new HashMap<>();
 		params.put("checkpointid", "123");
 		params.put("vertexid", new JobVertexID().toString());
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
index 43641d274bfb2..c101b759d0676 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnApplicationMasterRunner.java
@@ -30,6 +30,7 @@
 import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.clusterframework.BootstrapTools;
 import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.jobmanager.JobManager;
@@ -353,7 +354,7 @@ protected int runApplicationMaster(Configuration config) {
 				new AkkaJobManagerRetriever(actorSystem, webMonitorTimeout, 10, Time.milliseconds(50L)),
 				new AkkaQueryServiceRetriever(actorSystem, webMonitorTimeout),
 				webMonitorTimeout,
-				futureExecutor,
+				new ScheduledExecutorServiceAdapter(futureExecutor),
 				LOG);
 			// 2: the JobManager
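The runner-side change is purely a wrapping of the existing futureExecutor. A small sketch of that adapter wiring, assuming a JDK ScheduledThreadPoolExecutor stands in for the runner's executor; class and variable names below are illustrative and not taken from the patch:

import org.apache.flink.runtime.concurrent.ScheduledExecutor;
import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;

import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ScheduledExecutorAdapterSketch {

	public static void main(String[] args) throws InterruptedException {
		// Stand-in for the runner's futureExecutor (a JDK ScheduledExecutorService).
		ScheduledExecutorService futureExecutor = new ScheduledThreadPoolExecutor(2);

		// The callee shown in the hunk above now takes Flink's ScheduledExecutor
		// abstraction, so the JDK service is wrapped once and passed along
		// instead of the raw executor.
		ScheduledExecutor scheduledExecutor = new ScheduledExecutorServiceAdapter(futureExecutor);

		// The wrapped executor still supports fixed-delay scheduling.
		scheduledExecutor.scheduleWithFixedDelay(
			() -> System.out.println("periodic tick"), 10L, 10L, TimeUnit.MILLISECONDS);

		Thread.sleep(50L);
		futureExecutor.shutdownNow();
	}
}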