Skip to content

Commit

Permalink
[FLINK-2097][core] implement a job session management
Browse files Browse the repository at this point in the history
Sessions make sure that the JobManager does not immediately discard a
JobGraph after execution, but keeps it around for further operations to
be attached to the graph. That is the basis for interactive sessions.

This pull request implements a rudimentary session management. Together
with the backtracking apache#640, this will enable users to submit jobs to the
cluster and access intermediate results. Session handling ensures that
the results are cleared eventually.

ExecutionGraphs are kept as long as
  - no timeout occurred or
  - the session has not been explicitly ended

The following changes have also been made in this pull request:

- The Job ID is created through the ExecutionEnvironment and passed through

- Sessions can be termined by the ExecutionEnvironment or directly
  through the executor

- The environments use reapers (local) and shutdown hooks (remote) to
  ensure session termination when the environment runs out of scope

- The Client manages only connections to the JobManager, it is not job
  specific

This closes apache#858.
  • Loading branch information
mxm committed Sep 22, 2015
1 parent 7984acc commit 71bf2f5
Show file tree
Hide file tree
Showing 62 changed files with 2,114 additions and 1,040 deletions.
182 changes: 103 additions & 79 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,13 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
Expand Down Expand Up @@ -297,44 +300,51 @@ protected int run(String[] args) {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
Client client = getClient(options, program.getMainClassName(), userParallelism);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
if(client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}

// check if detached per job yarn cluster is used to start flink
if(yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
exitCode = executeProgram(program, client, userParallelism, false);
} else {
// regular (blocking) execution.
exitCode = executeProgram(program, client, userParallelism, true);
}
try {
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}

// show YARN cluster status if its not a detached YARN cluster.
if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {
// check if detached per job yarn cluster is used to start flink
if (yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
exitCode = executeProgramDetached(program, client, userParallelism);
}
else {
// regular (blocking) execution.
exitCode = executeProgramBlocking(program, client, userParallelism);
}

// show YARN cluster status if its not a detached YARN cluster.
if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {

logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
}
}
if (yarnCluster.hasFailed()) {
logAndSysout("YARN cluster is in failed state!");
logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}
if (yarnCluster.hasFailed()) {
logAndSysout("YARN cluster is in failed state!");
logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}

return exitCode;
return exitCode;
}
finally {
client.shutdown();
}
}
catch (Throwable t) {
return handleError(t);
Expand Down Expand Up @@ -395,8 +405,10 @@ protected int info(String[] args) {
int parallelism = options.getParallelism();

LOG.info("Creating program plan dump");
Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);

Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);

FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);

if (webFrontend) {
this.optimizedPlan = flinkPlan;
Expand Down Expand Up @@ -425,6 +437,8 @@ protected int info(String[] args) {
}
}
return 0;


}
catch (Throwable t) {
return handleError(t);
Expand Down Expand Up @@ -623,52 +637,65 @@ protected int cancel(String[] args) {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------

protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
LOG.info("Starting execution of program");
JobSubmissionResult execResult;
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
JobSubmissionResult result;
try {
execResult = client.run(program, parallelism, wait);
}
catch (ProgramInvocationException e) {
result = client.runDetached(program, parallelism);
} catch (ProgramInvocationException e) {
return handleError(e);
}
finally {
} finally {
program.deleteExtractedLibraries();
}

if(wait) {
LOG.info("Program execution finished");
}

// we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
if (result != null) {
// if the job has been submitted to a detached YARN cluster, there won't be any
// exec results, but the object will be set (for the job id)
if (yarnCluster != null && yarnCluster.isDetached()) {
if(execResult.getJobID() == null) {
throw new RuntimeException("Error while starting job. No Job ID set.");
}
yarnCluster.stopAfterJob(execResult.getJobID());

yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
if(!webFrontend) {
System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
if (!webFrontend) {
System.out.println("The Job has been submitted with JobID " + result.getJobID());
}
return 0;
}
if (execResult instanceof JobExecutionResult) {
JobExecutionResult result = (JobExecutionResult) execResult;
if(!webFrontend) {
System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
}
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0 && !webFrontend) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
} else {
LOG.info("The Job did not return an execution result");
throw new RuntimeException("Error while starting job. No Job ID set.");
}
}

return 0;
}

protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
LOG.info("Starting execution of program");

JobExecutionResult result;
try {
client.setPrintStatusDuringExecution(true);
result = client.runBlocking(program, parallelism);
}
catch (ProgramInvocationException e) {
return handleError(e);
}
finally {
program.deleteExtractedLibraries();
}

LOG.info("Program execution finished");

if (result != null) {
if (!webFrontend) {
System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
}
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0 && !webFrontend) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
} else {
LOG.info("The Job did not return an execution result");
}

return 0;
}

Expand Down Expand Up @@ -767,20 +794,17 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
* Retrieves a {@link Client} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param classLoader Class loader to use by the Client
* @param programName Program name
* @param userParallelism Given user parallelism
* @return
* @throws Exception
*/
protected Client getClient(
CommandLineOptions options,
ClassLoader classLoader,
String programName,
int userParallelism)
throws Exception {
InetSocketAddress jobManagerAddress = null;

InetSocketAddress jobManagerAddress;
int maxSlots = -1;

if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
Expand All @@ -796,26 +820,29 @@ protected Client getClient(

// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
if(yarnTmSlots == -1) {
if (yarnTmSlots == -1) {
yarnTmSlots = 1;
}
maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
if(userParallelism != -1) {
if (userParallelism != -1) {
int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
"but the user requested a parallelism of " + userParallelism + " on YARN. " +
"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
"will get "+slotsPerTM+" slots.");
flinkYarnClient.setTaskManagerSlots(slotsPerTM);
}

try {
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
}
catch(Exception e) {
catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}

jobManagerAddress = yarnCluster.getJobManagerAddress();
writeJobManagerAddressToConfig(jobManagerAddress);

logAndSysout("YARN cluster started");
logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
Expand Down Expand Up @@ -847,14 +874,11 @@ protected Client getClient(
else {
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
}
}

if(jobManagerAddress != null) {
writeJobManagerAddressToConfig(jobManagerAddress);
}

return new Client(config, classLoader, maxSlots);
return new Client(config, maxSlots);
}

// --------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 71bf2f5

Please sign in to comment.