Skip to content

Commit

Permalink
[FLINK-3134][yarn] asynchronous YarnJobManager heartbeats
Browse files Browse the repository at this point in the history
- use AMRMClientAsync instead of AMRMClient
- handle allocation and startup of containers in callbacks
- remove YarnHeartbeat message

The AMRMClientAsync uses one thread to communicate with the resource
manager and an additional thread to execute the callbacks.

This closes apache#1450.
  • Loading branch information
mxm committed Dec 17, 2015
1 parent d6e9fa0 commit 4e52fe4
Show file tree
Hide file tree
Showing 5 changed files with 301 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -284,14 +284,14 @@ public void testTaskManagerFailure() {
}

// stateful termination check:
// wait until we saw a container being killed and AFTERWARDS a new one launced
// wait until we saw a container being killed and AFTERWARDS a new one launched
boolean ok = false;
do {
LOG.debug("Waiting for correct order of events. Output: {}", errContent.toString());

String o = errContent.toString();
int killedOff = o.indexOf("Container killed by the ApplicationMaster");
if(killedOff != -1) {
if (killedOff != -1) {
o = o.substring(killedOff);
ok = o.indexOf("Launching container") > 0;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,10 @@ public abstract class YarnTestBase extends TestLogger {

/** These strings are white-listed, overriding the prohibited strings */
protected final static String[] WHITELISTED_STRINGS = {
"akka.remote.RemoteTransportExceptionNoStackTrace"
"akka.remote.RemoteTransportExceptionNoStackTrace",
// workaround for annoying InterruptedException logging:
// https://issues.apache.org/jira/browse/YARN-1022
"java.lang.InterruptedException"
};

// Temp directory which is deleted after the unit test.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ import scala.concurrent.duration.FiniteDuration
* @param defaultExecutionRetries Number of default execution retries
* @param delayBetweenRetries Delay between retries
* @param timeout Timeout for futures
* @param mode StreamingMode in which the system shall be started
* @param leaderElectionService LeaderElectionService to participate in the leader election
*/
class TestingYarnJobManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ package org.apache.flink.yarn
import java.util.UUID

import akka.actor._
import akka.pattern
import akka.util.Timeout
import grizzled.slf4j.Logger
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, LeaderRetrievalService}
Expand Down
Loading

0 comments on commit 4e52fe4

Please sign in to comment.