Skip to content

Commit

Permalink
[FLINK-2790] [yarn] [ha] Add high availability support for Yarn
Browse files Browse the repository at this point in the history
This squashes the following commits:
- Refactor JobManager's start actors method to be reusable
- Yarn refactoring to introduce yarn testing functionality
- Add support for testing yarn cluster. Extracted JobManager's and TaskManager's testing messages into stackable traits.
- Implement YarnHighAvailabilityITCase using Akka messages for synchronization.
- Logging statements
- Fix registration at JobManager when the leader address is null
- Fix curator dependency conflict
- Shades Flink's curator dependency in flink-runtime so that it cannot be overridden by external dependencies in the class path. This solves the problem with Hadoop 2.6.0 which adds Curator 2.6.0 to the class path. The curator version of this Hadoop version is not compatible with Flink's Curator version 2.8.0. Furthermore, Flink's Guava version is forced to be included in flink-shaded-curator jar to avoid too many different Guava versions in the resulting dist jar.
- Unify two shade executions of flink-runtime into one
- Exclude log4j and slf4j-log4j12 dependency from flink-shaded-curator
- Set default number of application attempts to 1 in standalone case

This closes apache#1213
  • Loading branch information
tillrohrmann authored and uce committed Oct 8, 2015
1 parent f332fa5 commit fa2bb8f
Show file tree
Hide file tree
Showing 52 changed files with 3,242 additions and 2,018 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,6 @@ public void getYARNSessionCLIOptions(Options options) {
}

public int run(String[] args) {

//
// Command Line Options
//
Expand Down Expand Up @@ -418,7 +417,7 @@ public int run(String[] args) {
return 1;
}
//------------------ Cluster deployed, handle connection details
String jobManagerAddress = yarnCluster.getJobManagerAddress().getHostName() + ":" + yarnCluster.getJobManagerAddress().getPort();
String jobManagerAddress = yarnCluster.getJobManagerAddress().getAddress().getHostAddress() + ":" + yarnCluster.getJobManagerAddress().getPort();
System.out.println("Flink JobManager is now running on " + jobManagerAddress);
System.out.println("JobManager Web Interface: " + yarnCluster.getWebInterfaceURL());
// file that we write into the conf/ dir containing the jobManager address and the dop.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,10 +235,10 @@ public final class ConfigConstants {
public static final String YARN_MAX_FAILED_CONTAINERS = "yarn.maximum-failed-containers";

/**
* Set the number of retries for failed YARN ApplicationMasters/JobManagers.
* This value is usually limited by YARN.
* Set the number of retries for failed YARN ApplicationMasters/JobManagers in high
* availability mode. This value is usually limited by YARN.
*
* By default, its 1.
* By default, it's 1 in the standalone case and 2 in the high availability case.
*/
public static final String YARN_APPLICATION_ATTEMPTS = "yarn.application-attempts";

Expand Down
28 changes: 27 additions & 1 deletion flink-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ under the License.
<version>${curator.version}</version>
<scope>test</scope>
</dependency>

</dependencies>

<build>
Expand Down Expand Up @@ -422,6 +421,33 @@ under the License.
</execution>
</executions>
</plugin>

<!--
Shade configuration for this module's "shade-flink" execution, bound to the package phase.
It adds the org.apache.flink:flink-shaded-curator artifact to the shaded artifact set and
relocates every class under org.apache.curator to org.apache.flink.shaded.org.apache.curator.
combine.children="append" asks Maven to append these <include> and <relocation> entries to a
configuration of the same execution that is merged in from elsewhere (presumably an inherited
"shade-flink" execution in a parent POM; TODO confirm) instead of replacing its entries.
-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<id>shade-flink</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<artifactSet>
<includes combine.children="append">
<include>org.apache.flink:flink-shaded-curator</include>
</includes>
</artifactSet>
<relocations combine.children="append">
<relocation>
<pattern>org.apache.curator</pattern>
<shadedPattern>org.apache.flink.shaded.org.apache.curator</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@

package org.apache.flink.runtime.jobmanager;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;

/**
* Recovery mode for Flink's cluster execution. Currently supported modes are:
*
Expand All @@ -29,5 +32,28 @@
*/
public enum RecoveryMode {
STANDALONE,
ZOOKEEPER
ZOOKEEPER;

/**
 * Returns true if the recovery mode defined in the given configuration supports high
 * availability.
 *
 * <p>The mode is read from {@link ConfigConstants#RECOVERY_MODE}, falling back to
 * {@link ConfigConstants#DEFAULT_RECOVERY_MODE}, and is matched case-insensitively against
 * the constants of this enum.
 *
 * @param configuration Configuration which contains the recovery mode
 * @return true if high availability is supported by the recovery mode, otherwise false
 * @throws IllegalArgumentException if the configured value does not name a recovery mode
 */
public static boolean isHighAvailabilityModeActivated(Configuration configuration) {
    // Upper-case with a fixed locale so that the enum lookup does not depend on the
    // default locale of the JVM the cluster happens to run on.
    String recoveryMode = configuration.getString(
            ConfigConstants.RECOVERY_MODE,
            ConfigConstants.DEFAULT_RECOVERY_MODE).toUpperCase(java.util.Locale.ROOT);

    RecoveryMode mode = RecoveryMode.valueOf(recoveryMode);

    switch (mode) {
        case ZOOKEEPER:
            return true;
        case STANDALONE:
        default:
            // Every mode other than ZOOKEEPER is treated as not highly available.
            return false;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Mapper;
import akka.dispatch.OnComplete;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
Expand Down Expand Up @@ -95,9 +97,7 @@ public static ActorGateway retrieveLeaderGateway(

Future<ActorGateway> actorGatewayFuture = listener.getActorGatewayFuture();

ActorGateway gateway = Await.result(actorGatewayFuture, timeout);

return gateway;
return Await.result(actorGatewayFuture, timeout);
} catch (Exception e) {
throw new LeaderRetrievalException("Could not retrieve the leader gateway", e);
} finally {
Expand Down Expand Up @@ -131,9 +131,7 @@ public static LeaderConnectionInfo retrieveLeaderConnectionInfo(

Future<LeaderConnectionInfo> connectionInfoFuture = listener.getLeaderConnectionInfoFuture();

LeaderConnectionInfo result = Await.result(connectionInfoFuture, timeout);

return result;
return Await.result(connectionInfoFuture, timeout);
} catch (Exception e) {
throw new LeaderRetrievalException("Could not retrieve the leader address and leader " +
"session ID.", e);
Expand All @@ -160,9 +158,7 @@ public static InetAddress findConnectingAddress(
LOG.info("TaskManager will try to connect for " + timeout +
" before falling back to heuristics");

InetAddress result = listener.findConnectingAddress(timeout);

return result;
return listener.findConnectingAddress(timeout);
} catch (Exception e) {
throw new LeaderRetrievalException("Could not find the connecting address by " +
"connecting to the current leader.", e);
Expand All @@ -183,6 +179,7 @@ public static class LeaderGatewayListener implements LeaderRetrievalListener {

private final ActorSystem actorSystem;
private final FiniteDuration timeout;
private final Object lock = new Object();

private final Promise<ActorGateway> futureActorGateway = new scala.concurrent.impl.Promise.DefaultPromise<ActorGateway>();

Expand All @@ -191,22 +188,36 @@ public LeaderGatewayListener(ActorSystem actorSystem, FiniteDuration timeout) {
this.timeout = timeout;
}

@Override
public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
if(leaderAddress != null && !leaderAddress.equals("") && !futureActorGateway.isCompleted()) {
try {
ActorRef actorRef = AkkaUtils.getActorRef(leaderAddress, actorSystem, timeout);

ActorGateway gateway = new AkkaActorGateway(actorRef, leaderSessionID);

private void completePromise(ActorGateway gateway) {
synchronized (lock) {
if (!futureActorGateway.isCompleted()) {
futureActorGateway.success(gateway);

} catch(Exception e){
futureActorGateway.failure(e);
}
}
}

/**
 * Resolves the announced leader address to an {@link ActorGateway} asynchronously and hands
 * the result to {@code completePromise}.
 *
 * <p>The notification is ignored if the address is null or empty, or if the gateway promise
 * has already been completed. A failed lookup is only logged at debug level; the promise is
 * left untouched in that case.
 *
 * @param leaderAddress Address of the announced leader; may be null or empty
 * @param leaderSessionID Leader session ID which is attached to the created gateway
 */
@Override
public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) {
    final boolean addressMissing = leaderAddress == null || leaderAddress.isEmpty();

    if (addressMissing || futureActorGateway.isCompleted()) {
        return;
    }

    // Step 1: look up the actor behind the leader address without blocking the caller.
    Future<ActorRef> leaderRefFuture =
            AkkaUtils.getActorRefFuture(leaderAddress, actorSystem, timeout);

    // Step 2: wrap the resolved actor together with the session ID into a gateway.
    Future<ActorGateway> leaderGatewayFuture = leaderRefFuture.map(
            new Mapper<ActorRef, ActorGateway>() {
                @Override
                public ActorGateway apply(ActorRef leaderRef) {
                    return new AkkaActorGateway(leaderRef, leaderSessionID);
                }
            },
            actorSystem.dispatcher());

    // Step 3: publish the gateway on success, otherwise just log the failed lookup.
    leaderGatewayFuture.onComplete(
            new OnComplete<ActorGateway>() {
                @Override
                public void onComplete(Throwable error, ActorGateway gateway) throws Throwable {
                    if (error != null) {
                        LOG.debug("Could not retrieve the leader for address " + leaderAddress + ".", error);
                    } else {
                        completePromise(gateway);
                    }
                }
            },
            actorSystem.dispatcher());
}

@Override
public void handleError(Exception exception) {
if (!futureActorGateway.isCompleted()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ trait LeaderSessionMessageFilter extends FlinkActor {
msg: LeaderSessionMessage)
: Unit = {
log.warn(s"Discard message $msg because the expected leader session ID " +
s"$expectedLeaderSessionID did not equal the received leader session ID" +
s"${msg.leaderSessionID}.")
s"$expectedLeaderSessionID did not equal the received leader session ID " +
s"${Option(msg.leaderSessionID)}.")
}

/** Wrap [[RequiresLeaderSessionID]] messages in a [[LeaderSessionMessage]]
Expand Down
Loading

0 comments on commit fa2bb8f

Please sign in to comment.