Skip to content

Commit

Permalink
Revert "[FLINK-10848][YARN] properly remove YARN ContainerRequest upo…
Browse files Browse the repository at this point in the history
…n container allocation success"

This reverts commit e26d90f.
  • Loading branch information
tillrohrmann committed Jan 8, 2019
1 parent 392647d commit d77151c
Show file tree
Hide file tree
Showing 5 changed files with 1 addition and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,8 +165,6 @@ public abstract class YarnTestBase extends TestLogger {
YARN_CONFIGURATION.setInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 3600);
YARN_CONFIGURATION.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
YARN_CONFIGURATION.setInt(YarnConfiguration.NM_VCORES, 666); // memory is overwritten in the MiniYARNCluster.
YARN_CONFIGURATION.set("yarn.scheduler.capacity.resource-calculator",
"org.apache.hadoop.yarn.util.resource.DominantResourceCalculator");
// so we have to change the number of cores for testing.
YARN_CONFIGURATION.setInt(YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS, 20000); // 20 seconds expiry (to ensure we properly heartbeat with YARN).
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -438,8 +438,6 @@ private void containersAllocated(List<Container> containers) {
numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
LOG.info("Received new container: {} - Remaining pending container requests: {}",
container.getId(), numPendingContainerRequests);
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
container.getResource(), null, null, container.getPriority()));

// decide whether to return the container, or whether to start a TaskManager
if (numRegistered + containersInLaunch.size() < numRequired) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -361,8 +361,7 @@ public void onContainersAllocated(List<Container> containers) {
"Received new container: {} - Remaining pending container requests: {}",
container.getId(),
numPendingContainerRequests);
resourceManagerClient.removeContainerRequest(new AMRMClient.ContainerRequest(
container.getResource(), null, null, container.getPriority()));

if (numPendingContainerRequests > 0) {
numPendingContainerRequests--;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
Expand All @@ -71,11 +69,8 @@
import scala.concurrent.duration.FiniteDuration;

import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -130,8 +125,6 @@ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except
1),
i));
when(mockContainer.getNodeId()).thenReturn(NodeId.newInstance("container", 1234));
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1));
when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED);
containerList.add(mockContainer);
}

Expand Down Expand Up @@ -240,8 +233,6 @@ public Object answer(InvocationOnMock invocation) throws Throwable {

int numberOfRegisteredResources = (Integer) Await.result(numberOfRegisteredResourcesFuture, deadline.timeLeft());

verify(resourceManagerClient, times(numInitialTaskManagers)).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
assertEquals(numInitialTaskManagers, numberOfRegisteredResources);
} finally {
if (resourceManager != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -401,8 +401,6 @@ public void testStopWorker() throws Exception {

resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockResourceManagerClient).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));

// Remote task executor registers with YarnResourceManager.
Expand Down Expand Up @@ -498,8 +496,6 @@ public void testOnContainerCompleted() throws Exception {

resourceManager.onContainersAllocated(ImmutableList.of(testingContainer));
verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class));
verify(mockResourceManagerClient).removeContainerRequest(
any(AMRMClient.ContainerRequest.class));
verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class));

// Callback from YARN when container is Completed, pending request can not be fulfilled by pending
Expand Down

0 comments on commit d77151c

Please sign in to comment.