Skip to content

Commit

Permalink
[FLINK-2797][cli] Add support for running jobs in detached mode from CLI
Browse files Browse the repository at this point in the history
This closes apache#1214.
  • Loading branch information
sachingoel0101 authored and mxm committed Nov 13, 2015
1 parent 30647a2 commit b7cf642
Show file tree
Hide file tree
Showing 5 changed files with 59 additions and 23 deletions.
10 changes: 8 additions & 2 deletions docs/apis/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ The command line can be used to

./bin/flink run -q ./examples/WordCount.jar

- Run example program in detached mode

./bin/flink run -d ./examples/WordCount.jar

- Run example program on a specific JobManager:

./bin/flink run -m myJMHost:6123 \
Expand Down Expand Up @@ -128,14 +132,16 @@ Action "run" compiles and runs a program.
program. Optional flag to override the
default value specified in the
configuration.
-q --sysoutLogging Specfying this flag will disable log messages
-q --sysoutLogging Specifying this flag will disable log messages
being reported on the console. All messages
however will still be logged by SLF4J loggers,
regardless of this setting.
-d --detached Specifying this option will run the job in
detached mode.
Additional arguments if -m yarn-cluster is set:
-yD <arg> Dynamic properties
-yd,--yarndetached Start detached
-yd,--yarndetached Start detached [consider using -d flag above]
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
MB]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ protected int run(String[] args) {
int userParallelism = options.getParallelism();
LOG.debug("User parallelism is set to {}", userParallelism);

Client client = getClient(options, program.getMainClassName(), userParallelism);
Client client = getClient(options, program.getMainClassName(), userParallelism, options.getDetachedMode());
client.setPrintStatusDuringExecution(options.getStdoutLogging());
LOG.debug("Client slots is set to {}", client.getMaxSlots());

Expand All @@ -307,16 +307,11 @@ protected int run(String[] args) {
userParallelism = client.getMaxSlots();
}

// check if detached per job yarn cluster is used to start flink
if (yarnCluster != null && yarnCluster.isDetached()) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
// detached mode
if (options.getDetachedMode() || (yarnCluster != null && yarnCluster.isDetached())) {
exitCode = executeProgramDetached(program, client, userParallelism);
}
else {
// regular (blocking) execution.
exitCode = executeProgramBlocking(program, client, userParallelism);
}

Expand Down Expand Up @@ -638,6 +633,14 @@ protected int cancel(String[] args) {
// --------------------------------------------------------------------------------------------

protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
// log message for detached yarn job
if (yarnCluster != null) {
logAndSysout("The Flink YARN client has been started in detached mode. In order to stop " +
"Flink on YARN, use the following command or a YARN web interface to stop it:\n" +
"yarn application -kill " + yarnCluster.getApplicationId() + "\n" +
"Please also note that the temporary files of the YARN session in the home directoy will not be removed.");
}

LOG.info("Starting execution of program");

JobSubmissionResult result;
Expand All @@ -649,7 +652,7 @@ protected int executeProgramDetached(PackagedProgram program, Client client, int
program.deleteExtractedLibraries();
}

if (yarnCluster != null && yarnCluster.isDetached()) {
if (yarnCluster != null) {
yarnCluster.stopAfterJob(result.getJobID());
yarnCluster.disconnect();
}
Expand Down Expand Up @@ -796,7 +799,8 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
protected Client getClient(
CommandLineOptions options,
String programName,
int userParallelism)
int userParallelism,
boolean detachedMode)
throws Exception {
InetSocketAddress jobManagerAddress;
int maxSlots = -1;
Expand All @@ -811,6 +815,11 @@ protected Client getClient(
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
flinkYarnClient.setName("Flink Application: " + programName);
// in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
// from yarn options.
if (detachedMode) {
flinkYarnClient.setDetachedMode(true);
}

// the number of slots available from YARN:
int yarnTmSlots = flinkYarnClient.getTaskManagerSlots();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ public class CliFrontendParser {
static final Option LOGGING_OPTION = new Option("q", "sysoutLogging", false, "If present, " +
"supress logging output to standard out.");

static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");

static final Option ARGS_OPTION = new Option("a", "arguments", true,
"Program arguments. Arguments can also be added without -a, simply as trailing parameters.");

Expand Down Expand Up @@ -94,6 +97,7 @@ public class CliFrontendParser {
PARALLELISM_OPTION.setArgName("parallelism");

LOGGING_OPTION.setRequired(false);
DETACHED_OPTION.setRequired(false);

ARGS_OPTION.setRequired(false);
ARGS_OPTION.setArgName("programArgs");
Expand Down Expand Up @@ -123,6 +127,7 @@ public static Options getProgramSpecificOptions(Options options) {
options.addOption(PARALLELISM_OPTION);
options.addOption(ARGS_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);

// also add the YARN options so that the parser can parse them
yarnSessionCLi.getYARNSessionCLIOptions(options);
Expand All @@ -134,6 +139,7 @@ private static Options getProgramSpecificOptionsWithoutDeprecatedOptions(Options
options.addOption(CLASSPATH_OPTION);
options.addOption(PARALLELISM_OPTION);
options.addOption(LOGGING_OPTION);
options.addOption(DETACHED_OPTION);
return options;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.Arrays;

import static org.apache.flink.client.cli.CliFrontendParser.ARGS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.DETACHED_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.JAR_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASS_OPTION;
import static org.apache.flink.client.cli.CliFrontendParser.CLASSPATH_OPTION;
Expand All @@ -49,6 +50,8 @@ public abstract class ProgramOptions extends CommandLineOptions {

private final boolean stdoutLogging;

private final boolean detachedMode;

protected ProgramOptions(CommandLine line) throws CliArgsException {
super(line);

Expand Down Expand Up @@ -100,11 +103,8 @@ else if (args.length > 0) {
parallelism = -1;
}

if(line.hasOption(LOGGING_OPTION.getOpt())){
stdoutLogging = false;
} else{
stdoutLogging = true;
}
stdoutLogging = !line.hasOption(LOGGING_OPTION.getOpt());
detachedMode = line.hasOption(DETACHED_OPTION.getOpt());
}

public String getJarFilePath() {
Expand All @@ -130,4 +130,8 @@ public int getParallelism() {
public boolean getStdoutLogging() {
return stdoutLogging;
}
}

public boolean getDetachedMode() {
return detachedMode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,21 +52,28 @@ public void testRun() {
// test without parallelism
{
String[] parameters = {"-v", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true);
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(-1, true, false);
assertEquals(0, testFrontend.run(parameters));
}

// test configure parallelism
{
String[] parameters = {"-v", "-p", "42", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true);
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(42, true, false);
assertEquals(0, testFrontend.run(parameters));
}

// test configure sysout logging
{
String[] parameters = {"-p", "2", "-q", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false);
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, false);
assertEquals(0, testFrontend.run(parameters));
}

// test detached mode
{
String[] parameters = {"-p", "2", "-d", getTestJarPath()};
RunTestingCliFrontend testFrontend = new RunTestingCliFrontend(2, false, true);
assertEquals(0, testFrontend.run(parameters));
}

Expand Down Expand Up @@ -96,27 +103,31 @@ public static final class RunTestingCliFrontend extends CliFrontend {

private final int expectedParallelism;
private final boolean sysoutLogging;
private final boolean isDetached;

public RunTestingCliFrontend(int expectedParallelism, boolean logging) throws Exception {
public RunTestingCliFrontend(int expectedParallelism, boolean logging, boolean isDetached) throws Exception {
super(CliFrontendTestUtils.getConfigDir());
this.expectedParallelism = expectedParallelism;
this.sysoutLogging = logging;
this.isDetached = isDetached;
}

@Override
protected int executeProgramDetached(PackagedProgram program, Client client, int parallelism) {
assertTrue(isDetached);
assertEquals(this.expectedParallelism, parallelism);
assertEquals(this.sysoutLogging, client.getPrintStatusDuringExecution());
return 0;
}

@Override
protected int executeProgramBlocking(PackagedProgram program, Client client, int parallelism) {
assertTrue(!isDetached);
return 0;
}

@Override
protected Client getClient(CommandLineOptions options, String programName, int userParallelism) throws Exception {
protected Client getClient(CommandLineOptions options, String programName, int userParallelism, boolean detached) throws Exception {
return Mockito.mock(Client.class);
}
}
Expand Down

0 comments on commit b7cf642

Please sign in to comment.