Skip to content

Commit

Permalink
[FLINK-3293] [yarn] Respect custom CLI Yarn name in JobManager mode
Browse files Browse the repository at this point in the history
Added a method to set a default application name for the Flink Yarn session CLI.
Switched the order, such that this name can now be overwritten by the command line.

This closes apache#1558
  • Loading branch information
jkirsch authored and fhueske committed Feb 3, 2016
1 parent 1198664 commit 934774f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -935,13 +935,20 @@ protected Client getClient(
if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
logAndSysout("YARN cluster mode detected. Switching Log4j output to console");

// Default yarn application name to use, if nothing is specified on the command line
String applicationName = "Flink Application: " + programName;

// user wants to run Flink in YARN cluster.
CommandLine commandLine = options.getCommandLine();
AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);
AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser
.getFlinkYarnSessionCli()
.withDefaultApplicationName(applicationName)
.createFlinkYarnClient(commandLine);

if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
flinkYarnClient.setName("Flink Application: " + programName);

// in case the main detached mode wasn't set, we don't wanna overwrite the one loaded
// from yarn options.
if (detachedMode) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ public class FlinkYarnSessionCli {
private AbstractFlinkYarnCluster yarnCluster = null;
private boolean detachedMode = false;

/** Default yarn application name. */
private String defaultApplicationName = null;

public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
QUERY = new Option(shortPrefix + "q", longPrefix + "query", false, "Display available YARN resources (memory, cores)");
QUEUE = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
Expand All @@ -102,6 +105,11 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
}

/**
* Creates a new Yarn Client.
* @param cmd the command line to parse options from
* @return an instance of the client or null if there was an error
*/
public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {

AbstractFlinkYarnClient flinkYarnClient = getFlinkYarnClient();
Expand Down Expand Up @@ -222,7 +230,13 @@ public boolean accept(File dir, String name) {

if(cmd.hasOption(NAME.getOpt())) {
flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
} else {
// set the default application name, if none is specified
if(defaultApplicationName != null) {
flinkYarnClient.setName(defaultApplicationName);
}
}

return flinkYarnClient;
}

Expand Down Expand Up @@ -466,6 +480,16 @@ public int run(String[] args) {
return 0;
}

/**
* Sets the default Yarn Application Name.
* @param defaultApplicationName the name of the yarn application to use
* @return FlinkYarnSessionCli instance, for chaining
*/
public FlinkYarnSessionCli withDefaultApplicationName(String defaultApplicationName) {
this.defaultApplicationName = defaultApplicationName;
return this;
}

/**
* Utility method for tests.
*/
Expand Down

0 comments on commit 934774f

Please sign in to comment.