Skip to content

Commit

Permalink
[FLINK-7805][flip6] Recover YARN containers after AM restart.
Browse files Browse the repository at this point in the history
Recover previously running containers after a restart of the ApplicationMaster.
This is a port of a feature that was already implemented prior to FLIP-6.
Extract RegisterApplicationMasterResponseReflector class into separate file.

This closes apache#5597.
  • Loading branch information
GJL authored and tillrohrmann committed Feb 28, 2018
1 parent 035257e commit 45397fe
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 53 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.annotation.VisibleForTesting;

import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.slf4j.Logger;

import java.lang.reflect.Method;
import java.util.Collections;
import java.util.List;

import static java.util.Objects.requireNonNull;

/**
 * Looks up the method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}
 * once, at construction time, and keeps the resulting {@link Method} handle. Subsequent calls
 * reuse the handle instead of repeating the reflective lookup.
 *
 * <p>If the method cannot be found on the inspected class, the reflector still works but always
 * reports an empty list of containers.
 */
class RegisterApplicationMasterResponseReflector {

	private final Logger logger;

	/**
	 * Reflected method {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()},
	 * or {@code null} if the inspected class does not offer a public method of that name.
	 */
	private final Method method;

	/**
	 * Creates a reflector that inspects {@link RegisterApplicationMasterResponse}.
	 *
	 * @param log Logger used to report a missing method or a failed invocation; must not be null.
	 */
	RegisterApplicationMasterResponseReflector(final Logger log) {
		this(log, RegisterApplicationMasterResponse.class);
	}

	/**
	 * Creates a reflector that inspects an arbitrary class, which allows tests to substitute
	 * a stand-in for {@link RegisterApplicationMasterResponse}.
	 *
	 * @param log Logger used to report a missing method or a failed invocation; must not be null.
	 * @param clazz Class on which the method is looked up; must not be null.
	 */
	@VisibleForTesting
	RegisterApplicationMasterResponseReflector(final Logger log, final Class<?> clazz) {
		this.logger = requireNonNull(log);
		requireNonNull(clazz);

		// Resolve into a local first so that the field can be final on both paths.
		Method reflectedMethod = null;
		try {
			reflectedMethod = clazz.getMethod("getContainersFromPreviousAttempts");
		} catch (NoSuchMethodException e) {
			// that happens in earlier Hadoop versions (pre 2.2)
			logger.info("Cannot reconnect to previously allocated containers. " +
				"This YARN version does not support 'getContainersFromPreviousAttempts()'");
		}
		this.method = reflectedMethod;
	}

	/**
	 * Checks if a YARN application still has registered containers. If the application master
	 * registered at the ResourceManager for the first time, this list will be empty. If the
	 * application master registered a repeated time (after a failure and recovery), this list
	 * will contain the containers that were previously allocated.
	 *
	 * @param response The response object from the registration at the ResourceManager.
	 * @return A list with containers from previous application attempt.
	 */
	List<Container> getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse response) {
		return getContainersFromPreviousAttemptsUnsafe(response);
	}

	/**
	 * Same as {@link #getContainersFromPreviousAttempts(RegisterApplicationMasterResponse)} but
	 * allows to pass objects that are not of type {@link RegisterApplicationMasterResponse}.
	 *
	 * <p>Never returns {@code null}: a missing method, a {@code null} response, a {@code null} or
	 * empty result, and a failed invocation all yield an empty list. A failed invocation is
	 * additionally logged as an error.
	 *
	 * @param response Object on which the reflected method is invoked; may be null.
	 * @return The containers returned by the reflected method, or an empty list.
	 */
	@VisibleForTesting
	List<Container> getContainersFromPreviousAttemptsUnsafe(final Object response) {
		if (method != null && response != null) {
			try {
				@SuppressWarnings("unchecked")
				final List<Container> containers = (List<Container>) method.invoke(response);
				if (containers != null && !containers.isEmpty()) {
					return containers;
				}
			} catch (Exception t) {
				// Best effort: recovery of old containers must not fail the registration.
				logger.error("Error invoking 'getContainersFromPreviousAttempts()'", t);
			}
		}

		return Collections.emptyList();
	}

	/**
	 * Exposes the cached method handle for tests.
	 *
	 * @return The reflected method, or {@code null} if the lookup failed.
	 */
	@VisibleForTesting
	Method getMethod() {
		return method;
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,8 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.slf4j.Logger;

import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -616,56 +614,6 @@ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
}
}

/**
 * Looks up the getContainersFromPreviousAttempts method on RegisterApplicationMasterResponse
 * once and saves the method. This saves computation time on subsequent calls.
 */
private static class RegisterApplicationMasterResponseReflector {

	private final Logger logger;

	/** Cached reflective handle; {@code null} if the lookup in the constructor failed. */
	private final Method method;

	/**
	 * Performs the reflective lookup and remembers its outcome.
	 *
	 * @param log Logger used to report a missing method or a failed invocation.
	 */
	public RegisterApplicationMasterResponseReflector(Logger log) {
		this.logger = log;

		// Resolve into a local first so that the field can be final on both paths.
		Method reflectedMethod = null;
		try {
			reflectedMethod = RegisterApplicationMasterResponse.class
				.getMethod("getContainersFromPreviousAttempts");

		} catch (NoSuchMethodException e) {
			// that happens in earlier Hadoop versions
			logger.info("Cannot reconnect to previously allocated containers. " +
				"This YARN version does not support 'getContainersFromPreviousAttempts()'");
		}
		this.method = reflectedMethod;
	}

	/**
	 * Checks if a YARN application still has registered containers. If the application master
	 * registered at the ResourceManager for the first time, this list will be empty. If the
	 * application master registered a repeated time (after a failure and recovery), this list
	 * will contain the containers that were previously allocated.
	 *
	 * @param response The response object from the registration at the ResourceManager.
	 * @return A list with containers from previous application attempt.
	 */
	private List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
		if (method != null && response != null) {
			try {
				@SuppressWarnings("unchecked")
				List<Container> list = (List<Container>) method.invoke(response);
				if (list != null && !list.isEmpty()) {
					return list;
				}
			} catch (Exception e) {
				// Only exceptions are treated as a failed, best-effort lookup; JVM errors
				// are deliberately not swallowed here.
				logger.error("Error invoking 'getContainersFromPreviousAttempts()'", e);
			}
		}

		return Collections.emptyList();
	}

}

// ------------------------------------------------------------------------
// Actor props factory
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.flink.yarn.configuration.YarnConfigOptions;

import org.apache.hadoop.yarn.api.ApplicationConstants;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
Expand Down Expand Up @@ -192,11 +193,24 @@ protected AMRMClientAsync<AMRMClient.ContainerRequest> createAndStartResourceMan
restPort = -1;
}

resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
final RegisterApplicationMasterResponse registerApplicationMasterResponse =
resourceManagerClient.registerApplicationMaster(hostPort.f0, restPort, webInterfaceUrl);
getContainersFromPreviousAttempts(registerApplicationMasterResponse);

return resourceManagerClient;
}

/**
 * Reads the containers of previous application attempts out of the registration response,
 * logs how many were found, and records each of them in {@code workerNodeMap} under a
 * {@link ResourceID} derived from the container id.
 *
 * @param registerApplicationMasterResponse Response received when registering the application master.
 */
private void getContainersFromPreviousAttempts(final RegisterApplicationMasterResponse registerApplicationMasterResponse) {
	final RegisterApplicationMasterResponseReflector reflector =
		new RegisterApplicationMasterResponseReflector(log);
	final List<Container> recoveredContainers =
		reflector.getContainersFromPreviousAttempts(registerApplicationMasterResponse);

	log.info("Recovered {} containers from previous attempts ({}).", recoveredContainers.size(), recoveredContainers);

	for (final Container recovered : recoveredContainers) {
		final ResourceID resourceId = new ResourceID(recovered.getId().toString());
		final YarnWorkerNode workerNode = new YarnWorkerNode(recovered);
		workerNodeMap.put(resourceId, workerNode);
	}
}

protected NMClient createAndStartNodeManagerClient(YarnConfiguration yarnConfiguration) {
// create the client to communicate with the node managers
NMClient nodeManagerClient = NMClient.createNMClient();
Expand Down Expand Up @@ -315,6 +329,7 @@ public void onContainersCompleted(List<ContainerStatus> list) {
closeTaskManagerConnection(new ResourceID(
container.getContainerId().toString()), new Exception(container.getDiagnostics()));
}
workerNodeMap.remove(new ResourceID(container.getContainerId().toString()));
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.yarn;

import org.apache.flink.util.TestLogger;

import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
import org.apache.hadoop.yarn.api.records.Container;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
import static org.junit.Assert.assertThat;
import static org.junit.Assume.assumeTrue;

/**
 * Tests for {@link RegisterApplicationMasterResponseReflector}.
 */
public class RegisterApplicationMasterResponseReflectorTest extends TestLogger {

	private static final Logger LOG = LoggerFactory.getLogger(RegisterApplicationMasterResponseReflectorTest.class);

	@Mock
	private Container mockContainer;

	@Before
	public void setUp() {
		MockitoAnnotations.initMocks(this);
	}

	/**
	 * The reflector must invoke the looked-up method and hand back its result.
	 */
	@Test
	public void testCallsMethodIfPresent() {
		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
			new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);

		final List<Container> containersFromPreviousAttemptsUnsafe =
			registerApplicationMasterResponseReflector.getContainersFromPreviousAttemptsUnsafe(new
				HasMethod());

		assertThat(containersFromPreviousAttemptsUnsafe, hasSize(1));
	}

	/**
	 * The method is looked up on {@link HasMethod} but the reflector is then handed a plain
	 * {@link Object}, which is not an instance of that class. The reflector is expected to
	 * answer with an empty list instead of propagating a failure.
	 */
	@Test
	public void testDoesntCallMethodIfAbsent() {
		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
			new RegisterApplicationMasterResponseReflector(LOG, HasMethod.class);

		final List<Container> containersFromPreviousAttemptsUnsafe =
			registerApplicationMasterResponseReflector.getContainersFromPreviousAttemptsUnsafe(new
				Object());

		assertThat(containersFromPreviousAttemptsUnsafe, empty());
	}

	/**
	 * On Hadoop 2.2 or newer the default constructor must find the real method. The test is
	 * skipped on older versions.
	 */
	@Test
	public void testGetMethodReflectiveHadoop22() {
		assumeTrue(
			"Method getContainersFromPreviousAttempts is not supported by Hadoop: " +
				VersionInfo.getVersion(),
			isHadoopVersionGreaterThanOrEquals(2, 2));

		final RegisterApplicationMasterResponseReflector registerApplicationMasterResponseReflector =
			new RegisterApplicationMasterResponseReflector(LOG);

		final Method method = registerApplicationMasterResponseReflector.getMethod();
		assertThat(method, notNullValue());
	}

	/**
	 * Compares the running Hadoop version against {@code major.minor}.
	 *
	 * <p>The comparison is ordered: a higher major version satisfies the check regardless of its
	 * minor version (for example 3.0 is at least 2.2). Only the first two dot-separated
	 * components are parsed, so qualifiers in later components do not break parsing. A missing
	 * minor component is treated as 0.
	 *
	 * @param major Required major version.
	 * @param minor Required minor version.
	 * @return {@code true} if the running Hadoop version is at least {@code major.minor}.
	 */
	private static boolean isHadoopVersionGreaterThanOrEquals(final int major, final int minor) {
		final String[] splitVersion = VersionInfo.getVersion().split("\\.");
		final int[] versions = Arrays.stream(splitVersion)
			.limit(2)
			.mapToInt(Integer::parseInt)
			.toArray();

		final int actualMajor = versions[0];
		final int actualMinor = versions.length > 1 ? versions[1] : 0;

		return actualMajor > major || (actualMajor == major && actualMinor >= minor);
	}

	/**
	 * Class which has a method with the same signature as
	 * {@link RegisterApplicationMasterResponse#getContainersFromPreviousAttempts()}.
	 */
	private class HasMethod {

		/**
		 * Called from {@link #testCallsMethodIfPresent()}.
		 */
		@SuppressWarnings("unused")
		public List<Container> getContainersFromPreviousAttempts() {
			return Collections.singletonList(mockContainer);
		}
	}
}

0 comments on commit 45397fe

Please sign in to comment.