Skip to content

Commit

Permalink
[FLINK-2298] Add option to pass a custom name for Flink on YARN
Browse files Browse the repository at this point in the history
This closes apache#876
  • Loading branch information
rmetzger committed Jul 1, 2015
1 parent bd3c8d5 commit a9dc430
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 20 deletions.
2 changes: 2 additions & 0 deletions docs/setup/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,11 +100,13 @@ Usage:
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-st,--streaming Start Flink in streaming mode
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]

~~~

Please note that the Client requires the `YARN_CONF_DIR` or `HADOOP_CONF_DIR` environment variable to be set to read the YARN and HDFS configuration.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,8 +134,6 @@ public class CliFrontend {

private AbstractFlinkYarnCluster yarnCluster;



/**
*
* @throws Exception Thrown if teh configuration directory was not found, the configuration could not
Expand Down Expand Up @@ -744,7 +742,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
// user wants to run Flink in YARN cluster.
CommandLine commandLine = options.getCommandLine();
AbstractFlinkYarnClient flinkYarnClient = CliFrontendParser.getFlinkYarnSessionCli().createFlinkYarnClient(commandLine);

flinkYarnClient.setName("Flink Application: " + programName);
if (flinkYarnClient == null) {
throw new RuntimeException("Unable to create Flink YARN Client. Check previous log messages");
}
Expand All @@ -763,7 +761,7 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
}

try {
yarnCluster = flinkYarnClient.deploy("Flink Application: " + programName);
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
}
catch(Exception e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@ public class FlinkYarnSessionCli {
private final Option SLOTS;
private final Option DETACHED;
private final Option STREAMING;
private final Option NAME;

/**
* Dynamic properties allow the user to specify additional configuration values with -D, such as
Expand All @@ -97,6 +98,7 @@ public FlinkYarnSessionCli(String shortPrefix, String longPrefix) {
DYNAMIC_PROPERTIES = new Option(shortPrefix + "D", true, "Dynamic properties");
DETACHED = new Option(shortPrefix + "d", longPrefix + "detached", false, "Start detached");
STREAMING = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
NAME = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
}

public AbstractFlinkYarnClient createFlinkYarnClient(CommandLine cmd) {
Expand Down Expand Up @@ -220,6 +222,9 @@ public boolean accept(File dir, String name) {
if (cmd.hasOption(STREAMING.getOpt())) {
flinkYarnClient.setStreamingMode(true);
}
if(cmd.hasOption(NAME.getOpt())) {
flinkYarnClient.setName(cmd.getOptionValue(NAME.getOpt()));
}
return flinkYarnClient;
}

Expand All @@ -244,6 +249,7 @@ private void printUsage() {
opt.addOption(DYNAMIC_PROPERTIES);
opt.addOption(DETACHED);
opt.addOption(STREAMING);
opt.addOption(NAME);
formatter.printHelp(" ", opt);
}

Expand Down Expand Up @@ -350,6 +356,7 @@ public void getYARNSessionCLIOptions(Options options) {
options.addOption(DYNAMIC_PROPERTIES);
options.addOption(DETACHED);
options.addOption(STREAMING);
options.addOption(NAME);
}

public int run(String[] args) {
Expand Down Expand Up @@ -393,7 +400,7 @@ public int run(String[] args) {


try {
yarnCluster = flinkYarnClient.deploy(null);
yarnCluster = flinkYarnClient.deploy();
// only connect to cluster if its not a detached session.
if(!flinkYarnClient.isDetached()) {
yarnCluster.connectToCluster();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,8 @@ public abstract class AbstractFlinkYarnClient {
/**
* Trigger the deployment to YARN.
*
* @param clusterName Name to be shown in the YARN resource manager overview.
*/
public abstract AbstractFlinkYarnCluster deploy(String clusterName) throws Exception;
public abstract AbstractFlinkYarnCluster deploy() throws Exception;

/**
* @param detachedMode If true, the Flink YARN client is non-blocking. That means it returns
Expand All @@ -138,4 +137,10 @@ public abstract class AbstractFlinkYarnClient {
* @param streamingMode
*/
public abstract void setStreamingMode(boolean streamingMode);

/**
* Set a name for the YARN application
* @param name
*/
public abstract void setName(String name);
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void testDetachedMode() {
"-n", "1",
"-jm", "768",
"-tm", "1024",
"--name", "MyCustomName", // test setting a custom name
"--detached"},
"Flink JobManager is now running on", RunTypes.YARN_SESSION);

Expand All @@ -142,7 +143,9 @@ public void testDetachedMode() {
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
ApplicationId id = apps.get(0).getApplicationId();
ApplicationReport app = apps.get(0);
Assert.assertEquals("MyCustomName", app.getName());
ApplicationId id = app.getApplicationId();
yc.killApplication(id);

while(yc.getApplications(EnumSet.of(YarnApplicationState.KILLED)).size() == 0) {
Expand All @@ -166,6 +169,7 @@ public void testTaskManagerFailure() {
"-n", "1",
"-jm", "768",
"-tm", "1024",
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3"},
"Number of connected TaskManagers changed to 1. Slots available: 1",
Expand All @@ -180,7 +184,9 @@ public void testTaskManagerFailure() {
yc.start();
List<ApplicationReport> apps = yc.getApplications(EnumSet.of(YarnApplicationState.RUNNING));
Assert.assertEquals(1, apps.size()); // Only one running
String url = apps.get(0).getTrackingUrl();
ApplicationReport app = apps.get(0);
Assert.assertEquals("customName", app.getName());
String url = app.getTrackingUrl();
if(!url.endsWith("/")) {
url += "/";
}
Expand Down Expand Up @@ -615,7 +621,7 @@ public void testJavaAPI() {
// deploy
AbstractFlinkYarnCluster yarnCluster = null;
try {
yarnCluster = flinkYarnClient.deploy(null);
yarnCluster = flinkYarnClient.deploy();
yarnCluster.connectToCluster();
} catch (Exception e) {
System.err.println("Error while deploying YARN cluster: "+e.getMessage());
Expand Down
32 changes: 22 additions & 10 deletions flink-yarn/src/main/java/org/apache/flink/yarn/FlinkYarnClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class FlinkYarnClient extends AbstractFlinkYarnClient {
private boolean detached;
private boolean streamingMode;

private String customName = null;

public FlinkYarnClient() {
conf = new YarnConfiguration();
Expand Down Expand Up @@ -314,7 +315,7 @@ public boolean isDetached() {
return detached;
}

public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exception {
public AbstractFlinkYarnCluster deploy() throws Exception {

UserGroupInformation.setConfiguration(conf);
UserGroupInformation ugi = UserGroupInformation.getCurrentUser();
Expand All @@ -327,11 +328,11 @@ public AbstractFlinkYarnCluster deploy(final String clusterName) throws Exceptio
return ugi.doAs(new PrivilegedExceptionAction<AbstractFlinkYarnCluster>() {
@Override
public AbstractFlinkYarnCluster run() throws Exception {
return deployInternal(clusterName);
return deployInternal();
}
});
} else {
return deployInternal(clusterName);
return deployInternal();
}
}

Expand All @@ -341,7 +342,7 @@ public AbstractFlinkYarnCluster run() throws Exception {
* This method will block until the ApplicationMaster/JobManager have been
* deployed on YARN.
*/
protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exception {
protected AbstractFlinkYarnCluster deployInternal() throws Exception {
isReadyForDepoyment();

LOG.info("Using values:");
Expand Down Expand Up @@ -591,14 +592,17 @@ protected AbstractFlinkYarnCluster deployInternal(String clusterName) throws Exc
capability.setMemory(jobManagerMemoryMb);
capability.setVirtualCores(1);

if(clusterName == null) {
clusterName = "Flink session with "+taskManagerCount+" TaskManagers";
}
if(detached) {
clusterName += " (detached)";
String name;
if(customName == null) {
name = "Flink session with "+taskManagerCount+" TaskManagers";
if(detached) {
name += " (detached)";
}
} else {
name = customName;
}

appContext.setApplicationName(clusterName); // application name
appContext.setApplicationName(name); // application name
appContext.setApplicationType("Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);
Expand Down Expand Up @@ -734,6 +738,14 @@ public void setStreamingMode(boolean streamingMode) {
this.streamingMode = streamingMode;
}

@Override
public void setName(String name) {
if(name == null) {
throw new IllegalArgumentException("The passed name is null");
}
customName = name;
}

public static class YarnDeploymentException extends RuntimeException {
public YarnDeploymentException() {
}
Expand Down

0 comments on commit a9dc430

Please sign in to comment.