Commit bb3724a

commit ceb2d57
Author: Maximilian Michels <[email protected]>
Date:   Thu Jun 18 16:38:09 2015 +0200

    [FLINK-2097] [core] Finalize session management

commit 30f78f0
Author: Stephan Ewen <[email protected]>
Date:   Fri May 29 14:35:33 2015 +0200

    [FLINK-2097] [core] Improve session management.

     - The Client manages only connections to the JobManager, it is not job specific
     - Executors provide a more explicit life cycle and methods to start new sessions
     - Sessions are handled by the environments
     - The environments use reapers (local) and shutdown hooks (remote) to ensure session termination
       when the environment runs out of scope (a sketch of this life cycle follows below)
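
     A minimal sketch of this session life cycle from the user-facing environment side. The
     session methods shown here (setSessionTimeout, startNewSession) are assumptions for
     illustration and may not match the exact API introduced by this commit; the DataSet job
     itself is standard Flink batch API.

         import org.apache.flink.api.common.functions.MapFunction;
         import org.apache.flink.api.java.DataSet;
         import org.apache.flink.api.java.ExecutionEnvironment;

         public class SessionSketch {
             public static void main(String[] args) throws Exception {
                 ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

                 // Assumed session API: keep the session (and its ExecutionGraph) alive for
                 // one hour of inactivity, and explicitly start a fresh session.
                 env.setSessionTimeout(3600);
                 env.startNewSession();

                 // A first job attaches to the session; later jobs submitted from the same
                 // environment can reuse the cached graph and its intermediate results.
                 DataSet<Integer> data = env.fromElements(1, 2, 3);
                 data.map(new MapFunction<Integer, Integer>() {
                     @Override
                     public Integer map(Integer value) {
                         return value * 2;
                     }
                 }).print();

                 // When the environment runs out of scope, a reaper (local) or shutdown hook
                 // (remote) terminates the session so the JobManager can discard the graph.
             }
         }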

commit b6bb34e
Author: Maximilian Michels <[email protected]>
Date:   Wed May 13 17:06:47 2015 +0200

    [FLINK-2097] [core] Implement job session management.

    Sessions make sure that the JobManager does not immediately discard a JobGraph after execution, but keeps it
    around for further operations to be attached to the graph. That is the basis for interactive sessions.

    This pull request implements rudimentary session management. Together with the backtracking work in apache#640,
    this will enable users to submit jobs to the cluster and access intermediate results. Session handling ensures
    that the results are eventually cleared.

    ExecutionGraphs are kept as long as
    - no timeout has occurred, and
    - the session has not been explicitly ended (see the sketch below)
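
    A minimal sketch of this retention rule on the JobManager side. The class, field, and
    method names (SessionState, lastActivityMillis, keepExecutionGraph) are purely
    illustrative and are not the actual implementation:

        // Illustrative only -- names are assumptions, not the real JobManager code.
        public class RetentionRuleSketch {

            static final class SessionState {
                long lastActivityMillis;
                long timeoutMillis;
                boolean explicitlyEnded;
            }

            // Keep the ExecutionGraph only while the session has neither timed out
            // nor been explicitly ended.
            static boolean keepExecutionGraph(SessionState s, long nowMillis) {
                boolean timedOut = nowMillis - s.lastActivityMillis > s.timeoutMillis;
                return !timedOut && !s.explicitlyEnded;
            }
        }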

mxm committed Sep 8, 2015
1 parent 97ad55f commit bb3724a
Showing 63 changed files with 2,164 additions and 1,042 deletions.
184 changes: 104 additions & 80 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -57,10 +57,13 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.optimizer.DataStatistics;
import org.apache.flink.optimizer.Optimizer;
import org.apache.flink.optimizer.costs.DefaultCostEstimator;
import org.apache.flink.optimizer.plan.FlinkPlan;
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
@@ -298,44 +301,51 @@ protected int run(String[] args) {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
Client client = getClient(options, program.getMainClassName(), userParallelism);
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());
if(client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}

// check if detached per job yarn cluster is used to start flink
if(yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
exitCode = executeProgram(program, client, userParallelism, false);
} else {
// regular (blocking) execution.
exitCode = executeProgram(program, client, userParallelism, true);
}
try {
if (client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}

// show YARN cluster status if its not a detached YARN cluster.
if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {
// check if detached per job yarn cluster is used to start flink
if (yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
exitCode = executeProgramDetached(program, client, userParallelism);
}
else {
// regular (blocking) execution.
exitCode = executeProgramBlocking(program, client, userParallelism);
}

// show YARN cluster status if its not a detached YARN cluster.
if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {

logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
logAndSysout("The following messages were created by the YARN cluster while running the Job:");
for (String msg : msgs) {
logAndSysout(msg);
}
}
if (yarnCluster.hasFailed()) {
logAndSysout("YARN cluster is in failed state!");
logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}
if (yarnCluster.hasFailed()) {
logAndSysout("YARN cluster is in failed state!");
logAndSysout("YARN Diagnostics: " + yarnCluster.getDiagnostics());
}
}

return exitCode;
return exitCode;
}
finally {
client.shutdown();
}
}
catch (Throwable t) {
return handleError(t);
@@ -396,12 +406,14 @@ protected int info(String[] args) {
int parallelism = options.getParallelism();

LOG.info("Creating program plan dump");
Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
FlinkPlan flinkPlan = client.getOptimizedPlan(program, parallelism);

Optimizer compiler = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), config);

FlinkPlan flinkPlan = Client.getOptimizedPlan(compiler, program, parallelism);

if (webFrontend) {
this.optimizedPlan = flinkPlan;
this.jobGraph = client.getJobGraph(program, flinkPlan);
this.jobGraph = Client.getJobGraph(program, flinkPlan);
} else {
String jsonPlan = new PlanJSONDumpGenerator()
.getOptimizerPlanAsJSON((OptimizedPlan) flinkPlan);
@@ -426,6 +438,8 @@ protected int info(String[] args) {
}
}
return 0;


}
catch (Throwable t) {
return handleError(t);
@@ -624,52 +638,65 @@ protected int cancel(String[] args) {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------

protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
LOG.info("Starting execution of program");
JobSubmissionResult execResult;
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
JobSubmissionResult result;
try {
execResult = client.run(program, parallelism, wait);
}
catch (ProgramInvocationException e) {
result = client.runDetached(program, parallelism);
} catch (ProgramInvocationException e) {
return handleError(e);
}
finally {
} finally {
program.deleteExtractedLibraries();
}

if(wait) {
LOG.info("Program execution finished");
}

// we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
if (result != null) {
// if the job has been submitted to a detached YARN cluster, there won't be any
// exec results, but the object will be set (for the job id)
if (yarnCluster != null && yarnCluster.isDetached()) {
if(execResult.getJobID() == null) {
throw new RuntimeException("Error while starting job. No Job ID set.");
}
yarnCluster.stopAfterJob(execResult.getJobID());

yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
if(!webFrontend) {
System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
if (!webFrontend) {
System.out.println("The Job has been submitted with JobID " + result.getJobID());
}
return 0;
}
if (execResult instanceof JobExecutionResult) {
JobExecutionResult result = (JobExecutionResult) execResult;
if(!webFrontend) {
System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
}
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0 && !webFrontend) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
} else {
LOG.info("The Job did not return an execution result");
throw new RuntimeException("Error while starting job. No Job ID set.");
}
}

return 0;
}

protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
LOG.info("Starting execution of program");

JobExecutionResult result;
try {
client.setPrintStatusDuringExecution(true);
result = client.runBlocking(program, parallelism);
}
catch (ProgramInvocationException e) {
return handleError(e);
}
finally {
program.deleteExtractedLibraries();
}

LOG.info("Program execution finished");

if (result != null) {
if (!webFrontend) {
System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
}
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0 && !webFrontend) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
} else {
LOG.info("The Job did not return an execution result");
}

return 0;
}

@@ -768,20 +795,17 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
* Retrieves a {@link Client} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param classLoader Class loader to use by the Client
* @param programName Program name
* @param userParallelism Given user parallelism
* @return
* @throws Exception
*/
protected Client getClient(
CommandLineOptions options,
ClassLoader classLoader,
String programName,
int userParallelism)
throws Exception {
InetSocketAddress jobManagerAddress = null;

InetSocketAddress jobManagerAddress;
int maxSlots = -1;

if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
@@ -797,26 +821,29 @@ protected Client getClient(

// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
if(yarnTmSlots == -1) {
if (yarnTmSlots == -1) {
yarnTmSlots = 1;
}
maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
if(userParallelism != -1) {
if (userParallelism != -1) {
int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
logAndSysout("The YARN cluster has " + maxSlots + " slots available, " +
"but the user requested a parallelism of " + userParallelism + " on YARN. " +
"Each of the " + flinkYarnClient.getTaskManagerCount() + " TaskManagers " +
"will get "+slotsPerTM+" slots.");
flinkYarnClient.setTaskManagerSlots(slotsPerTM);
}

try {
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
}
catch(Exception e) {
catch (Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
}

jobManagerAddress = yarnCluster.getJobManagerAddress();
writeJobManagerAddressToConfig(jobManagerAddress);

logAndSysout("YARN cluster started");
logAndSysout("JobManager web interface address " + yarnCluster.getWebInterfaceURL());
@@ -848,14 +875,11 @@ protected Client getClient(
else {
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
}
}

if(jobManagerAddress != null) {
writeJobManagerAddressToConfig(jobManagerAddress);
}

return new Client(config, classLoader, maxSlots);
return new Client(config, maxSlots);
}

// --------------------------------------------------------------------------------------------
(The remaining 62 changed files of this commit are not shown here.)
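
The CliFrontend diff above splits the old executeProgram(program, client, parallelism, wait) into a detached and a blocking path. The following sketch shows how a caller might drive the two variants; the method names (runDetached, runBlocking, shutdown, deleteExtractedLibraries, setPrintStatusDuringExecution) are taken from the diff, but the helper itself, its parameters, and the import paths are assumptions for illustration.

    import org.apache.flink.api.common.JobExecutionResult;
    import org.apache.flink.api.common.JobSubmissionResult;
    import org.apache.flink.client.program.Client;
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.configuration.Configuration;

    // Sketch only: the detached/blocking split mirrors the diff above, but this helper
    // and its import paths are illustrative and may differ from the actual code base.
    public class ClientUsageSketch {

        static int submit(Configuration config, int maxSlots, PackagedProgram program,
                          int parallelism, boolean detached) throws Exception {
            Client client = new Client(config, maxSlots);
            try {
                if (detached) {
                    // Detached: submit and return immediately; only the job id comes back.
                    JobSubmissionResult submission = client.runDetached(program, parallelism);
                    System.out.println("Submitted job with JobID " + submission.getJobID());
                } else {
                    // Blocking: wait for the job to finish and report its net runtime.
                    client.setPrintStatusDuringExecution(true);
                    JobExecutionResult result = client.runBlocking(program, parallelism);
                    System.out.println("Job Runtime: " + result.getNetRuntime() + " ms");
                }
                return 0;
            }
            finally {
                program.deleteExtractedLibraries();
                client.shutdown();  // the Client only manages the connection to the JobManager
            }
        }
    }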
