Skip to content

Commit

Permalink
[FLINK-9777] YARN: JM and TM Memory must be specified with Units
Browse files Browse the repository at this point in the history
This closes apache#6297
  • Loading branch information
yanghua authored and dawidwys committed Jul 18, 2018
1 parent 056486a commit 4095a31
Show file tree
Hide file tree
Showing 8 changed files with 167 additions and 22 deletions.
4 changes: 2 additions & 2 deletions docs/dev/scala_shell.md
Original file line number Diff line number Diff line change
Expand Up @@ -175,15 +175,15 @@ Starts Flink scala shell connecting to a yarn cluster
-n arg | --container arg
Number of YARN container to allocate (= Number of TaskManagers)
-jm arg | --jobManagerMemory arg
Memory for JobManager container [in MB]
Memory for JobManager container with optional unit (default: MB)
-nm <value> | --name <value>
Set a custom name for the application on YARN
-qu <arg> | --queue <arg>
Specifies YARN queue
-s <arg> | --slots <arg>
Number of slots per TaskManager
-tm <arg> | --taskManagerMemory <arg>
Memory per TaskManager container [in MB]
Memory per TaskManager container with optional unit (default: MB)
-a <path/to/jar> | --addclasspath <path/to/jar>
Specifies additional jars to be used in Flink
--configDir <value>
Expand Down
8 changes: 4 additions & 4 deletions docs/ops/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ Action "run" compiles and runs a program.
-yh,--yarnhelp Help for the Yarn session CLI.
-yid,--yarnapplicationId <arg> Attach to running YARN session
-yj,--yarnjar <arg> Path to Flink jar file
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container [in
MB]
-yjm,--yarnjobManagerMemory <arg> Memory for JobManager Container
with optional unit (default: MB)
-yn,--yarncontainer <arg> Number of YARN container to allocate
(=Number of Task Managers)
-ynm,--yarnname <arg> Set a custom name for the application
Expand All @@ -285,8 +285,8 @@ Action "run" compiles and runs a program.
-yst,--yarnstreaming Start Flink in streaming mode
-yt,--yarnship <arg> Ship files in the specified directory
(t for transfer)
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container [in
MB]
-ytm,--yarntaskManagerMemory <arg> Memory per TaskManager Container
with optional unit (default: MB)
-yz,--yarnzookeeperNamespace <arg> Namespace to create the Zookeeper
sub-paths for high availability mode
-ynl,--yarnnodeLabel <arg> Specify YARN node label for
Expand Down
8 changes: 4 additions & 4 deletions docs/ops/deployment/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Start a YARN session with 4 Task Managers (each with 4 GB of Heapspace):
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
./bin/yarn-session.sh -n 4 -jm 1024 -tm 4096
./bin/yarn-session.sh -n 4 -jm 1024m -tm 4096m
{% endhighlight %}

Specify the `-s` flag for the number of processing slots per Task Manager. We recommend setting the number of slots to the number of processors per machine.
Expand All @@ -53,7 +53,7 @@ Once the session has been started, you can submit jobs to the cluster using the
curl -O <flink_hadoop2_download_url>
tar xvzf flink-{{ site.version }}-bin-hadoop2.tgz
cd flink-{{ site.version }}/
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024 -ytm 4096 ./examples/batch/WordCount.jar
./bin/flink run -m yarn-cluster -yn 4 -yjm 1024m -ytm 4096m ./examples/batch/WordCount.jar
{% endhighlight %}

## Flink YARN Session
Expand Down Expand Up @@ -101,12 +101,12 @@ Usage:
Optional
-D <arg> Dynamic properties
-d,--detached Start detached
-jm,--jobManagerMemory <arg> Memory for JobManager Container [in MB]
-jm,--jobManagerMemory <arg> Memory for JobManager Container with optional unit (default: MB)
-nm,--name Set a custom name for the application on YARN
-q,--query Display available YARN resources (memory, cores)
-qu,--queue <arg> Specify YARN queue.
-s,--slots <arg> Number of slots per TaskManager
-tm,--taskManagerMemory <arg> Memory per TaskManager Container [in MB]
-tm,--taskManagerMemory <arg> Memory per TaskManager Container with optional unit (default: MB)
-z,--zookeeperNamespace <arg> Namespace to create the Zookeeper sub-paths for HA mode
{% endhighlight %}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,7 @@
package org.apache.flink.client.deployment;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;

/**
Expand Down Expand Up @@ -68,8 +67,8 @@ public String toString() {
public static ClusterSpecification fromConfiguration(Configuration configuration) {
int slots = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS, 1);

int jobManagerMemoryMb = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
int taskManagerMemoryMb = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
int jobManagerMemoryMb = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();
int taskManagerMemoryMb = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();

return new ClusterSpecificationBuilder()
.setMasterMemoryMB(jobManagerMemoryMb)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,44 @@ public class ConfigurationUtils {

private static final String[] EMPTY = new String[0];

/**
 * Resolves the heap memory size configured for the job manager.
 *
 * <p>The new key {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY} takes precedence. The old key
 * {@link JobManagerOptions#JOB_MANAGER_HEAP_MEMORY_MB} is only consulted, for backwards
 * compatibility, when the new key is absent; its integer value is suffixed with "m" before
 * being parsed. When neither key is present, the new option is read, which is expected to
 * yield its default value.
 *
 * @param configuration the configuration object
 * @return the memory size of job manager's heap memory.
 */
public static MemorySize getJobManagerHeapMemory(Configuration configuration) {
    final boolean hasNewKey = configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY.key());
    // The old key is only looked up when the new key is missing.
    final boolean onlyOldKey = !hasNewKey
        && configuration.containsKey(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB.key());

    final String heapSize;
    if (onlyOldKey) {
        heapSize = configuration.getInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB) + "m";
    } else {
        // Covers both "new key present" and "no key present" (default value).
        heapSize = configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY);
    }

    return MemorySize.parse(heapSize);
}

/**
 * Resolves the heap memory size configured for the task manager.
 *
 * <p>The new key {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY} takes precedence. The old key
 * {@link TaskManagerOptions#TASK_MANAGER_HEAP_MEMORY_MB} is only consulted, for backwards
 * compatibility, when the new key is absent; its integer value is suffixed with "m" before
 * being parsed. When neither key is present, the new option is read, which is expected to
 * yield its default value.
 *
 * @param configuration the configuration object
 * @return the memory size of task manager's heap memory.
 */
public static MemorySize getTaskManagerHeapMemory(Configuration configuration) {
    final boolean hasNewKey = configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY.key());
    // The old key is only looked up when the new key is missing.
    final boolean onlyOldKey = !hasNewKey
        && configuration.containsKey(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB.key());

    final String heapSize;
    if (onlyOldKey) {
        heapSize = configuration.getInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB) + "m";
    } else {
        // Covers both "new key present" and "no key present" (default value).
        heapSize = configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY);
    }

    return MemorySize.parse(heapSize);
}

/**
* Extracts the task manager directories for temporary files as defined by
* {@link org.apache.flink.configuration.CoreOptions#TMP_DIRS}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.runtime.clusterframework.ApplicationStatus;
import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
Expand Down Expand Up @@ -163,7 +163,7 @@ public YarnResourceManager(

this.webInterfaceUrl = webInterfaceUrl;
this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
this.defaultTaskManagerMemoryMB = MemorySize.parse(flinkConfig.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
import org.apache.flink.configuration.CoreOptions;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.JobManagerOptions;
Expand Down Expand Up @@ -190,8 +191,8 @@ public FlinkYarnSessionCli(
queue = new Option(shortPrefix + "qu", longPrefix + "queue", true, "Specify YARN queue.");
shipPath = new Option(shortPrefix + "t", longPrefix + "ship", true, "Ship files in the specified directory (t for transfer)");
flinkJar = new Option(shortPrefix + "j", longPrefix + "jar", true, "Path to Flink jar file");
jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container [in MB]");
tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container [in MB]");
jmMemory = new Option(shortPrefix + "jm", longPrefix + "jobManagerMemory", true, "Memory for JobManager Container with optional unit (default: MB)");
tmMemory = new Option(shortPrefix + "tm", longPrefix + "taskManagerMemory", true, "Memory per TaskManager Container with optional unit (default: MB)");
container = new Option(shortPrefix + "n", longPrefix + "container", true, "Number of YARN container to allocate (=Number of Task Managers)");
slots = new Option(shortPrefix + "s", longPrefix + "slots", true, "Number of slots per TaskManager");
dynamicproperties = Option.builder(shortPrefix + "D")
Expand Down Expand Up @@ -386,10 +387,10 @@ private ClusterSpecification createClusterSpecification(Configuration configurat
}

// JobManager Memory
final int jobManagerMemoryMB = MemorySize.parse(configuration.getString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY)).getMebiBytes();
final int jobManagerMemoryMB = ConfigurationUtils.getJobManagerHeapMemory(configuration).getMebiBytes();

// Task Managers memory
final int taskManagerMemoryMB = MemorySize.parse(configuration.getString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY)).getMebiBytes();
final int taskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(configuration).getMebiBytes();

int slotsPerTaskManager = configuration.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);

Expand Down Expand Up @@ -500,11 +501,19 @@ protected Configuration applyCommandLineOptionsToConfiguration(CommandLine comma
}

if (commandLine.hasOption(jmMemory.getOpt())) {
effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(jmMemory.getOpt()));
String jmMemoryVal = commandLine.getOptionValue(jmMemory.getOpt());
if (!MemorySize.MemoryUnit.hasUnit(jmMemoryVal)) {
jmMemoryVal += "m";
}
effectiveConfiguration.setString(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY, jmMemoryVal);
}

if (commandLine.hasOption(tmMemory.getOpt())) {
effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, commandLine.getOptionValue(tmMemory.getOpt()));
String tmMemoryVal = commandLine.getOptionValue(tmMemory.getOpt());
if (!MemorySize.MemoryUnit.hasUnit(tmMemoryVal)) {
tmMemoryVal += "m";
}
effectiveConfiguration.setString(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY, tmMemoryVal);
}

if (commandLine.hasOption(slots.getOpt())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,105 @@ public void testConfigurationClusterSpecification() throws Exception {
assertThat(clusterSpecification.getSlotsPerTaskManager(), is(slotsPerTaskManager));
}

/**
 * Verifies that job manager and task manager heap sizes passed on the command line
 * without a unit end up as the same number of megabytes in the cluster specification.
 */
@Test
public void testHeapMemoryPropertyWithoutUnit() throws Exception {
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");
    final String[] cliArgs = { "-yn", "2", "-yjm", "1024", "-ytm", "2048" };

    final ClusterSpecification spec =
        cli.getClusterSpecification(cli.parseCommandLineOptions(cliArgs, false));

    assertThat(spec.getMasterMemoryMB(), is(1024));
    assertThat(spec.getTaskManagerMemoryMB(), is(2048));
}

/**
 * Verifies that job manager and task manager heap sizes passed on the command line
 * with an explicit "m" unit are reported as that many megabytes.
 */
@Test
public void testHeapMemoryPropertyWithUnitMB() throws Exception {
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");
    final String[] cliArgs = { "-yn", "2", "-yjm", "1024m", "-ytm", "2048m" };

    final ClusterSpecification spec =
        cli.getClusterSpecification(cli.parseCommandLineOptions(cliArgs, false));

    assertThat(spec.getMasterMemoryMB(), is(1024));
    assertThat(spec.getTaskManagerMemoryMB(), is(2048));
}

/**
 * Verifies that job manager and task manager heap sizes passed on the command line
 * with a unit other than megabytes ("1g" / "2g") are converted to megabytes.
 */
@Test
public void testHeapMemoryPropertyWithArbitraryUnit() throws Exception {
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");
    final String[] cliArgs = { "-yn", "2", "-yjm", "1g", "-ytm", "2g" };

    final ClusterSpecification spec =
        cli.getClusterSpecification(cli.parseCommandLineOptions(cliArgs, false));

    assertThat(spec.getMasterMemoryMB(), is(1024));
    assertThat(spec.getTaskManagerMemoryMB(), is(2048));
}

/**
 * Verifies that heap sizes set only through the old integer config keys
 * ({@code JOB_MANAGER_HEAP_MEMORY_MB} / {@code TASK_MANAGER_HEAP_MEMORY_MB})
 * are still picked up when no memory option is given on the command line.
 */
@Test
public void testHeapMemoryPropertyWithOldConfigKey() throws Exception {
    final Configuration oldKeyConfig = new Configuration();
    oldKeyConfig.setInteger(JobManagerOptions.JOB_MANAGER_HEAP_MEMORY_MB, 2048);
    oldKeyConfig.setInteger(TaskManagerOptions.TASK_MANAGER_HEAP_MEMORY_MB, 4096);

    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        oldKeyConfig,
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    // No command line arguments: the values must come from the configuration alone.
    final String[] noArgs = new String[0];
    final ClusterSpecification spec =
        cli.getClusterSpecification(cli.parseCommandLineOptions(noArgs, false));

    assertThat(spec.getMasterMemoryMB(), is(2048));
    assertThat(spec.getTaskManagerMemoryMB(), is(4096));
}

/**
 * Verifies that, with an empty configuration and no command line arguments, the
 * job manager and task manager heap sizes are both reported as 1024 megabytes.
 */
@Test
public void testHeapMemoryPropertyWithConfigDefaultValue() throws Exception {
    final FlinkYarnSessionCli cli = new FlinkYarnSessionCli(
        new Configuration(),
        tmp.getRoot().getAbsolutePath(),
        "y",
        "yarn");

    // Neither configuration entries nor command line options are supplied.
    final String[] noArgs = new String[0];
    final ClusterSpecification spec =
        cli.getClusterSpecification(cli.parseCommandLineOptions(noArgs, false));

    assertThat(spec.getMasterMemoryMB(), is(1024));
    assertThat(spec.getTaskManagerMemoryMB(), is(1024));
}


///////////
// Utils //
///////////
Expand Down

0 comments on commit 4095a31

Please sign in to comment.