Skip to content

Commit

Permalink
[FLINK-12342][yarn] Add config option for heartbeat interval during container requests
Browse files Browse the repository at this point in the history

Flink's YarnResourceManager sets a faster heartbeat interval when it is requesting containers
from Yarn's ResourceManager. Since requests and responses are transported via heartbeats, this
speeds up requests. However, it can also put additional load on Yarn due to excessive container
requests. Therefore, this commit introduces a config option that allows users to control this heartbeat interval.

This closes apache#8306.
  • Loading branch information
HuangZhenQiu authored and tillrohrmann committed May 7, 2019
1 parent 417d6d2 commit 3871d4d
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
7 changes: 6 additions & 1 deletion docs/_includes/generated/yarn_config_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,12 @@
<td>The number of virtual cores (vcores) per YARN container. By default, the number of vcores is set to the number of slots per TaskManager, if set, or to 1, otherwise. In order for this parameter to be used your cluster must have CPU scheduling enabled. You can do this by setting the <span markdown="span">`org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler`</span>.</td>
</tr>
<tr>
<td><h5>yarn.heartbeat-delay</h5></td>
<td><h5>yarn.heartbeat.container-request-interval</h5></td>
<td style="word-wrap: break-word;">500</td>
<td>Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:<ul><li>The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats.</li><li>The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn.</li></ul>If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See <a href="https://issues.apache.org/jira/browse/YARN-1902">this link</a> for more information.</td>
</tr>
<tr>
<td><h5>yarn.heartbeat.interval</h5></td>
<td style="word-wrap: break-word;">5</td>
<td>Time between heartbeats with the ResourceManager in seconds.</td>
</tr>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,6 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme

/** YARN container map. Package private for unit test purposes. */
private final ConcurrentMap<ResourceID, YarnWorkerNode> workerNodeMap;

/** The heartbeat interval while the resource master is waiting for containers. */
private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;

/** Environment variable name of the final container id used by the YarnResourceManager.
* Container ID generation may vary across Hadoop versions. */
static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
Expand All @@ -114,6 +110,9 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme

private final int defaultCpus;

/** The heartbeat interval while the resource master is waiting for containers. */
private final int containerRequestHeartbeatIntervalMillis;

/** Client to communicate with the Resource Manager (YARN's master). */
private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;

Expand Down Expand Up @@ -173,6 +172,7 @@ public YarnResourceManager(
yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
}
yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
containerRequestHeartbeatIntervalMillis = flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
numPendingContainerRequests = 0;

this.webInterfaceUrl = webInterfaceUrl;
Expand Down Expand Up @@ -514,8 +514,7 @@ private void requestYarnContainer() {
resourceManagerClient.addContainerRequest(getContainerRequest());

// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);

resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
numPendingContainerRequests++;

log.info("Requesting new TaskExecutor container with resources {}. Number pending requests {}.",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
import org.apache.flink.configuration.description.Description;

import static org.apache.flink.configuration.ConfigOptions.key;
import static org.apache.flink.configuration.description.LinkElement.link;
import static org.apache.flink.configuration.description.TextElement.code;
import static org.apache.flink.configuration.description.TextElement.text;

/**
* This class holds configuration constants used by Flink's YARN runners.
Expand Down Expand Up @@ -103,10 +105,27 @@ public class YarnConfigOptions {
* The heartbeat interval between the Application Master and the YARN Resource Manager.
*/
public static final ConfigOption<Integer> HEARTBEAT_DELAY_SECONDS =
key("yarn.heartbeat-delay")
key("yarn.heartbeat.interval")
.defaultValue(5)
.withDeprecatedKeys("yarn.heartbeat-delay")
.withDescription("Time between heartbeats with the ResourceManager in seconds.");

/**
* The heartbeat interval, in milliseconds, between the Application Master and the YARN
* Resource Manager while Flink is requesting containers.
*
* <p>Configured via the key {@code yarn.heartbeat.container-request-interval}; the default
* is 500.
*
* <p>As the option's description states, requests and allocations are transmitted via
* heartbeats, so a lower value gets Flink notified about container allocations faster, but
* may also cause more excess containers to be allocated (and later released), which puts
* pressure on YARN. The description links to YARN-1902 for background.
*/
public static final ConfigOption<Integer> CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS =
key("yarn.heartbeat.container-request-interval")
.defaultValue(500)
.withDescription(
new Description.DescriptionBuilder()
.text("Time between heartbeats with the ResourceManager in milliseconds if Flink requests containers:")
.list(
text("The lower this value is, the faster Flink will get notified about container allocations since requests and allocations are transmitted via heartbeats."),
text("The lower this value is, the more excessive containers might get allocated which will eventually be released but put pressure on Yarn."))
.text("If you observe too many container allocations on the ResourceManager, then it is recommended to increase this value. See %s for more information.", link("https://issues.apache.org/jira/browse/YARN-1902", "this link"))
.build());

/**
* When a Flink job is submitted to YARN, the JobManager's host and the number of available
* processing slots is written into a properties file, so that the Flink client is able
Expand Down

0 comments on commit 3871d4d

Please sign in to comment.