Skip to content

Commit

Permalink
[FLINK-1771] Add support for submitting single jobs to a detached YAR…
Browse files Browse the repository at this point in the history
…N session

With this change, users can submit a Flink job to a YARN cluster without having a local client monitoring the Application Master or job. You can basically fire and forget a Flink job to YARN.
For supporting this, the ApplicationMaster can now monitor the status of a job and shutdown itself once it is in a terminal state.

The change also verifies that various ways of setting the parallelism on YARN are passed through the system correctly (per job, session).

There was a bug in YARN container creation which made the configuration values for the heap offset useless. This change fixes this error.

All mentioned features and bugs are covered by the flink-yarn-tests.

This closes apache#542
  • Loading branch information
rmetzger committed Mar 31, 2015
1 parent 121a5a0 commit 6b0d407
Show file tree
Hide file tree
Showing 199 changed files with 1,125 additions and 635 deletions.
116 changes: 88 additions & 28 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import akka.util.Timeout;

import org.apache.commons.cli.CommandLine;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.client.cli.CancelOptions;
import org.apache.flink.client.cli.CliArgsException;
import org.apache.flink.client.cli.CliFrontendParser;
Expand All @@ -66,7 +67,7 @@
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.runtime.jobgraph.JobID;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
Expand Down Expand Up @@ -265,12 +266,32 @@ protected int run(String[] args) {
}

try {
Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());

int parallelism = options.getParallelism();
int exitCode = executeProgram(program, client, parallelism);

if (yarnCluster != null) {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), userParallelism);
LOG.debug("Client slots is set to {}", client.getMaxSlots());
if(client.getMaxSlots() != -1 && userParallelism == -1) {
logAndSysout("Using the parallelism provided by the remote cluster ("+client.getMaxSlots()+"). " +
"To use another parallelism, set it at the ./bin/flink client.");
userParallelism = client.getMaxSlots();
}
int exitCode = 0;

// check if detached per job yarn cluster is used to start flink
if(yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
executeProgram(program, client, userParallelism, false);
} else {
// regular (blocking) execution.
exitCode = executeProgram(program, client, userParallelism, true);
}

// show YARN cluster status if its not a detached YARN cluster.
if (yarnCluster != null && !yarnCluster.isDetached()) {
List<String> msgs = yarnCluster.getNewMessages();
if (msgs != null && msgs.size() > 1) {

Expand All @@ -291,7 +312,7 @@ protected int run(String[] args) {
return handleError(t);
}
finally {
if (yarnCluster != null) {
if (yarnCluster != null && !yarnCluster.isDetached()) {
logAndSysout("Shutting down YARN cluster");
yarnCluster.shutdown();
}
Expand Down Expand Up @@ -346,7 +367,7 @@ protected int info(String[] args) {
int parallelism = options.getParallelism();

LOG.info("Creating program plan dump");
Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName());
Client client = getClient(options, program.getUserCodeClassLoader(), program.getMainClassName(), parallelism);
String jsonPlan = client.getOptimizedPlanAsJson(program, parallelism);

if (jsonPlan != null) {
Expand Down Expand Up @@ -555,12 +576,12 @@ protected int cancel(String[] args) {
// Interaction with programs and JobManager
// --------------------------------------------------------------------------------------------

protected int executeProgram(PackagedProgram program, Client client, int parallelism) {
LOG.info("Starting execution or program");
JobExecutionResult execResult;
protected int executeProgram(PackagedProgram program, Client client, int parallelism, boolean wait) {
LOG.info("Starting execution of program");
JobSubmissionResult execResult;
try {
client.setPrintStatusDuringExecution(true);
execResult = client.run(program, parallelism, true);
execResult = client.run(program, parallelism, wait);
}
catch (ProgramInvocationException e) {
return handleError(e);
Expand All @@ -569,15 +590,33 @@ protected int executeProgram(PackagedProgram program, Client client, int paralle
program.deleteExtractedLibraries();
}

LOG.info("Program execution finished");
if(wait) {
LOG.info("Program execution finished");
}

// we come here after the job has finished
// we come here after the job has finished (or the job has been submitted)
if (execResult != null) {
System.out.println("Job Runtime: " + execResult.getNetRuntime());
Map<String, Object> accumulatorsResult = execResult.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
// if the job has been submitted to a detached YARN cluster, there won't be any
// exec results, but the object will be set (for the job id)
if (yarnCluster != null && yarnCluster.isDetached()) {
if(execResult.getJobID() == null) {
throw new RuntimeException("Error while starting job. No Job ID set.");
}
yarnCluster.stopAfterJob(execResult.getJobID());
yarnCluster.disconnect();
System.out.println("The Job has been submitted with JobID "+execResult.getJobID());
return 0;
}
if (execResult instanceof JobExecutionResult) {
JobExecutionResult result = (JobExecutionResult) execResult;
System.out.println("Job Runtime: " + result.getNetRuntime());
Map<String, Object> accumulatorsResult = result.getAllAccumulatorResults();
if (accumulatorsResult.size() > 0) {
System.out.println("Accumulator Results: ");
System.out.println(AccumulatorHelper.getResultsFormated(accumulatorsResult));
}
} else {
LOG.info("The Job did not return an execution result");
}
}
return 0;
Expand Down Expand Up @@ -681,26 +720,47 @@ protected ActorRef getJobManager(CommandLineOptions options) throws Exception {
LOG.info("JobManager is at " + jmActor.path());
return jmActor;
}



protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName) throws Exception {

/**
*
* @param options
* @param classLoader
* @param programName
* @param userParallelism The parallelism requested by the user in the CLI frontend.
* @return
* @throws Exception
*/
protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
InetSocketAddress jobManagerAddress;

int maxSlots = -1;
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
logAndSysout("YARN cluster mode detected. Switching Log4j output to console");

// user wants to run Flink in YARN cluster.
CommandLine commandLine = options.getCommandLine();
AbstractFlinkYarnClient flinkYarnClient =
CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);

if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}

// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
if(yarnTmSlots == -1) {
yarnTmSlots = 1;
}
maxSlots = yarnTmSlots * flinkYarnClient.getTaskManagerCount();
if(userParallelism != -1) {
int slotsPerTM = userParallelism / flinkYarnClient.getTaskManagerCount();
logAndSysout("The YARN cluster has "+maxSlots+" slots available, but the user requested a parallelism of "+userParallelism+" on YARN. " +
"Each of the "+flinkYarnClient.getTaskManagerCount()+" TaskManagers will get "+slotsPerTM+" slots.");
flinkYarnClient.setTaskManagerSlots(slotsPerTM);
}

try {
yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
yarnCluster.connectToCluster();
}
catch(Exception e) {
throw new RuntimeException("Error deploying the YARN cluster", e);
Expand All @@ -722,7 +782,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
break;
}
} else {
logAndSysout("No status updates from YARN cluster received so far. Waiting ...");
logAndSysout("No status updates from the YARN cluster received so far. Waiting ...");
}

try {
Expand All @@ -738,7 +798,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
else {
jobManagerAddress = getJobManagerAddress(options);
}
return new Client(jobManagerAddress, config, classLoader);
return new Client(jobManagerAddress, config, classLoader, maxSlots);
}

// --------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,12 +122,6 @@ public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
} else {
LOG.info("No path for the flink jar passed. Using the location of "+flinkYarnClient.getClass()+" to locate the jar");
localJarPath = new Path("file:https://"+flinkYarnClient.getClass().getProtectionDomain().getCodeSource().getLocation().getPath());
if(!localJarPath.toString().contains("uberjar")) {
// we need to have a proper uberjar because otherwise we don't have the required classes available on the cluster.
// most likely the user did try to start yarn in a regular hadoop2 flink build (not a yarn package) (using ./bin/flink -m yarn-cluster)
LOG.error("The detected jar file '"+localJarPath+"' is not a uberjar.");
return null;
}
}

flinkYarnClient.setLocalJarPath(localJarPath);
Expand Down Expand Up @@ -392,6 +386,10 @@ public int run(String[] args) {

try {
yarnCluster = flinkYarnClient.deploy(null);
// only connect to cluster if its not a detached session.
if(!flinkYarnClient.isDetached()) {
yarnCluster.connectToCluster();
}
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
e.printStackTrace(System.err);
Expand Down Expand Up @@ -423,7 +421,7 @@ public int run(String[] args) {

if (detachedMode) {
// print info and quit:
LOG.info("The Flink YARN client has been started in detached mode. In order to stop" +
LOG.info("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill "+yarnCluster.getApplicationId()+"\n" +
"Please also note that the temporary files of the YARN session in {} will not be removed.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,20 @@ public void setPrintStatusDuringExecution(boolean printStatus) {
}

// --------------------------------------------------------------------------------------------


public static Configuration getConfiguration(LocalExecutor le) {
Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, le.getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, le.isDefaultOverwriteFiles());
return configuration;
}

public void start() throws Exception {
synchronized (this.lock) {
if (this.flink == null) {

// create the embedded runtime
Configuration configuration = new Configuration();
configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, getTaskManagerNumSlots());
configuration.setBoolean(ConfigConstants.FILESYSTEM_DEFAULT_OVERWRITE_KEY, isDefaultOverwriteFiles());
Configuration configuration = getConfiguration(this);
// start it up
this.flink = new LocalFlinkMiniCluster(configuration, true);
} else {
Expand Down Expand Up @@ -158,7 +163,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}

try {
Optimizer pc = new Optimizer(new DataStatistics());
Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator();
Expand Down Expand Up @@ -186,7 +191,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
* @throws Exception
*/
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
Optimizer pc = new Optimizer(new DataStatistics());
Optimizer pc = new Optimizer(new DataStatistics(), getConfiguration(this));
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();

Expand Down Expand Up @@ -242,7 +247,7 @@ public static String optimizerPlanAsJSON(Plan plan) throws Exception {
LocalExecutor exec = new LocalExecutor();
try {
exec.start();
Optimizer pc = new Optimizer(new DataStatistics());
Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.util.List;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobSubmissionResult;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.PlanExecutor;
import org.apache.flink.client.program.Client;
Expand All @@ -35,11 +36,13 @@
import org.apache.flink.optimizer.plan.OptimizedPlan;
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RemoteExecutor extends PlanExecutor {
private static final Logger LOG = LoggerFactory.getLogger(RemoteExecutor.class);

private final List<String> jarFiles;

private final InetSocketAddress address;

public RemoteExecutor(String hostname, int port) {
Expand Down Expand Up @@ -86,22 +89,34 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}

public JobExecutionResult executePlanWithJars(JobWithJars p) throws Exception {
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
return c.run(p, -1, true);
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);
JobSubmissionResult result = c.run(p, -1, true);
if(result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
return new JobExecutionResult(result.getJobID(), -1, null);
}
}

public JobExecutionResult executeJar(String jarPath, String assemblerClass, String... args) throws Exception {
File jarFile = new File(jarPath);
PackagedProgram program = new PackagedProgram(jarFile, assemblerClass, args);

Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader());
return c.run(program.getPlanWithJars(), -1, true);
Client c = new Client(this.address, new Configuration(), program.getUserCodeClassLoader(), -1);
JobSubmissionResult result = c.run(program.getPlanWithJars(), -1, true);
if(result instanceof JobExecutionResult) {
return (JobExecutionResult) result;
} else {
LOG.warn("The Client didn't return a JobExecutionResult");
return new JobExecutionResult(result.getJobID(), -1, null);
}
}

@Override
public String getOptimizerPlanAsJSON(Plan plan) throws Exception {
JobWithJars p = new JobWithJars(plan, this.jarFiles);
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader());
Client c = new Client(this.address, new Configuration(), p.getUserCodeClassLoader(), -1);

OptimizedPlan op = (OptimizedPlan) c.getOptimizedPlan(p, -1);
PlanJSONDumpGenerator jsonGen = new PlanJSONDumpGenerator();
Expand Down
Loading

0 comments on commit 6b0d407

Please sign in to comment.