Skip to content

Commit

Permalink
[FLINK-2097] [core] Implement job session management.
Browse files Browse the repository at this point in the history
Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it
around for further operations to be attached to the graph. That is the basis if interactive sessions.

This pull request implements a rudimentary session management. Together with the backtracking apache#640,
this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures that the results are cleared eventually.

ExecutionGraphs are kept as long as
- no timeout occurred or
- the session has not been explicitly ended
  • Loading branch information
mxm committed Jun 19, 2015
1 parent 9ee4fa5 commit 9852d39
Show file tree
Hide file tree
Showing 25 changed files with 574 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,13 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
// check if we start a session dedicated for this execution
final boolean shutDownAtEnd;
if (this.flink == null) {
// we start a session just for us now
shutDownAtEnd = true;

if (getSessionTimeout() == 0) {
// we start a session just for us now
shutDownAtEnd = true;
} else {
shutDownAtEnd = false;
}

// configure the number of local slots equal to the parallelism of the local plan
if (this.taskManagerNumSlots == DEFAULT_TASK_MANAGER_NUM_SLOTS) {
int maxParallelism = plan.getMaximumParallelism();
Expand All @@ -174,6 +178,10 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {

JobGraphGenerator jgg = new JobGraphGenerator();
JobGraph jobGraph = jgg.compileJobGraph(op);
if (getJobID() != null) {
jobGraph.setJobID(getJobID());
jobGraph.setSessionTimeout(getSessionTimeout());
}

boolean sysoutPrint = isPrintingStatusDuringExecution();
SerializedJobExecutionResult result = flink.submitJobAndWait(jobGraph,sysoutPrint);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ public class RemoteExecutor extends PlanExecutor {

private final List<String> jarFiles;
private final InetSocketAddress address;

public RemoteExecutor(String hostname, int port) {
this(hostname, port, Collections.<String>emptyList());
}
Expand Down Expand Up @@ -85,7 +85,9 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

c.setJobID(getJobID());
c.setSessionTimeout(getSessionTimeout());

JobSubmissionResult result = c.run(p, -1, true);
if (result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
Expand All @@ -101,7 +103,9 @@ public JobExecutionResult executeJar(String jarPath, String assemblerClass, Stri

Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
c.setPrintStatusDuringExecution(isPrintingStatusDuringExecution());

c.setJobID(getJobID());
c.setSessionTimeout(getSessionTimeout());

JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
if(result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
Expand All @@ -120,6 +124,7 @@ public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
return jsonGen.getOptimizerPlanAsJSON(op);
}


// --------------------------------------------------------------------------------------------
// Utilities
Expand Down Expand Up @@ -147,5 +152,7 @@ private static InetSocketAddress getInetFromHostport(String hostport) {
}
return new InetSocketAddress(host, port);
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.net.UnknownHostException;
import java.util.List;

import akka.pattern.Patterns;
import akka.util.Timeout;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.configuration.IllegalConfigurationException;
Expand All @@ -52,6 +54,7 @@
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -89,6 +92,12 @@ public class Client {
*/
private int maxSlots = -1;

/** Job identifer, may be null if no session management is desired */
private JobID jobID = null;

/** The session timeout in seconds */
private long sessionTimeout = 0;

/** ID of the last job submitted with this client. */
private JobID lastJobId = null;

Expand Down Expand Up @@ -187,6 +196,23 @@ public void setPrintStatusDuringExecution(boolean print) {
this.printStatusDuringExecution = print;
}

/**
* Sets the job identifier to be used when submitting a job. May be set to null to disable
* session management.
* @param jobID
*/
public void setJobID(JobID jobID) {
this.jobID = jobID;
}

/**
* Sets the session timeout
* @param sessionTimeout The session timeout in seconds.
*/
public void setSessionTimeout(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}

/**
* @return -1 if unknown. The maximum number of available processing slots at the Flink cluster
* connected to this client.
Expand Down Expand Up @@ -352,7 +378,11 @@ public JobSubmissionResult run(JobWithJars prog, int parallelism, boolean wait)

public JobSubmissionResult run(OptimizedPlan compiledPlan, List<File> libraries, boolean wait) throws ProgramInvocationException {
JobGraph job = getJobGraph(compiledPlan, libraries);
this.lastJobId = job.getJobID();
// if a JobID has been supplied, set it alongside with the session timeout
if (jobID != null) {
job.setJobID(jobID);
job.setSessionTimeout(sessionTimeout);
}
return run(job, wait);
}

Expand Down Expand Up @@ -421,6 +451,34 @@ public JobSubmissionResult run(JobGraph jobGraph, boolean wait) throws ProgramIn
}
}

/**
* Finishes the current session at the job manager.
*/
public void endSession() throws Exception {
LOG.info("Telling job manager to end the session {}.", jobID);
final ActorSystem actorSystem;
try {
actorSystem = JobClient.startJobClientActorSystem(configuration);
}
catch (Exception e) {
throw new RuntimeException("Could start client actor system.", e);
}

LOG.info("Looking up JobManager");
final ActorRef jobManager;
try {
jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, configuration);
}
catch (IOException e) {
throw new RuntimeException("Failed to resolve JobManager", e);
}

// TODO wait for an answer
Patterns.ask(jobManager,
new JobManagerMessages.RemoveCachedJob(jobID),
new Timeout(AkkaUtils.getDefaultTimeout()));
}

// --------------------------------------------------------------------------------------------

public static final class OptimizerPlanEnvironment extends ExecutionEnvironment {
Expand Down Expand Up @@ -451,7 +509,11 @@ public String getExecutionPlan() throws Exception {
// do not go on with anything now!
throw new ProgramAbortException();
}


@Override
public void startNewSession() {
}

private void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
Expand Down Expand Up @@ -60,6 +61,8 @@ public JobExecutionResult execute(String jobName) throws Exception {
Plan p = createProgramPlan(jobName);
JobWithJars toRun = new JobWithJars(p, this.jarFilesToAttach, this.userCodeClassLoader);

this.client.setJobID(jobID);
this.client.setSessionTimeout(sessionTimeout);
JobSubmissionResult result = this.client.run(toRun, getParallelism(), wait);
if(result instanceof JobExecutionResult) {
this.lastJobExecutionResult = (JobExecutionResult) result;
Expand All @@ -81,6 +84,12 @@ public String getExecutionPlan() throws Exception {
return gen.getOptimizerPlanAsJSON(op);
}

@Override
public void startNewSession() throws Exception {
client.endSession();
jobID = JobID.generate();
}

public boolean isWait() {
return wait;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -712,7 +712,11 @@ public String getExecutionPlan() throws Exception {
// do not go on with anything now!
throw new Client.ProgramAbortException();
}


@Override
public void startNewSession() throws Exception {
}

public void setAsContext() {
ExecutionEnvironmentFactory factory = new ExecutionEnvironmentFactory() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.client;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.DiscardingOutputFormat;
Expand All @@ -26,6 +27,7 @@
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.Collections;
import java.util.UUID;

import static org.junit.Assert.fail;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
* dependencies of all runtime classes.
*/
public abstract class PlanExecutor {

private static final String LOCAL_EXECUTOR_CLASS = "org.apache.flink.client.LocalExecutor";
private static final String REMOTE_EXECUTOR_CLASS = "org.apache.flink.client.RemoteExecutor";

Expand All @@ -51,6 +51,43 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
public boolean isPrintingStatusDuringExecution() {
return this.printUpdatesToSysout;
}

/** Job identifer, may only be set for explicit resume of a job. */
private JobID jobID = null;

private long sessionTimeout = 0;

/**
* Sets the job identifier for execution of jobs. May be set to null to disable session management.
* @param jobID
*/
public void setJobID(JobID jobID) {
this.jobID = jobID;
}

/**
* Gets the job identifier of this executor.
* @return
*/
public JobID getJobID() {
return jobID;
}

/**
* Gets the session timeout.
* @return The session timeout in seconds.
*/
public long getSessionTimeout() {
return sessionTimeout;
}

/**
* Sets the session timeout.
* @return The session timeout in seconds.
*/
public void setSessionTimeout(long sessionTimeout) {
this.sessionTimeout = sessionTimeout;
}

// ------------------------------------------------------------------------
// Program Execution
Expand Down Expand Up @@ -102,7 +139,7 @@ public static PlanExecutor createLocalExecutor(Configuration configuration) {
/**
* Creates an executor that runs the plan on a remote environment. The remote executor is typically used
* to send the program to a cluster for execution.
*
*
* @param hostname The address of the JobManager to send the program to.
* @param port The port of the JobManager to send the program to.
* @param jarFiles A list of jar files that contain the user-defined function (UDF) classes and all classes used
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ public int getParallelism() {
public String getExecutionPlan() throws Exception {
throw new UnsupportedOperationException("Execution plans are not used for collection-based execution.");
}

@Override
public void startNewSession() throws Exception {
}
}
Loading

0 comments on commit 9852d39

Please sign in to comment.