diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 8e686bbbe34c1..e8e55c3330361 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -41,15 +41,20 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.slf4j.Logger; +import javax.annotation.Nonnull; + import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; @@ -72,6 +77,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager containersInLaunch; @@ -314,6 +321,21 @@ protected void fatalError(String message, Throwable error) { @Override protected void requestNewWorkers(int numWorkers) { + final Resource capability = getContainerResource(); + + for (int i = 0; i < numWorkers; i++) { + numPendingContainerRequests++; + LOG.info("Requesting new TaskManager container with {} megabytes memory. 
Pending requests: {}", + capability.getMemory(), numPendingContainerRequests); + + resourceManagerClient.addContainerRequest(createContainerRequest(capability)); + } + + // make sure we transmit the request fast and receive fast news of granted allocations + resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + } + + private Resource getContainerResource() { final long mem = taskManagerParameters.taskManagerTotalMemoryMB(); final int containerMemorySizeMB; @@ -325,25 +347,15 @@ protected void requestNewWorkers(int numWorkers) { mem, containerMemorySizeMB); } - for (int i = 0; i < numWorkers; i++) { - numPendingContainerRequests++; - LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}", - containerMemorySizeMB, numPendingContainerRequests); - - // Priority for worker containers - priorities are intra-application - Priority priority = Priority.newInstance(0); - - // Resource requirements for worker containers - int taskManagerSlots = taskManagerParameters.numSlots(); - int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1)); - Resource capability = Resource.newInstance(containerMemorySizeMB, vcores); - - resourceManagerClient.addContainerRequest( - new AMRMClient.ContainerRequest(capability, null, null, priority)); - } + // Resource requirements for worker containers + int taskManagerSlots = taskManagerParameters.numSlots(); + int vcores = config.getInteger(YarnConfigOptions.VCORES, Math.max(taskManagerSlots, 1)); + return Resource.newInstance(containerMemorySizeMB, vcores); + } - // make sure we transmit the request fast and receive fast news of granted allocations - resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); + @Nonnull + private AMRMClient.ContainerRequest createContainerRequest(Resource capability) { + return new AMRMClient.ContainerRequest(capability, null, null, RM_REQUEST_PRIORITY); } @Override @@ -434,7 +446,14 @@ private void 
containersAllocated(List containers) { final int numRequired = getDesignatedWorkerPoolSize(); final int numRegistered = getNumberOfStartedTaskManagers(); + final Collection pendingRequests = getPendingRequests(); + final Iterator pendingRequestsIterator = pendingRequests.iterator(); + for (Container container : containers) { + // every granted container settles at most one outstanding request; count it exactly once + if (numPendingContainerRequests > 0) { + numPendingContainerRequests -= 1; + resourceManagerClient.removeContainerRequest(pendingRequestsIterator.next()); + } - numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1); LOG.info("Received new container: {} - Remaining pending container requests: {}", container.getId(), numPendingContainerRequests); @@ -487,6 +506,24 @@ private void containersAllocated(List containers) { triggerCheckWorkers(); } + private Collection getPendingRequests() { + final List> matchingRequests = resourceManagerClient.getMatchingRequests(RM_REQUEST_PRIORITY, ResourceRequest.ANY, getContainerResource()); + + final Collection result; + + if (matchingRequests.isEmpty()) { + result = Collections.emptyList(); + } else { + result = new ArrayList<>(matchingRequests.get(0)); + } + + Preconditions.checkState( + result.size() == numPendingContainerRequests, + "The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", result.size(), numPendingContainerRequests); + + return result; + } + /** * Invoked when the ResourceManager informs of completed containers. * Called via an actor message by the callback from the ResourceManager client.
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java index 6ff5cd6648711..609ab42f590f0 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.ConfigurationUtils; @@ -41,6 +42,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; +import org.apache.flink.util.Preconditions; import org.apache.flink.yarn.configuration.YarnConfigOptions; import org.apache.hadoop.yarn.api.ApplicationConstants; @@ -52,15 +54,20 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.ArrayList; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.CompletableFuture; @@ -73,6 +80,7 @@ */ public class YarnResourceManager extends ResourceManager implements AMRMClientAsync.CallbackHandler { + private static final Priority RM_REQUEST_PRIORITY = Priority.newInstance(1); /** The process environment variables. 
*/ private final Map env; @@ -119,6 +127,8 @@ public class YarnResourceManager extends ResourceManager impleme private final Collection slotsPerWorker; + private final Resource resource; + public YarnResourceManager( RpcService rpcService, String resourceManagerEndpointId, @@ -169,6 +179,7 @@ public YarnResourceManager( this.numberOfTaskSlots = flinkConfig.getInteger(TaskManagerOptions.NUM_TASK_SLOTS); this.defaultTaskManagerMemoryMB = ConfigurationUtils.getTaskManagerHeapMemory(flinkConfig).getMebiBytes(); this.defaultCpus = flinkConfig.getInteger(YarnConfigOptions.VCORES, numberOfTaskSlots); + this.resource = Resource.newInstance(defaultTaskManagerMemoryMB, defaultCpus); this.slotsPerWorker = createSlotsPerWorker(numberOfTaskSlots); } @@ -292,17 +303,19 @@ protected void internalDeregisterApplication( @Override public Collection startNewWorker(ResourceProfile resourceProfile) { - // Priority for worker containers - priorities are intra-application - //TODO: set priority according to the resource allocated - Priority priority = Priority.newInstance(generatePriority(resourceProfile)); - int mem = resourceProfile.getMemoryInMB() < 0 ? defaultTaskManagerMemoryMB : resourceProfile.getMemoryInMB(); - int vcore = resourceProfile.getCpuCores() < 1 ? defaultCpus : (int) resourceProfile.getCpuCores(); - Resource capability = Resource.newInstance(mem, vcore); - requestYarnContainer(capability, priority); + Preconditions.checkArgument( + ResourceProfile.UNKNOWN.equals(resourceProfile), + "The YarnResourceManager does not support custom ResourceProfiles yet. 
It assumes that all containers have the same resources."); + requestYarnContainer(); return slotsPerWorker; } + @VisibleForTesting + Resource getContainerResource() { + return resource; + } + @Override public boolean stopWorker(final YarnWorkerNode workerNode) { final Container container = workerNode.getContainer(); @@ -343,8 +356,7 @@ public void onContainersCompleted(final List statuses) { if (yarnWorkerNode != null) { // Container completed unexpectedly ~> start a new one - final Container container = yarnWorkerNode.getContainer(); - requestYarnContainerIfRequired(container.getResource(), yarnWorkerNode.getContainer().getPriority()); + requestYarnContainerIfRequired(); } // Eagerly close the connection with task manager. closeTaskManagerConnection(resourceId, new Exception(containerStatus.getDiagnostics())); @@ -356,6 +368,9 @@ public void onContainersCompleted(final List statuses) { @Override public void onContainersAllocated(List containers) { runAsync(() -> { + final Collection pendingRequests = getPendingRequests(); + final Iterator pendingRequestsIterator = pendingRequests.iterator(); + for (Container container : containers) { log.info( "Received new container: {} - Remaining pending container requests: {}", @@ -363,7 +378,7 @@ public void onContainersAllocated(List containers) { numPendingContainerRequests); if (numPendingContainerRequests > 0) { - numPendingContainerRequests--; + removeContainerRequest(pendingRequestsIterator.next()); final String containerIdStr = container.getId().toString(); final ResourceID resourceId = new ResourceID(containerIdStr); @@ -385,7 +400,7 @@ public void onContainersAllocated(List containers) { workerNodeMap.remove(resourceId); resourceManagerClient.releaseAssignedContainer(container.getId()); // and ask for a new one - requestYarnContainerIfRequired(container.getResource(), container.getPriority()); + requestYarnContainerIfRequired(); } } else { // return the excessive containers @@ -402,6 +417,36 @@ public void 
onContainersAllocated(List containers) { }); } + private void removeContainerRequest(AMRMClient.ContainerRequest pendingContainerRequest) { + numPendingContainerRequests--; + + log.info("Removing container request {}. Pending container requests {}.", pendingContainerRequest, numPendingContainerRequests); + + resourceManagerClient.removeContainerRequest(pendingContainerRequest); + } + + private Collection getPendingRequests() { + final List> matchingRequests = resourceManagerClient.getMatchingRequests( + RM_REQUEST_PRIORITY, + ResourceRequest.ANY, + getContainerResource()); + + final Collection matchingContainerRequests; + + if (matchingRequests.isEmpty()) { + matchingContainerRequests = Collections.emptyList(); + } else { + final Collection collection = matchingRequests.get(0); + matchingContainerRequests = new ArrayList<>(collection); + } + + Preconditions.checkState( + matchingContainerRequests.size() == numPendingContainerRequests, + "The RMClient's and YarnResourceManagers internal state about the number of pending container requests has diverged. Number client's pending container requests %s != Number RM's pending container requests %s.", matchingContainerRequests.size(), numPendingContainerRequests); + + return matchingContainerRequests; + } + @Override public void onShutdownRequest() { shutDown(); @@ -456,17 +501,17 @@ private static Tuple2 parseHostPort(String address) { /** * Request new container if pending containers cannot satisfies pending slot requests. 
*/ - private void requestYarnContainerIfRequired(Resource resource, Priority priority) { + private void requestYarnContainerIfRequired() { int requiredTaskManagerSlots = getNumberRequiredTaskManagerSlots(); int pendingTaskManagerSlots = numPendingContainerRequests * numberOfTaskSlots; if (requiredTaskManagerSlots > pendingTaskManagerSlots) { - requestYarnContainer(resource, priority); + requestYarnContainer(); } } - private void requestYarnContainer(Resource resource, Priority priority) { - resourceManagerClient.addContainerRequest(new AMRMClient.ContainerRequest(resource, null, null, priority)); + private void requestYarnContainer() { + resourceManagerClient.addContainerRequest(getContainerRequest()); // make sure we transmit the request fast and receive fast news of granted allocations resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS); @@ -478,6 +523,16 @@ private void requestYarnContainer(Resource resource, Priority priority) { numPendingContainerRequests); } + @Nonnull + @VisibleForTesting + AMRMClient.ContainerRequest getContainerRequest() { + return new AMRMClient.ContainerRequest( + getContainerResource(), + null, + null, + RM_REQUEST_PRIORITY); + } + private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource, String containerId, String host) throws Exception { // init the ContainerLaunchContext @@ -514,22 +569,4 @@ private ContainerLaunchContext createTaskExecutorLaunchContext(Resource resource .put(ENV_FLINK_NODE_ID, host); return taskExecutorLaunchContext; } - - - - /** - * Generate priority by given resource profile. - * Priority is only used for distinguishing request of different resource. - * @param resourceProfile The resource profile of a request - * @return The priority of this resource profile. 
- */ - private int generatePriority(ResourceProfile resourceProfile) { - if (resourcePriorities.containsKey(resourceProfile)) { - return resourcePriorities.get(resourceProfile); - } else { - int priority = resourcePriorities.size(); - resourcePriorities.put(resourceProfile, priority); - return priority; - } - } } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java index 10b2ce97d6fe4..4108249548e56 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnFlinkResourceManagerTest.java @@ -43,6 +43,8 @@ import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerLaunchContext; import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.client.api.AMRMClient; import org.apache.hadoop.yarn.client.api.NMClient; import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; @@ -52,7 +54,6 @@ import org.junit.Test; import org.mockito.Matchers; import org.mockito.invocation.InvocationOnMock; -import 
org.mockito.stubbing.Answer; import java.util.ArrayList; import java.util.Collections; @@ -69,7 +70,10 @@ import scala.concurrent.duration.FiniteDuration; import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -128,20 +132,6 @@ public void testYarnFlinkResourceManagerJobManagerLostLeadership() throws Except containerList.add(mockContainer); } - doAnswer(new Answer() { - int counter = 0; - @Override - public Object answer(InvocationOnMock invocation) throws Throwable { - if (counter < containerList.size()) { - callbackHandler.onContainersAllocated( - Collections.singletonList( - containerList.get(counter++) - )); - } - return null; - } - }).when(resourceManagerClient).addContainerRequest(Matchers.any(AMRMClient.ContainerRequest.class)); - final CompletableFuture resourceManagerFuture = new CompletableFuture<>(); final CompletableFuture leaderGatewayFuture = new CompletableFuture<>(); @@ -191,6 +181,9 @@ public Object answer(InvocationOnMock invocation) throws Throwable { nodeManagerClient )); + doReturn(Collections.singletonList(Collections.nCopies(numInitialTaskManagers, new AMRMClient.ContainerRequest(Resource.newInstance(1024 * 1024, 1), null, null, Priority.newInstance(0))))) + .when(resourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class)); + leaderRetrievalService.notifyListener(leader1.path().toString(), leaderSessionID); final AkkaActorGateway leader1Gateway = new AkkaActorGateway(leader1, leaderSessionID); @@ -203,6 +196,8 @@ public Object answer(InvocationOnMock invocation) throws Throwable { resourceManagerGateway.tell(new RegisterResourceManagerSuccessful(leader1, Collections.emptyList())); + callbackHandler.onContainersAllocated(containerList); + for 
(int i = 0; i < containerList.size(); i++) { expectMsgClass(deadline.timeLeft(), Acknowledge.class); } diff --git a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java index d41d42d7a05f9..368e95cc76030 100644 --- a/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java +++ b/flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerTest.java @@ -105,8 +105,10 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Matchers.any; import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; @@ -243,7 +245,7 @@ class Context { final HardwareDescription hardwareDescription = new HardwareDescription(1, 2L, 3L, 4L); // domain objects for test purposes - final ResourceProfile resourceProfile1 = new ResourceProfile(1.0, 200); + final ResourceProfile resourceProfile1 = ResourceProfile.UNKNOWN; public ContainerId task = ContainerId.newInstance( ApplicationAttemptId.newInstance(ApplicationId.newInstance(1L, 0), 0), 1); @@ -351,7 +353,7 @@ void runTest(RunnableWithException testMethod) throws Exception { } } - private static Container mockContainer(String host, int port, int containerId) { + private static Container mockContainer(String host, int port, int containerId, Resource resource) { Container mockContainer = mock(Container.class); NodeId mockNodeId = NodeId.newInstance(host, port); @@ -365,7 +367,7 @@ private static Container mockContainer(String host, int port, int containerId) { when(mockContainer.getId()).thenReturn(mockContainerId); when(mockContainer.getNodeId()).thenReturn(mockNodeId); - 
when(mockContainer.getResource()).thenReturn(Resource.newInstance(200, 1)); + when(mockContainer.getResource()).thenReturn(resource); when(mockContainer.getPriority()).thenReturn(Priority.UNDEFINED); return mockContainer; @@ -397,7 +399,10 @@ public void testStopWorker() throws Exception { registerSlotRequestFuture.get(); // Callback from YARN when container is allocated. - Container testingContainer = mockContainer("container", 1234, 1); + Container testingContainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource()); + + doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest()))) + .when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class)); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); @@ -492,10 +497,14 @@ public void testOnContainerCompleted() throws Exception { registerSlotRequestFuture.get(); // Callback from YARN when container is allocated. 
- Container testingContainer = mockContainer("container", 1234, 1); + Container testingContainer = mockContainer("container", 1234, 1, resourceManager.getContainerResource()); + + doReturn(Collections.singletonList(Collections.singletonList(resourceManager.getContainerRequest()))) + .when(mockResourceManagerClient).getMatchingRequests(any(Priority.class), anyString(), any(Resource.class)); resourceManager.onContainersAllocated(ImmutableList.of(testingContainer)); verify(mockResourceManagerClient).addContainerRequest(any(AMRMClient.ContainerRequest.class)); + verify(mockResourceManagerClient).removeContainerRequest(any(AMRMClient.ContainerRequest.class)); verify(mockNMClient).startContainer(eq(testingContainer), any(ContainerLaunchContext.class)); // Callback from YARN when container is Completed, pending request can not be fulfilled by pending