Skip to content

Commit

Permalink
[FLINK-13548][Deployment/YARN]Support priority of the Flink YARN appl…
Browse files Browse the repository at this point in the history
…ication.
  • Loading branch information
tianboxiu authored and tillrohrmann committed Aug 30, 2019
1 parent 60b65c4 commit a9d34dd
Show file tree
Hide file tree
Showing 6 changed files with 49 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@
<td style="word-wrap: break-word;">"0"</td>
<td>With this configuration option, users can specify a port, a range of ports or a list of ports for the Application Master (and JobManager) RPC port. By default we recommend using the default value (0) to let the operating system choose an appropriate port. In particular when multiple AMs are running on the same physical host, fixed port assignments prevent the AM from starting. For example when running Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of allowed ports.</td>
</tr>
<tr>
<td><h5>yarn.application.priority</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority. Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.</td>
</tr>
<tr>
<td><h5>yarn.appmaster.rpc.address</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
9 changes: 9 additions & 0 deletions docs/ops/deployment/yarn_setup.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ Flink's YARN client has the following configuration parameters to control how to

- `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the Application master fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.

## Setup for application priority on YARN

Flink's YARN client has the following configuration parameters to setup application priority. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.

- `yarn.application.priority`: A non-negative integer indicating the priority for submitting a Flink YARN application.
It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority.
If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority.
Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.

## Debugging a failed YARN session

There are many reasons why a Flink YARN session deployment can fail. A misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop) or other errors.
Expand Down
9 changes: 9 additions & 0 deletions docs/ops/deployment/yarn_setup.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,15 @@ Flink's YARN client has the following configuration parameters to control how to

- `yarn.application-attempts`: The number of ApplicationMaster (+ its TaskManager containers) attempts. If this value is set to 1 (default), the entire YARN session will fail when the Application master fails. Higher values specify the number of restarts of the ApplicationMaster by YARN.

## Setup for application priority on YARN

Flink's YARN client has the following configuration parameters to setup application priority. These parameters can be set either from the `conf/flink-conf.yaml` or when starting the YARN session, using `-D` parameters.

- `yarn.application.priority`: A non-negative integer indicating the priority for submitting a Flink YARN application.
It will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority.
If priority is negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority.
Please refer to YARN's official documentation for specific settings required to enable priority scheduling for the targeted YARN version.

## Debugging a failed YARN session

There are many reasons why a Flink YARN session deployment can fail. A misconfigured Hadoop setup (HDFS permissions, YARN configuration), version incompatibilities (running Flink with vanilla Hadoop dependencies on Cloudera Hadoop) or other errors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterface
"-nm", "customName",
"-Dfancy-configuration-value=veryFancy",
"-Dyarn.maximum-failed-containers=3",
"-Dyarn.application.priority=-5",
"-D" + YarnConfigOptions.VCORES.key() + "=2"},
"Flink JobManager is now running on ",
RunTypes.YARN_SESSION);
Expand Down Expand Up @@ -296,6 +297,7 @@ public void testVCoresAreSetCorrectlyAndJobManagerHostnameAreShownInWebInterface
//
assertThat(flinkConfig, hasEntry("fancy-configuration-value", "veryFancy"));
assertThat(flinkConfig, hasEntry("yarn.maximum-failed-containers", "3"));
assertThat(flinkConfig, hasEntry(YarnConfigOptions.APPLICATION_PRIORITY.key(), "5"));

//
// FLINK-2213: assert that vcores are set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
import org.apache.hadoop.yarn.api.records.LocalResource;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.NodeState;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
Expand Down Expand Up @@ -988,6 +989,13 @@ public ApplicationReport startAppMaster(
appContext.setAMContainerSpec(amContainer);
appContext.setResource(capability);

// Set priority for application
int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
if (priorityNum >= 0) {
Priority priority = Priority.newInstance(priorityNum);
appContext.setPriority(priority);
}

if (yarnQueue != null) {
appContext.setQueue(yarnQueue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,22 @@ public class YarnConfigOptions {
" Flink on YARN on an environment with a restrictive firewall, this option allows specifying a range of" +
" allowed ports.");

/**
* A non-negative integer indicating the priority for submitting a Flink YARN application. It will only take effect
* if YARN priority scheduling setting is enabled. Larger integer corresponds with higher priority. If priority is
* negative or set to '-1'(default), Flink will unset yarn priority setting and use cluster default priority.
*
* @see <a href="https://hadoop.apache.org/docs/r2.8.5/hadoop-yarn/hadoop-yarn-site/CapacityScheduler.html">YARN Capacity Scheduling Doc</a>
*/
public static final ConfigOption<Integer> APPLICATION_PRIORITY =
key("yarn.application.priority")
.defaultValue(-1)
.withDescription("A non-negative integer indicating the priority for submitting a Flink YARN application. It" +
" will only take effect if YARN priority scheduling setting is enabled. Larger integer corresponds" +
" with higher priority. If priority is negative or set to '-1'(default), Flink will unset yarn priority" +
" setting and use cluster default priority. Please refer to YARN's official documentation for specific" +
" settings required to enable priority scheduling for the targeted YARN version.");

/**
* A comma-separated list of strings to use as YARN application tags.
*/
Expand Down

0 comments on commit a9d34dd

Please sign in to comment.