From 9cb049a9367d23c640c4525b32990ae84161b476 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Wed, 13 May 2015 17:06:47 +0200 Subject: [PATCH] implement a simple session management This pull request implements a rudimentary session management. Together with the backtracking #640, this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually. ExecutionGraphs are kept as long as - no timeout occurred or - the session has not been explicitly ended --- .../apache/flink/client/RemoteExecutor.java | 22 +++-- .../apache/flink/client/program/Client.java | 50 +++++++++- .../client/program/ContextEnvironment.java | 7 ++ .../flink/client/program/PackagedProgram.java | 6 +- .../RemoteExecutorHostnameResolutionTest.java | 5 +- .../apache/flink/api/common/PlanExecutor.java | 5 +- .../flink/api/java/CollectionEnvironment.java | 4 + .../flink/api/java/ExecutionEnvironment.java | 13 ++- .../flink/api/java/LocalEnvironment.java | 8 ++ .../flink/api/java/RemoteEnvironment.java | 41 +++++++- .../executiongraph/ExecutionGraph.java | 9 ++ .../flink/runtime/jobgraph/JobGraph.java | 46 ++++++++- .../flink/runtime/jobmanager/JobInfo.scala | 14 ++- .../flink/runtime/jobmanager/JobManager.scala | 28 +++++- .../runtime/messages/JobManagerMessages.scala | 8 ++ .../runtime/jobmanager/JobManagerITCase.scala | 97 ++++++++++++++++++- .../api/scala/ExecutionEnvironment.scala | 7 ++ .../flink/test/util/TestEnvironment.java | 4 + 18 files changed, 344 insertions(+), 30 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java index 373d70c800945..3ec62ed2862b8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java +++ b/flink-clients/src/main/java/org/apache/flink/client/RemoteExecutor.java @@ -24,6 +24,7 @@ import java.net.URISyntaxException; import 
java.util.Collections; import java.util.List; +import java.util.UUID; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobSubmissionResult; @@ -52,26 +53,28 @@ public class RemoteExecutor extends PlanExecutor { private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class); + private final UUID executionID; private final List jarFiles; private final InetSocketAddress address; - public RemoteExecutor(String hostname, int port) { - this(hostname, port, Collections.emptyList()); + public RemoteExecutor(UUID executionID, String hostname, int port) { + this(executionID, hostname, port, Collections.emptyList()); } - public RemoteExecutor(String hostname, int port, String jarFile) { - this(hostname, port, Collections.singletonList(jarFile)); + public RemoteExecutor(UUID executionID, String hostname, int port, String jarFile) { + this(executionID, hostname, port, Collections.singletonList(jarFile)); } - public RemoteExecutor(String hostport, String jarFile) { - this(getInetFromHostport(hostport), Collections.singletonList(jarFile)); + public RemoteExecutor(UUID executionID, String hostport, String jarFile) { + this(executionID, getInetFromHostport(hostport), Collections.singletonList(jarFile)); } - public RemoteExecutor(String hostname, int port, List jarFiles) { - this(new InetSocketAddress(hostname, port), jarFiles); + public RemoteExecutor(UUID executionID, String hostname, int port, List jarFiles) { + this(executionID, new InetSocketAddress(hostname, port), jarFiles); } - public RemoteExecutor(InetSocketAddress inet, List jarFiles) { + public RemoteExecutor(UUID executionID, InetSocketAddress inet, List jarFiles) { + this.executionID = executionID; this.jarFiles = jarFiles; this.address = inet; } @@ -101,6 +104,7 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1); 
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution()); + c.setSessionID(executionID); JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true); if(result instanceof JobExecutionResult) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index e219a38d6b668..d2aaf4edee429 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -26,7 +26,10 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.List; +import java.util.UUID; +import akka.pattern.Patterns; +import akka.util.Timeout; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; import org.apache.flink.configuration.IllegalConfigurationException; @@ -52,6 +55,7 @@ import org.apache.flink.runtime.client.SerializedJobExecutionResult; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobManager; +import org.apache.flink.runtime.messages.JobManagerMessages; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,6 +93,11 @@ public class Client { */ private int maxSlots = -1; + /** + * The session id, passed from the ExecutionEnvironment. + */ + private UUID sessionID; + /** ID of the last job submitted with this client. 
*/ private JobID lastJobId = null; @@ -351,6 +360,7 @@ public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait) public JobSubmissionResult run(OptimizedPlan compiledPlan, List libraries, boolean wait) throws ProgramInvocationException { JobGraph job = getJobGraph(compiledPlan, libraries); + job.setSessionID(sessionID); this.lastJobId = job.getJobID(); return run(job, wait); } @@ -420,6 +430,40 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn } } + /** + * Setter for the ExecutionEnvironment to set the session id. + * @param sessionID + */ + public void setSessionID(UUID sessionID) { + this.sessionID = sessionID; + } + + /** + * Finishes the current session at the job manager. + * @throws ProgramInvocationException + */ + public void endSession() throws Exception { + LOG.info("Telling job manager to end the session {}.", sessionID); + final ActorSystem actorSystem; + try { + actorSystem = JobClient.startJobClientActorSystem(configuration); + } + catch (Exception e) { + throw new RuntimeException("Could start client actor system.", e); + } + + LOG.info("Looking up JobManager"); + final ActorRef jobManager; + try { + jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration); + } + catch (IOException e) { + throw new RuntimeException("Failed to resolve JobManager", e); + } + + Patterns.ask(jobManager, new JobManagerMessages.RemoveCachedJobs(sessionID), new Timeout(AkkaUtils.getDefaultTimeout())); + } + // -------------------------------------------------------------------------------------------- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment { @@ -450,7 +494,11 @@ public String getExecutionPlan() throws Exception { // do not go on with anything now! 
throw new ProgramAbortException(); } - + + @Override + public void startNewSession() { + } + private void setAsContext() { ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java index 35e5846797edf..8ca72ba066aab 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ContextEnvironment.java @@ -20,6 +20,7 @@ import java.io.File; import java.util.List; +import java.util.UUID; import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobSubmissionResult; @@ -79,6 +80,12 @@ public String getExecutionPlan() throws Exception { return gen.getOptimizerPlanAsJSON(op); } + @Override + public void startNewSession() throws Exception { + client.endSession(); + executionId = UUID.randomUUID(); + } + public boolean isWait() { return wait; } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 10096dac6021d..f38899018b724 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -712,7 +712,11 @@ public String getExecutionPlan() throws Exception { // do not go on with anything now! 
throw new Client.ProgramAbortException(); } - + + @Override + public void startNewSession() throws Exception { + } + public void setAsContext() { ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() { @Override diff --git a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java index a1bd0e2dd839e..0c5da4482e6fc 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/RemoteExecutorHostnameResolutionTest.java @@ -26,6 +26,7 @@ import java.net.InetSocketAddress; import java.net.UnknownHostException; import java.util.Collections; +import java.util.UUID; import static org.junit.Assert.fail; @@ -38,7 +39,7 @@ public class RemoteExecutorHostnameResolutionTest { @Test public void testUnresolvableHostname1() { try { - RemoteExecutor exec = new RemoteExecutor(nonExistingHostname, port); + RemoteExecutor exec = new RemoteExecutor(UUID.randomUUID(), nonExistingHostname, port); exec.executePlan(getProgram()); fail("This should fail with an UnknownHostException"); } @@ -56,7 +57,7 @@ public void testUnresolvableHostname1() { public void testUnresolvableHostname2() { try { InetSocketAddress add = new InetSocketAddress(nonExistingHostname, port); - RemoteExecutor exec = new RemoteExecutor(add, Collections.emptyList()); + RemoteExecutor exec = new RemoteExecutor(UUID.randomUUID(), add, Collections.emptyList()); exec.executePlan(getProgram()); fail("This should fail with an UnknownHostException"); } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java index 74bdb09527860..0a9629ee255bc 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java +++ 
b/flink-core/src/main/java/org/apache/flink/api/common/PlanExecutor.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.List; +import java.util.UUID; /** * A PlanExecutor runs a plan. The specific implementation (such as the org.apache.flink.client.LocalExecutor @@ -109,7 +110,7 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) { * from within the UDFs. * @return A remote executor. */ - public static PlanExecutor createRemoteExecutor(String hostname, int port, String... jarFiles) { + public static PlanExecutor createRemoteExecutor(UUID executionID, String hostname, int port, String... jarFiles) { if (hostname == null) { throw new IllegalArgumentException("The hostname must not be null."); } @@ -123,7 +124,7 @@ public static PlanExecutor createRemoteExecutor(String hostname, int port, Strin : Arrays.asList(jarFiles); try { - return reClass.getConstructor(String.class, int.class, List.class).newInstance(hostname, port, files); + return reClass.getConstructor(UUID.class, String.class, int.class, List.class).newInstance(executionID, hostname, port, files); } catch (Throwable t) { throw new RuntimeException("An error occurred while loading the remote executor (" diff --git a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java index b48debc381cd5..7036157bdde59 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/CollectionEnvironment.java @@ -51,4 +51,8 @@ public int getParallelism() { public String getExecutionPlan() throws Exception { throw new UnsupportedOperationException("Execution plans are not used for collection-based execution."); } + + @Override + public void startNewSession() throws Exception { + } } diff --git a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java 
b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java index 2e7e57cfd563e..fe9f192fd33d7 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/ExecutionEnvironment.java @@ -106,9 +106,9 @@ public abstract class ExecutionEnvironment { private static boolean allowLocalExecution = true; // -------------------------------------------------------------------------------------------- - - private final UUID executionId; - + + protected UUID executionId; + private final List> sinks = new ArrayList>(); private final List> cacheFile = new ArrayList>(); @@ -123,7 +123,7 @@ public abstract class ExecutionEnvironment { * Creates a new Execution Environment. */ protected ExecutionEnvironment() { - this.executionId = UUID.randomUUID(); + executionId = UUID.randomUUID(); } /** @@ -242,6 +242,11 @@ public String getIdString() { return this.executionId.toString(); } + /** + * Starts a new session, discarding all intermediate results. + */ + public abstract void startNewSession() throws Exception; + // -------------------------------------------------------------------------------------------- // Registry for types and serializers // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java index 25042b6b7edef..27e74c6e3f59b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/LocalEnvironment.java @@ -24,6 +24,8 @@ import org.apache.flink.api.common.PlanExecutor; import org.apache.flink.configuration.Configuration; +import java.util.UUID; + /** * An {@link ExecutionEnvironment} that runs the program locally, multi-threaded, in the JVM where the * environment is instantiated. 
When this environment is instantiated, it uses a default parallelism @@ -61,6 +63,12 @@ public String getExecutionPlan() throws Exception { PlanExecutor executor = PlanExecutor.createLocalExecutor(configuration); return executor.getOptimizerPlanAsJSON(p); } + + @Override + public void startNewSession() { + executionId = UUID.randomUUID(); + } + // -------------------------------------------------------------------------------------------- @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java index c9a4fe047a904..91872e65f6a17 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/RemoteEnvironment.java @@ -21,6 +21,13 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.Plan; import org.apache.flink.api.common.PlanExecutor; +import org.apache.flink.configuration.ConfigConstants; +import org.apache.flink.configuration.Configuration; + +import java.lang.reflect.Constructor; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.UUID; /** * An {@link ExecutionEnvironment} that sends programs @@ -35,6 +42,8 @@ public class RemoteEnvironment extends ExecutionEnvironment { private final int port; private final String[] jarFiles; + + private static String clientClassName = "org.apache.flink.client.program.Client"; /** * Creates a new RemoteEnvironment that points to the master (JobManager) described by the @@ -65,7 +74,7 @@ public RemoteEnvironment(String host, int port, String... 
jarFiles) { public JobExecutionResult execute(String jobName) throws Exception { Plan p = createProgramPlan(jobName); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(executionId, host, port, jarFiles); executor.setPrintStatusDuringExecution(p.getExecutionConfig().isSysoutLoggingEnabled()); return executor.executePlan(p); } @@ -76,10 +85,38 @@ public String getExecutionPlan() throws Exception { p.setDefaultParallelism(getParallelism()); registerCachedFilesWithPlan(p); - PlanExecutor executor = PlanExecutor.createRemoteExecutor(host, port, jarFiles); + PlanExecutor executor = PlanExecutor.createRemoteExecutor(executionId, host, port, jarFiles); return executor.getOptimizerPlanAsJSON(p); } + @Override + public void startNewSession() { + try { + Class clientClass = Class.forName(clientClassName); + Constructor constructor = clientClass.getConstructor(Configuration.class, ClassLoader.class); + Configuration configuration = new Configuration(); + configuration.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, host); + configuration.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, port); + Method setSession = clientClass.getDeclaredMethod("setSessionID", UUID.class); + Method startNewSession = clientClass.getDeclaredMethod("endSession"); + + Object client = constructor.newInstance(configuration, ClassLoader.getSystemClassLoader()); + setSession.invoke(client, executionId); + startNewSession.invoke(client); + } catch (NoSuchMethodException e) { + throw new RuntimeException("Couldn't find constructor/method method to invoke on the Client class."); + } catch (InstantiationException e) { + throw new RuntimeException("Couldn't instantiate the Client class."); + } catch (IllegalAccessException e) { + throw new RuntimeException("Couldn't access the Client class or its methods."); + } catch (InvocationTargetException e) { + throw new RuntimeException("Couldn't invoke the Client 
class method."); + } catch (ClassNotFoundException e) { + throw new RuntimeException("Couldn't find the Client class."); + } + executionId = UUID.randomUUID(); + } + @Override public String toString() { return "Remote Environment (" + this.host + ":" + this.port + " - parallelism = " + diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index a041f861744de..a323feb9fc5ce 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -168,6 +168,9 @@ public class ExecutionGraph implements Serializable { /** Flag that indicate whether the executed dataflow should be periodically snapshotted */ private boolean snapshotCheckpointsEnabled; + + /** Flag to indicate whether the Graph has been archived */ + private boolean isArchived = false; // ------ Execution status and progress. 
These values are volatile, and accessed under the lock ------- @@ -283,6 +286,10 @@ public ScheduleMode getScheduleMode() { return scheduleMode; } + public boolean isArchived() { + return isArchived; + } + public void enableSnaphotCheckpointing(long interval, long checkpointTimeout, List verticesToTrigger, List verticesToWaitFor, @@ -641,6 +648,8 @@ public void prepareForArchiving() { requiredJarFiles.clear(); jobStatusListenerActors.clear(); executionListenerActors.clear(); + + isArchived = true; } public ExecutionConfig getExecutionConfig() { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 28fa78e67853f..baa8042bdb4f0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; +import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobID; @@ -39,6 +41,7 @@ import org.apache.flink.runtime.blob.BlobClient; import org.apache.flink.runtime.blob.BlobKey; import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings; +import scala.concurrent.duration.FiniteDuration; /** * The JobGraph represents a Flink dataflow program, at the low level that the JobManager accepts. @@ -70,7 +73,12 @@ public class JobGraph implements Serializable { /** Set of blob keys identifying the JAR files required to run this job. */ private final List userJarBlobKeys = new ArrayList(); - + + /** + * Session that this job belongs to. + */ + private UUID sessionID = null; + /** ID of this job. 
*/ private final JobID jobID; @@ -79,6 +87,9 @@ public class JobGraph implements Serializable { /** The number of times that failed tasks should be re-executed */ private int numExecutionRetries; + + /** The Duration after which the corresponding ExecutionGraph is removed at the job manager after it has been executed. */ + private FiniteDuration sessionTimeout = new FiniteDuration(10, TimeUnit.MINUTES); /** flag to enable queued scheduling */ private boolean allowQueuedScheduling; @@ -100,14 +111,14 @@ public JobGraph() { } /** - * Constructs a new job graph with the given name and a random job ID. + * Constructs a new job graph with the given name, a random job ID and execution ID. * * @param jobName The name of the job */ public JobGraph(String jobName) { this(null, jobName); } - + /** * Constructs a new job graph with the given name and a random job ID. * @@ -163,6 +174,22 @@ public JobGraph(JobID jobId, String jobName, AbstractJobVertex... vertices) { public JobID getJobID() { return this.jobID; } + + /** + * Gets the session id this graph belongs to. + * @return the session identifier. + */ + public UUID getSessionID() { + return sessionID; + } + + /** + * Sets the session id (set by the Client). + * @param sessionID + */ + public void setSessionID(UUID sessionID) { + this.sessionID = sessionID; + } /** * Returns the name assigned to the job graph. @@ -206,10 +233,23 @@ public void setNumberOfExecutionRetries(int numberOfExecutionRetries) { public int getNumberOfExecutionRetries() { return numExecutionRetries; } + + /** + * Gets the timeout after which the corresponding ExecutionGraph is removed at the + * job manager after it has been executed. 
+ * @return a timeout as a FiniteDuration + */ + public FiniteDuration getSessionTimeout() { + return sessionTimeout; + } public void setAllowQueuedScheduling(boolean allowQueuedScheduling) { this.allowQueuedScheduling = allowQueuedScheduling; } + + public void setSessionTimeout(FiniteDuration sessionTimeout) { + this.sessionTimeout = sessionTimeout; + } public boolean getAllowQueuedScheduling() { return allowQueuedScheduling; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala index 26d7272ed6525..85b6d9f5ce253 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobInfo.scala @@ -18,8 +18,12 @@ package org.apache.flink.runtime.jobmanager +import java.util.UUID + import akka.actor.ActorRef +import scala.concurrent.duration.FiniteDuration + /** * Utility class to store job information on the [[JobManager]]. The JobInfo stores which actor * submitted the job, when the start time and, if already terminated, the end time was. 
@@ -29,7 +33,11 @@ import akka.actor.ActorRef * @param client Actor which submitted the job * @param start Starting time */ -class JobInfo(val client: ActorRef, val start: Long){ +class JobInfo(val client: ActorRef, val start: Long, + val sessionID : UUID, val sessionTimeout: FiniteDuration) { + + var sessionAlive = true + var end: Long = -1 def duration: Long = { @@ -42,5 +50,7 @@ class JobInfo(val client: ActorRef, val start: Long){ } object JobInfo{ - def apply(client: ActorRef, start: Long) = new JobInfo(client, start) + def apply(client: ActorRef, start: Long, + sessionID: UUID, sessionTimeout: FiniteDuration) = + new JobInfo(client, start, sessionID, sessionTimeout) } diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 4745fb6fb5ea0..927b79ef935be 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -61,6 +61,8 @@ import scala.concurrent._ import scala.concurrent.duration._ import scala.language.postfixOps import scala.collection.JavaConverters._ +import scala.concurrent.ExecutionContext.Implicits.global + /** * The job manager is responsible for receiving Flink jobs, scheduling the tasks, gathering the @@ -100,7 +102,7 @@ class JobManager(protected val flinkConfiguration: Configuration, protected val timeout: FiniteDuration) extends Actor with ActorLogMessages with ActorSynchronousLogging { - /** List of current jobs running jobs */ + /** List of current running jobs */ protected val currentJobs = scala.collection.mutable.HashMap[JobID, (ExecutionGraph, JobInfo)]() @@ -319,7 +321,13 @@ class JobManager(protected val flinkConfiguration: Configuration, throw exception } - removeJob(jobID) + if (jobInfo.sessionID != null && jobInfo.sessionAlive) { + 
context.system.scheduler.scheduleOnce(jobInfo.sessionTimeout) { + removeJob(jobID) + } + } else { + removeJob(jobID) + } } case None => @@ -406,6 +414,19 @@ class JobManager(protected val flinkConfiguration: Configuration, case RequestJobManagerStatus => sender() ! JobManagerStatusAlive + case RemoveCachedJobs(sessionID) => + currentJobs.values.foreach { + case (graph, info) => + if (sessionID.equals(info.sessionID)) { + if (graph.getState.isTerminalState) { + removeJob(graph.getJobID) + } else { + // triggers removal upon completion of job + info.sessionAlive = false + } + } + } + case Disconnect(msg) => val taskManager = sender() @@ -464,7 +485,8 @@ class JobManager(protected val flinkConfiguration: Configuration, executionGraph = currentJobs.getOrElseUpdate(jobGraph.getJobID, (new ExecutionGraph(jobGraph.getJobID, jobGraph.getName, jobGraph.getJobConfiguration, timeout, jobGraph.getUserJarBlobKeys, userCodeLoader), - JobInfo(sender(), System.currentTimeMillis())))._1 + JobInfo(sender(), System.currentTimeMillis(), jobGraph.getSessionID(), + jobGraph.getSessionTimeout())))._1 // configure the execution graph val jobNumberRetries = if (jobGraph.getNumberOfExecutionRetries >= 0) { diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala index 03e837db1e70a..bebcbf21b3604 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.messages +import java.util.UUID + import org.apache.flink.api.common.JobID import org.apache.flink.runtime.client.{SerializedJobExecutionResult, JobStatusMessage} import org.apache.flink.runtime.executiongraph.{ExecutionAttemptID, ExecutionGraph} @@ -208,6 +210,12 @@ object JobManagerMessages { */ case class 
JobNotFound(jobID: JobID) extends JobResponse with JobStatusResponse + /** + * Removes the jobs belonging to the session id supplied. + * @param sessionID The session id + */ + case class RemoveCachedJobs(sessionID: UUID) + /** * Requests the instances of all registered task managers. */ diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala index ee584f08ae76a..e13bfeca164fa 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/jobmanager/JobManagerITCase.scala @@ -18,6 +18,8 @@ package org.apache.flink.runtime.jobmanager +import java.util.UUID + import Tasks._ import akka.actor.ActorSystem import akka.actor.Status.{Success, Failure} @@ -27,7 +29,8 @@ import akka.util.Timeout import org.apache.flink.runtime.client.JobExecutionException import org.apache.flink.runtime.jobgraph.{AbstractJobVertex, DistributionPattern, JobGraph, ScheduleMode} import org.apache.flink.runtime.messages.JobManagerMessages._ -import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.NotifyWhenJobRemoved +import org.apache.flink.runtime.testingUtils.TestingJobManagerMessages.{ExecutionGraphFound, +RequestExecutionGraph, NotifyWhenJobRemoved} import org.apache.flink.runtime.testingUtils.TestingUtils import org.junit.runner.RunWith import org.scalatest.junit.JUnitRunner @@ -609,6 +612,98 @@ WordSpecLike with Matchers with BeforeAndAfterAll { cluster.stop() } } + + "remove execution graphs when the client ends the session explicitly" in { + val vertex = new AbstractJobVertex("Test Vertex") + vertex.setInvokableClass(classOf[NoOpInvokable]) + + val slowVertex = new AbstractJobVertex("Slow Vertex") + slowVertex.setInvokableClass(classOf[WaitingNoOpInvokable]) + + val sessionID = UUID.randomUUID() + + val jobGraph1 = new JobGraph("Test Job", 
vertex) + jobGraph1.setSessionID(sessionID) + jobGraph1.setSessionTimeout(9 hours) + + val jobGraph2 = new JobGraph("Slow Test Job", slowVertex) + jobGraph2.setSessionID(sessionID) + jobGraph2.setSessionTimeout(9 hours) + + val cluster = TestingUtils.startTestingCluster(1) + val jm = cluster.getJobManager + + try { + within(TestingUtils.TESTING_DURATION) { + jm ! SubmitJob(jobGraph1, false) + expectMsg(Success(jobGraph1.getJobID)) + expectMsgType[JobResultSuccess] + + jm ! SubmitJob(jobGraph2, false) + + // remove jobs while jobGraph2 is still running + jm ! RemoveCachedJobs(sessionID) + + expectMsg(Success(jobGraph2.getJobID)) + expectMsgType[JobResultSuccess] + + jm ! RequestExecutionGraph(jobGraph1.getJobID) + val graph1 = expectMsgType[ExecutionGraphFound].executionGraph + assert(graph1.isArchived) + + jm ! RequestExecutionGraph(jobGraph2.getJobID) + val graph2 = expectMsgType[ExecutionGraphFound].executionGraph + assert(graph2.isArchived) + } + } finally { + cluster.stop() + } + } + + "remove execution graphs when when the client's session times out" in { + val vertex = new AbstractJobVertex("Test Vertex") + vertex.setParallelism(1) + vertex.setInvokableClass(classOf[NoOpInvokable]) + + val sessionID = UUID.randomUUID() + + val jobGraph1 = new JobGraph("Test Job", vertex) + jobGraph1.setSessionID(sessionID) + jobGraph1.setSessionTimeout(1 second) + + val jobGraph2 = new JobGraph("Test Job", vertex) + jobGraph2.setSessionID(sessionID) + jobGraph2.setSessionTimeout(1 second) + + val cluster = TestingUtils.startTestingCluster(1) + val jm = cluster.getJobManager + + try { + within(TestingUtils.TESTING_DURATION) { + jm ! SubmitJob(jobGraph1, false) + expectMsg(Success(jobGraph1.getJobID)) + expectMsgType[JobResultSuccess] + + jm ! SubmitJob(jobGraph2, false) + expectMsg(Success(jobGraph2.getJobID)) + expectMsgType[JobResultSuccess] + + // wait until graph is archived + Thread.sleep(2000) + + jm ! 
RequestExecutionGraph(jobGraph1.getJobID) + val graph1 = expectMsgType[ExecutionGraphFound].executionGraph + assert(graph1.isArchived) + + jm ! RequestExecutionGraph(jobGraph2.getJobID) + val graph2 = expectMsgType[ExecutionGraphFound].executionGraph + assert(graph2.isArchived) + } + } finally { + cluster.stop() + } + } + } class WaitingOnFinalizeJobVertex(name: String, val waitingTime: Long) extends diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala index efa1e8840b368..caca4fc6c72a6 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/ExecutionEnvironment.scala @@ -138,6 +138,13 @@ class ExecutionEnvironment(javaEnv: JavaEnv) { javaEnv.getIdString } + /** + * Starts a new session, discarding all intermediate results. + */ + def startNewSession { + javaEnv.startNewSession() + } + /** * Registers the given type with the serializer at the [[KryoSerializer]]. * diff --git a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java index cf1caeb1db9d8..c70e1ae62a10d 100644 --- a/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java +++ b/flink-test-utils/src/main/java/org/apache/flink/test/util/TestEnvironment.java @@ -44,6 +44,10 @@ public TestEnvironment(ForkableFlinkMiniCluster executor, int parallelism) { setParallelism(parallelism); } + @Override + public void startNewSession() throws Exception { + } + @Override public JobExecutionResult execute(String jobName) throws Exception { try {