[FLINK-10848] Remove container requests after successful container allocation

This commit removes container requests after containers have been allocated. This prevents us
from requesting more and more containers from Yarn in case of a recovery.

Since we cannot rely on the reported container Resource, we remove the container request by
using the requested Resource. This is due to Yarn's DefaultResourceCalculator, which neglects
the number of vCores when allocating containers.

This closes apache#7430.
tillrohrmann committed Jan 11, 2019
1 parent 205f906 commit 3903b81
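
Below is a minimal, self-contained sketch of the request-removal pattern described in the commit message, for illustration only; it is not part of the diff. It assumes an AMRMClientAsync<AMRMClient.ContainerRequest> client and that all pending requests were added with the same priority and the same requested Resource. The class, field, and method names (PendingRequestRemovalSketch, rmClient, requestedResource) are hypothetical.

// Sketch only: illustrates the removal pattern under the assumptions stated above.
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.List;

class PendingRequestRemovalSketch {

    private final AMRMClientAsync<AMRMClient.ContainerRequest> rmClient;
    private final Resource requestedResource;                         // the Resource we asked Yarn for
    private final Priority requestPriority = Priority.newInstance(1); // same priority used when requesting

    PendingRequestRemovalSketch(
            AMRMClientAsync<AMRMClient.ContainerRequest> rmClient,
            Resource requestedResource) {
        this.rmClient = rmClient;
        this.requestedResource = requestedResource;
    }

    // Removes one pending container request per allocated container. The lookup is keyed on
    // the requested Resource rather than container.getResource(): with Yarn's
    // DefaultResourceCalculator the granted Resource ignores vCores, so matching on the
    // reported value could fail to find the pending request.
    void onContainersAllocated(List<Container> containers) {
        // Requests with identical priority and Resource end up in a single bucket.
        List<? extends Collection<AMRMClient.ContainerRequest>> matching =
            rmClient.getMatchingRequests(requestPriority, ResourceRequest.ANY, requestedResource);

        if (matching.isEmpty()) {
            return; // nothing pending, e.g. excess containers handed out after a recovery
        }

        // Copy before iterating, since removeContainerRequest mutates the client's bookkeeping.
        Iterator<AMRMClient.ContainerRequest> pending = new ArrayList<>(matching.get(0)).iterator();

        for (int i = 0; i < containers.size() && pending.hasNext(); i++) {
            // Without this call the AMRMClient keeps re-asking Yarn for the container,
            // which is the over-allocation after recovery that this commit fixes.
            rmClient.removeContainerRequest(pending.next());
        }
    }
}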
Showing 6 changed files with 151 additions and 73 deletions.
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -41,15 +41,20 @@
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;

import javax.annotation.Nonnull;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

@@ -72,6 +77,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
* Container ID generation may vary across Hadoop versions. */
static final String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";

private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(0);

/** The containers where a TaskManager is starting and we are waiting for it to register. */
private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;

@@ -314,6 +321,21 @@ protected void fatalError(String message, Throwable error) {

@Override
protected void requestNewWorkers(int numWorkers) {
final Resource capability = getContainerResource();

for (int i = 0; i < numWorkers; i++) {
numPendingContainerRequests++;
LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
capability.getMemory(), numPendingContainerRequests);

resourceManagerClient.addContainerRequest(createContainerRequest(capability));
}

// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
}

private Resource getContainerResource() {
final long mem = taskManagerParameters.taskManagerTotalMemoryMB();
final int containerMemorySizeMB;

@@ -325,25 +347,15 @@ protected void requestNewWorkers(int numWorkers) {
mem, containerMemorySizeMB);
}

for (int i = 0; i < numWorkers; i++) {
numPendingContainerRequests++;
LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
containerMemorySizeMB, numPendingContainerRequests);

// Priority for worker containers - priorities are intra-application
Priority priority = Priority.newInstance(0);

// Resource requirements for worker containers
int taskManagerSlots = taskManagerParameters.numSlots();
int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);

resourceManagerClient.addContainerRequest(
new AMRMClient.ContainerRequest(capability, null, null, priority));
}
// Resource requirements for worker containers
int taskManagerSlots = taskManagerParameters.numSlots();
int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1));
return Resource.newInstance(containerMemorySizeMB, vcores);
}

// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
@Nonnull
private AMRMClient.ContainerRequest createContainerRequest(Resource capability) {
return new AMRMClient.ContainerRequest(capability, null, null, RM_REQUEST_PRIORITY);
}

@Override
@@ -434,7 +446,14 @@ private void containersAllocated(List<Container> containers) {
final int numRequired = getDesignatedWorkerPoolSize();
final int numRegistered = getNumberOfStartedTaskManagers();

final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

for (Container container : containers) {
if (numPendingContainerRequests > 0) {
numPendingContainerRequests -= 1;
resourceManagerClient.removeContainerRequest(pendingRequestsIterator.next());
}
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
Expand Down Expand Up @@ -487,6 +506,24 @@ private void containersAllocated(List<Container> containers) {
triggerCheckWorkers();
}

private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, ResourceRequest.ANY, getContainerResource());

final Collection<AMRMClient.ContainerRequest> result;

if (matchingRequests.isEmpty()) {
result = Collections.emptyList();
} else {
result = new ArrayList<>(matchingRequests.get(0));
}

Preconditions.checkState(
result.size() == numPendingContainerRequests,
"The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", result.size(), numPendingContainerRequests);

return result;
}

/**
* Invoked when the ResourceManager informs of completed containers.
* Called via an actor message by the callback from the ResourceManager client.
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
@@ -18,6 +18,7 @@

package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ConfigurationUtils;
@@ -41,6 +42,7 @@
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.Preconditions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.yarn.api.ApplicationConstants;
@@ -52,15 +54,20 @@
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -73,6 +80,7 @@
*/
public class YarnResourceManager extends ResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler {

private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1);
/** The process environment variables. */
private final Map<String, String> env;

@@ -119,6 +127,8 @@ public class YarnResourceManager extends ResourceManager<YarnWorkerNode> impleme

private final Collection<ResourceProfile> slotsPerWorker;

private final Resource resource;

public YarnResourceManager(
RpcService rpcService,
String resourceManagerEndpointId,
@@ -169,6 +179,7 @@ public YarnResourceManager(
this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS);
this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes();
this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots);
this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus);

this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots);
}
@@ -292,17 +303,19 @@ protected void internalDeregisterApplication(

@Override
public Collection<ResourceProfile> startNewWorker(ResourceProfile resourceProfile) {
// Priority for worker containers - priorities are intra-application
//TODO: set priority according to the resource allocated
Priority priority = Priority.newInstance(generatePriority(resourceProfile));
int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB();
int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores();
Resource capability = Resource.newInstance(mem, vcore);
requestYarnContainer(capability, priority);
Preconditions.checkArgument(
ResourceProfile.UNKNOWN.equals(resourceProfile),
"The YarnResourceManager does not support custom ResourceProfiles yet. It assumes that all containers have the same resources.");
requestYarnContainer();

return slotsPerWorker;
}

@VisibleForTesting
Resource getContainerResource() {
return resource;
}

@Override
public boolean stopWorker(final YarnWorkerNode workerNode) {
final Container container = workerNode.getContainer();
@@ -343,8 +356,7 @@ public void onContainersCompleted(final List<ContainerStatus> statuses) {

if (yarnWorkerNode != null) {
// Container completed unexpectedly ~> start a new one
final Container container = yarnWorkerNode.getContainer();
requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority());
requestYarnContainerIfRequired();
}
// Eagerly close the connection with task manager.
closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics()));
@@ -356,14 +368,17 @@
@Override
public void onContainersAllocated(List<Container> containers) {
runAsync(() -> {
final Collection<AMRMClient.ContainerRequest> pendingRequests = getPendingRequests();
final Iterator<AMRMClient.ContainerRequest> pendingRequestsIterator = pendingRequests.iterator();

for (Container container : containers) {
log.info(
"Received new container: {} - Remaining pending container requests: {}",
container.getId(),
numPendingContainerRequests);

if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;
removeContainerRequest(pendingRequestsIterator.next());

final String containerIdStr = container.getId().toString();
final ResourceID resourceId = new ResourceID(containerIdStr);
@@ -385,7 +400,7 @@ public void onContainersAllocated(List<Container> containers) {
workerNodeMap.remove(resourceId);
resourceManagerClient.releaseAssignedContainer(container.getId());
// and ask for a new one
requestYarnContainerIfRequired(container.getResource(), container.getPriority());
requestYarnContainerIfRequired();
}
} else {
// return the excessive containers
@@ -402,6 +417,36 @@
});
}

private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) {
numPendingContainerRequests--;

log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests);

resourceManagerClient.removeContainerRequest(pendingContainerRequest);
}

private Collection<AMRMClient.ContainerRequest> getPendingRequests() {
final List<? extends Collection<AMRMClient.ContainerRequest>> matchingRequests = resourceManagerClient.getMatchingRequests(
RM_REQUEST_PRIORITY,
ResourceRequest.ANY,
getContainerResource());

final Collection<AMRMClient.ContainerRequest> matchingContainerRequests;

if (matchingRequests.isEmpty()) {
matchingContainerRequests = Collections.emptyList();
} else {
final Collection<AMRMClient.ContainerRequest> collection = matchingRequests.get(0);
matchingContainerRequests = new ArrayList<>(collection);
}

Preconditions.checkState(
matchingContainerRequests.size() == numPendingContainerRequests,
"The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", matchingContainerRequests.size(), numPendingContainerRequests);

return matchingContainerRequests;
}

@Override
public void onShutdownRequest() {
shutDown();
@@ -456,17 +501,17 @@ private static Tuple2<String, Integer> parseHostPort(String address) {
/**
* Request new container if pending containers cannot satisfies pending slot requests.
*/
private void requestYarnContainerIfRequired(Resource resource, Priority priority) {
private void requestYarnContainerIfRequired() {
int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots();
int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots;

if (requiredTaskManagerSlots > pendingTaskManagerSlots) {
requestYarnContainer(resource, priority);
requestYarnContainer();
}
}

private void requestYarnContainer(Resource resource, Priority priority) {
resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority));
private void requestYarnContainer() {
resourceManagerClient.addContainerRequest(getContainerRequest());

// make sure we transmit the request fast and receive fast news of granted allocations
resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
@@ -478,6 +523,16 @@ private void requestYarnContainer(Resource resource, Priority priority) {
numPendingContainerRequests);
}

@Nonnull
@VisibleForTesting
AMRMClient.ContainerRequest getContainerRequest() {
return new AMRMClient.ContainerRequest(
getContainerResource(),
null,
null,
RM_REQUEST_PRIORITY);
}

private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host)
throws Exception {
// init the ContainerLaunchContext
@@ -514,22 +569,4 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource
.put(ENV_FLINK_NODE_ID, host);
return taskExecutorLaunchContext;
}



/**
* Generate priority by given resource profile.
* Priority is only used for distinguishing request of different resource.
* @param resourceProfile The resource profile of a request
* @return The priority of this resource profile.
*/
private int generatePriority(ResourceProfile resourceProfile) {
if (resourcePriorities.containsKey(resourceProfile)) {
return resourcePriorities.get(resourceProfile);
} else {
int priority = resourcePriorities.size();
resourcePriorities.put(resourceProfile, priority);
return priority;
}
}
}
2 changes: 1 addition & 1 deletion flink-yarn/src/main/resources/log4j.properties
@@ -17,7 +17,7 @@
################################################################################

# Convenience file for local debugging of the JobManager/TaskManager.
log4j.rootLogger=INFO, console
log4j.rootLogger=OFF, console
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{HH:mm:ss,SSS} %-5p %-60c %x - %m%n