Skip to content

Commit

Permalink
[FLINK-11272][flink-yarn] Support for parsing multiple --yarnship par…
Browse files Browse the repository at this point in the history
…ameters
  • Loading branch information
leesf authored and aljoscha committed Jan 10, 2019
1 parent e8182dc commit f692d37
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 8 deletions.
3 changes: 2 additions & 1 deletion docs/ops/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,8 @@ Action "run" compiles and runs a program.
-ys,--yarnslots <arg> Number of slots per TaskManager
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
(t for transfer), multiple options are
supported.
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container
with optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -311,14 +311,16 @@ private AbstractYarnClusterDescriptor createDescriptor(
}

List<File> shipFiles = new ArrayList<>();
// path to directory to ship
// path to directories to ship
if (cmd.hasOption(shipPath.getOpt())) {
String shipPath = cmd.getOptionValue(this.shipPath.getOpt());
File shipDir = new File(shipPath);
if (shipDir.isDirectory()) {
shipFiles.add(shipDir);
} else {
LOG.warn("Ship directory is not a directory. Ignoring it.");
String[] shipPaths = cmd.getOptionValues(this.shipPath.getOpt());
for (String shipPath : shipPaths) {
File shipDir = new File(shipPath);
if (shipDir.isDirectory()) {
shipFiles.add(shipDir);
} else {
LOG.warn("Ship directory {} is not a directory. Ignoring it.", shipDir.getAbsolutePath());
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -450,6 +450,23 @@ public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
assertThat(clusterSpecification.getTaskManagerMemoryMB(), is(1024));
}

@Test
public void testMultipleYarnShipOptions() throws Exception {
final String[] args = new String[]{"run", "--yarnship", tmp.newFolder().getAbsolutePath(), "--yarnship", tmp.newFolder().getAbsolutePath()};
final FlinkYarnSessionCli flinkYarnSessionCli = new FlinkYarnSessionCli(
new Configuration(),
tmp.getRoot().getAbsolutePath(),
"y",
"yarn");

final CommandLine commandLine = flinkYarnSessionCli.parseCommandLineOptions(args, false);

AbstractYarnClusterDescriptor flinkYarnDescriptor = flinkYarnSessionCli.createClusterDescriptor(commandLine);

assertEquals(2, flinkYarnDescriptor.shipFiles.size());

}


///////////
// Utils //
Expand Down

0 comments on commit f692d37

Please sign in to comment.