Skip to content

Commit

Permalink
[FLINK-11910] [YARN] Add customizable yarn application type
Browse files Browse the repository at this point in the history
This closes apache#7978
  • Loading branch information
HuangZhenQiu authored and rmetzger committed Apr 23, 2019
1 parent 945eb30 commit 2a139b8
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/ops/deployment/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ Usage:
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-at,--applicationType Set a custom application type on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,9 +116,13 @@ public void testDetachedMode() throws InterruptedException, IOException {
if (SecureTestEnvironment.getHadoopServicePrincipal() != null) {
args.add("-D" + SecurityOptions.KERBEROS_LOGIN_PRINCIPAL.key() + "=" + SecureTestEnvironment.getHadoopServicePrincipal());
}

args.add("--name");
args.add("MyCustomName");

args.add("--applicationType");
args.add("Apache Flink 1.x");

args.add("--detached");

Runner clusterRunner =
Expand Down Expand Up @@ -167,6 +171,7 @@ public void testDetachedMode() throws InterruptedException, IOException {
ApplicationReport app = apps.get(0);

Assert.assertEquals("MyCustomName", app.getName());
Assert.assertEquals("Apache Flink 1.x", app.getApplicationType());
ApplicationId id = app.getApplicationId();
yc.killApplication(id);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,8 @@ public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor

private String nodeLabel;

private String applicationType;

/** Optional Jar file to include in the system class loader of all application nodes
* (for per-job submission). */
private final Set<File> userJarFiles = new HashSet<>();
Expand Down Expand Up @@ -986,7 +988,7 @@ public ApplicationReport startAppMaster(
final String customApplicationName = customName != null ? customName : applicationName;

appContext.setApplicationName(customApplicationName);
appContext.setApplicationType("Apache Flink");
appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);

Expand Down Expand Up @@ -1279,10 +1281,11 @@ public String getClusterDescription() {
}

public void setName(String name) {
if (name == null) {
throw new IllegalArgumentException("The passed name is null");
}
customName = name;
this.customName = Preconditions.checkNotNull(name, "The customized name must not be null");
}

public void setApplicationType(String type) {
this.applicationType = Preconditions.checkNotNull(type, "The customized application type must not be null");
}

private void activateHighAvailabilitySupport(ApplicationSubmissionContext appContext) throws
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,7 @@ public class FlinkYarnSessionCli extends AbstractCustomCommandLine<ApplicationId
@Deprecated
private final Option streaming;
private final Option name;
private final Option applicationType;

private final Options allOptions;

Expand Down Expand Up @@ -201,6 +202,7 @@ public FlinkYarnSessionCli(
.build();
streaming = new Option(shortPrefix + "st", longPrefix + "streaming", false, "Start Flink in streaming mode");
name = new Option(shortPrefix + "nm", longPrefix + "name", true, "Set a custom name for the application on YARN");
applicationType = new Option(shortPrefix + "at", longPrefix + "applicationType", true, "Set a custom application type for the application on YARN");
zookeeperNamespace = new Option(shortPrefix + "z", longPrefix + "zookeeperNamespace", true, "Namespace to create the Zookeeper sub-paths for high availability mode");
nodeLabel = new Option(shortPrefix + "nl", longPrefix + "nodeLabel", true, "Specify YARN node label for the YARN application");
help = new Option(shortPrefix + "h", longPrefix + "help", false, "Help for the Yarn session CLI.");
Expand All @@ -221,6 +223,7 @@ public FlinkYarnSessionCli(
allOptions.addOption(streaming);
allOptions.addOption(name);
allOptions.addOption(applicationId);
allOptions.addOption(applicationType);
allOptions.addOption(zookeeperNamespace);
allOptions.addOption(nodeLabel);
allOptions.addOption(help);
Expand Down Expand Up @@ -358,6 +361,10 @@ private AbstractYarnClusterDescriptor createDescriptor(
yarnClusterDescriptor.setName(cmd.getOptionValue(name.getOpt()));
}

if (cmd.hasOption(applicationType.getOpt())) {
yarnClusterDescriptor.setApplicationType(cmd.getOptionValue(applicationType.getOpt()));
}

if (cmd.hasOption(zookeeperNamespace.getOpt())) {
String zookeeperNamespaceValue = cmd.getOptionValue(this.zookeeperNamespace.getOpt());
yarnClusterDescriptor.setZookeeperNamespace(zookeeperNamespaceValue);
Expand Down

0 comments on commit 2a139b8

Please sign in to comment.