[FLINK-2291] [runtime] Add ZooKeeper support to elect a leader from a set of JobManagers. The leader will then be retrieved from ZooKeeper by the TaskManagers.

Refactors FlinkMiniCluster to support multiple JobManagers

Adds proper remote address resolution for actors

Cleans up LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlocks.

Adds ZooKeeper start option to TestBaseUtils.startCluster

Removes registration session IDs, using leader session IDs instead. Sets the leader session ID directly in the grantLeadership method, lets the LeaderElectionService select the leader session ID, and returns the leader session ID to the LeaderRetrievalListeners.

Removes direct ActorRef interaction

Introduces LeaderRetrievalService for the Client and the CliFrontend.

Makes the ApplicationClient use the LeaderRetrievalService for JobManager resolution

Adds LeaderElection/Retrieval tests

Adds a test for exception forwarding from the CuratorFramework to a Contender

Adds a test for job submission with changing leaders

Adds new test cases for job cleanup after leader election change

Adds new LeaderChangeStateCleanup test case

Adds LeaderElectionRetrievalTestingCluster

Introduces ListeningBehaviour for job submissions

Relocation of org.apache.curator in flink-shaded-hadoop jar

Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files

Increases the ZooKeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis

Increases the timeouts of the ZooKeeperLeaderElectionTest for Travis

Makes the WebInfoServer and the WebRuntimeMonitor use the LeaderRetrievalService to retrieve the current leading JobManager

Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase

Adds configuration description for new ZooKeeper configuration values

Fixes the port selection of the JobManager at startup

Improves logging output

Extends the masters file to also specify the web UI ports

Adds proper network interface resolution by retrieving the current leader address

Makes the ZooKeeperLeaderElectionService write the leader information to ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager caused by the call-by-name semantics of scheduler.scheduleOnce.
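
As background, a minimal Apache Curator sketch of why an ephemeral znode provides this cleanup automatically: the znode is bound to the creating client's ZooKeeper session and is removed by ZooKeeper when that session ends. This is not the actual ZooKeeperLeaderElectionService code; the connection string, znode path, leader address and retry values are illustrative.

<pre>
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;
import org.apache.zookeeper.CreateMode;

public class EphemeralLeaderInfoSketch {
    public static void main(String[] args) throws Exception {
        // Illustrative connection string and retry policy (5000 ms base pause, 3 retries).
        CuratorFramework client = CuratorFrameworkFactory.newClient(
                "localhost:2181", new ExponentialBackoffRetry(5000, 3));
        client.start();

        // Write the leader address into an EPHEMERAL znode. ZooKeeper deletes the
        // node automatically once the creating session terminates, so stale leader
        // information cannot outlive a crashed or stopped JobManager.
        byte[] leaderAddress = "akka.tcp://flink@jmHost:6123/user/jobmanager".getBytes("UTF-8");
        client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/flink/leader", leaderAddress);

        // ... leadership is held here; closing the client (or crashing) removes the node.
        client.close();
    }
}
</pre>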

Adds jobManagerURL to TriggerTaskManagerRegistration message

Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows testing the connection to a possibly changing master node.

Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM

Adjusts the Travis log file to only log ZooKeeper errors

Updates high availability setup guide

Adds TestLogger to leader election tests

This closes apache#1016.
tillrohrmann committed Aug 31, 2015
1 parent 0858d9f commit b9de4ed
Showing 187 changed files with 7,997 additions and 2,687 deletions.
25 changes: 25 additions & 0 deletions docs/setup/config.md
@@ -357,6 +357,31 @@ so that the Flink client is able to pick those details up. This configuration pa
changing the default location of that file (for example for environments sharing a Flink
installation between users)

## High Availability Mode

- `recovery.mode`: (Default 'standalone') Defines the recovery mode used for the cluster execution. Currently,
Flink supports the 'standalone' mode where only a single JobManager runs and no JobManager state is checkpointed.
The high availability mode 'zookeeper' supports the execution of multiple JobManagers and JobManager state checkpointing.
Among the group of JobManagers, ZooKeeper elects one of them as the leader which is responsible for the cluster execution.
In case of a JobManager failure, a standby JobManager will be elected as the new leader and is given the last checkpointed JobManager state.
In order to use the 'zookeeper' mode, it is mandatory to also define the `ha.zookeeper.quorum` configuration value. A combined example configuration is sketched after this list.

- `ha.zookeeper.quorum`: Defines the ZooKeeper quorum URL which is used to connect to the ZooKeeper cluster when the 'zookeeper' recovery mode is selected.

- `ha.zookeeper.dir`: (Default '/flink') Defines the root dir under which the ZooKeeper recovery mode will create znodes.

- `ha.zookeeper.dir.latch`: (Default '/leaderlatch') Defines the znode of the leader latch which is used to elect the leader.

- `ha.zookeeper.dir.leader`: (Default '/leader') Defines the znode of the leader which contains the URL to the leader and the current leader session ID

- `ha.zookeeper.client.session-timeout`: (Default '60000') Defines the session timeout for the ZooKeeper session in ms.

- `ha.zookeeper.client.connection-timeout`: (Default '15000') Defines the connection timeout for ZooKeeper in ms.

- `ha.zookeeper.client.retry-wait`: (Default '5000') Defines the pause between consecutive retries in ms.

- `ha.zookeeper.client.max-retry-attempts`: (Default '3') Defines the number of connection retries before the client gives up.
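
A minimal combined example for `conf/flink-conf.yaml`, assuming a three-node ZooKeeper ensemble on ZooKeeper's default client port 2181 (the host names are illustrative; every key that is omitted falls back to the defaults listed above):

<pre>
recovery.mode: zookeeper
ha.zookeeper.quorum: zkHost1:2181,zkHost2:2181,zkHost3:2181
ha.zookeeper.dir: /flink
</pre>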

## Background

### Configuring the Network Buffers
28 changes: 19 additions & 9 deletions docs/setup/jobmanager_high_availability.md
@@ -34,11 +34,19 @@ As an example, consider the following setup with three JobManager instances:

## Configuration

To enable JobManager High Availability you have to configure a **ZooKeeper quorum** and set up a **masters file** with all JobManagers hosts.
To enable JobManager High Availability you have to set the **recovery mode** to *zookeeper*, configure a **ZooKeeper quorum** and set up a **masters file** with all JobManager hosts and their web UI ports.

Flink leverages **[ZooKeeper](http://zookeeper.apache.org)** for *distributed coordination* between all running JobManager instances. ZooKeeper is a separate service from Flink, which provides highly reliable distributed coordination via leader election and light-weight consistent state storage. Check out [ZooKeeper's Getting Started Guide](http://zookeeper.apache.org/doc/trunk/zookeeperStarted.html) for more information about ZooKeeper.

Configuring a ZooKeeper quorum in `conf/flink-conf.yaml` *enables* high availability mode and all Flink components try to connect to a JobManager via coordination through ZooKeeper.
Setting Flink's **recovery mode** to *zookeeper* in `conf/flink-conf.yaml` *enables* high availability mode.

Additionally, you have to configure a **ZooKeeper quorum** in the same configuration file.

In high availability mode, all Flink components try to connect to a JobManager via coordination through ZooKeeper.

- **Recovery mode** (required): The *recovery mode* has to be set in `conf/flink-conf.yaml` to *zookeeper* in order to enable high availability mode.

<pre>recovery.mode: zookeeper</pre>

- **ZooKeeper quorum** (required): A *ZooKeeper quorum* is a replicated group of ZooKeeper servers, which provide the distributed coordination service.

@@ -55,12 +63,12 @@
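
For example, assuming three ZooKeeper servers reachable on the default client port 2181 (host names are illustrative), the quorum would be configured in `conf/flink-conf.yaml` as:

<pre>ha.zookeeper.quorum: address1:2181,address2:2181,address3:2181</pre>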

In order to start an HA-cluster, configure the *masters* file in `conf/masters`:

- **masters file**: The *masters file* contains all hosts, on which JobManagers are started.
- **masters file**: The *masters file* contains all hosts on which JobManagers are started, and the ports to which their web user interfaces bind.

<pre>
jobManagerAddress1
jobManagerAddress1:webUIPort1
[...]
jobManagerAddressX
jobManagerAddressX:webUIPortX
</pre>

After configuring the masters and the ZooKeeper quorum, you can use the provided cluster startup scripts as usual. They will start an HA-cluster. **Keep in mind that the ZooKeeper quorum has to be running when you call the scripts**.
@@ -81,15 +89,17 @@ The script `bin/start-zookeeper-quorum.sh` will start a ZooKeeper server on each

## Example: Start and stop a local HA-cluster with 2 JobManagers

1. **Configure ZooKeeper quorum** in `conf/flink.yaml`:
1. **Configure recovery mode and ZooKeeper quorum** in `conf/flink.yaml`:

<pre>ha.zookeeper.quorum: localhost</pre>
<pre>
recovery.mode: zookeeper
ha.zookeeper.quorum: localhost</pre>

2. **Configure masters** in `conf/masters`:

<pre>
localhost
localhost</pre>
localhost:8081
localhost:8082</pre>

3. **Configure ZooKeeper server** in `conf/zoo.cfg` (currently it's only possible to run a single ZooKeeper server per machine):

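A minimal sketch of such a server entry, assuming ZooKeeper's conventional peer and leader-election ports 2888 and 3888 (the template shipped with Flink may use different values):

<pre>server.0=localhost:2888:3888</pre>
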
8 changes: 8 additions & 0 deletions flink-clients/pom.xml
@@ -42,6 +42,14 @@ under the License.
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
138 changes: 84 additions & 54 deletions flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java
@@ -36,7 +36,6 @@
import java.util.Map;
import java.util.Properties;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;

import org.apache.commons.cli.CommandLine;
@@ -64,14 +63,15 @@
import org.apache.flink.optimizer.plandump.PlanJSONDumpGenerator;
import org.apache.flink.runtime.client.JobStatusMessage;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.apache.flink.runtime.security.SecurityUtils;
import org.apache.flink.runtime.util.EnvironmentInformation;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnClient;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobmanager.JobManager;
import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
import org.apache.flink.runtime.messages.JobManagerMessages.RunningJobsStatus;
import org.apache.flink.runtime.yarn.AbstractFlinkYarnCluster;
@@ -129,8 +129,6 @@ public class CliFrontend {

private final FiniteDuration lookupTimeout;

private InetSocketAddress jobManagerAddress;

private ActorSystem actorSystem;

private AbstractFlinkYarnCluster yarnCluster;
@@ -202,9 +200,12 @@ public CliFrontend(String configDir) throws Exception {

// get the JobManager address from the YARN properties
String address = yarnProperties.getProperty(YARN_PROPERTIES_JOBMANAGER_KEY);
InetSocketAddress jobManagerAddress;
if (address != null) {
try {
jobManagerAddress = parseJobManagerAddress(address);
jobManagerAddress = parseHostPortAddress(address);
// store address in config from where it is retrieved by the retrieval service
writeJobManagerAddressToConfig(jobManagerAddress);
}
catch (Exception e) {
throw new Exception("YARN properties contain an invalid entry for JobManager address.", e);
@@ -226,6 +227,24 @@
}


// --------------------------------------------------------------------------------------------
// Getter & Setter
// --------------------------------------------------------------------------------------------

/**
* Getter which returns a copy of the associated configuration
*
* @return Copy of the associated configuration
*/
public Configuration getConfiguration() {
Configuration copiedConfiguration = new Configuration();

copiedConfiguration.addAll(config);

return copiedConfiguration;
}


// --------------------------------------------------------------------------------------------
// Execute Actions
// --------------------------------------------------------------------------------------------
@@ -688,42 +707,26 @@ else if (!jarFile.isFile()) {
new PackagedProgram(jarFile, entryPointClass, programArgs);
}

protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) throws Exception {

// first, check if the address is specified as an option
if (options.getJobManagerAddress() != null) {
return parseJobManagerAddress(options.getJobManagerAddress());
}

// second, check whether the address was already parsed, or configured through the YARN properties
if (jobManagerAddress == null) {
// config file must have the address
String jobManagerHost = config.getString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, null);

// verify that there is a jobmanager address and port in the configuration
if (jobManagerHost == null) {
throw new Exception("Found no configuration in the config directory '" + configDirectory
+ "' that specifies the JobManager address.");
}

int jobManagerPort;
try {
jobManagerPort = config.getInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, -1);
}
catch (NumberFormatException e) {
throw new Exception("Invalid value for the JobManager port (" +
ConfigConstants.JOB_MANAGER_IPC_PORT_KEY + ") in the configuration.");
}

if (jobManagerPort == -1) {
throw new Exception("Found no configuration in the config directory '" + configDirectory
+ "' that specifies the JobManager port.");
}
/**
* Writes the given job manager address to the associated configuration object
*
* @param address Address to write to the configuration
*/
protected void writeJobManagerAddressToConfig(InetSocketAddress address) {
config.setString(ConfigConstants.JOB_MANAGER_IPC_ADDRESS_KEY, address.getHostName());
config.setInteger(ConfigConstants.JOB_MANAGER_IPC_PORT_KEY, address.getPort());
}

jobManagerAddress = new InetSocketAddress(jobManagerHost, jobManagerPort);
/**
* Updates the associated configuration with the given command line options
*
* @param options Command line options
*/
protected void updateConfig(CommandLineOptions options) {
if(options.getJobManagerAddress() != null){
InetSocketAddress jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
writeJobManagerAddressToConfig(jobManagerAddress);
}

return jobManagerAddress;
}

/**
@@ -735,16 +738,16 @@ protected InetSocketAddress getJobManagerAddress(CommandLineOptions options) thr
* @throws Exception
*/
protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws Exception {
//TODO: Get ActorRef from YarnCluster if we are in YARN mode.

InetSocketAddress address = getJobManagerAddress(options);
// overwrite config values with given command line options
updateConfig(options);

// start an actor system if needed
if (this.actorSystem == null) {
LOG.info("Starting actor system to communicate with JobManager");
try {
scala.Tuple2<String, Object> systemEndpoint = new scala.Tuple2<String, Object>("", 0);
this.actorSystem = AkkaUtils.createActorSystem(config,
this.actorSystem = AkkaUtils.createActorSystem(
config,
new Some<scala.Tuple2<String, Object>>(systemEndpoint));
}
catch (Exception e) {
@@ -754,20 +757,33 @@ protected ActorGateway getJobManagerGateway(CommandLineOptions options) throws E
LOG.info("Actor system successfully started");
}

LOG.info("Trying to lookup JobManager");
ActorRef jmActor = JobManager.getJobManagerRemoteReference(address, actorSystem, lookupTimeout);
LOG.info("JobManager is at " + jmActor.path());
LOG.info("Trying to lookup the JobManager gateway");
// Retrieve the ActorGateway from the LeaderRetrievalService
LeaderRetrievalService lrs = LeaderRetrievalUtils.createLeaderRetrievalService(config);

// Retrieve the ActorGateway from the JobManager's ActorRef
return JobManager.getJobManagerGateway(jmActor, lookupTimeout);
return LeaderRetrievalUtils.retrieveLeaderGateway(lrs, actorSystem, lookupTimeout);
}

/**
* @param userParallelism The parallelism requested by the user in the CLI frontend.
* Retrieves a {@link Client} object from the given command line options and other parameters.
*
* @param options Command line options which contain JobManager address
* @param classLoader Class loader to use by the Client
* @param programName Program name
* @param userParallelism Given user parallelism
* @return
* @throws Exception
*/
protected Client getClient(CommandLineOptions options, ClassLoader classLoader, String programName, int userParallelism) throws Exception {
InetSocketAddress jobManagerAddress;
protected Client getClient(
CommandLineOptions options,
ClassLoader classLoader,
String programName,
int userParallelism)
throws Exception {
InetSocketAddress jobManagerAddress = null;

int maxSlots = -1;

if (YARN_DEPLOY_JOBMANAGER.equals(options.getJobManagerAddress())) {
logAndSysout("YARN cluster mode detected. Switching Log4j output to console");

@@ -830,9 +846,16 @@ protected Client getClient(CommandLineOptions options, ClassLoader classLoader,
}
}
else {
jobManagerAddress = getJobManagerAddress(options);
if(options.getJobManagerAddress() != null) {
jobManagerAddress = parseHostPortAddress(options.getJobManagerAddress());
}
}

if(jobManagerAddress != null) {
writeJobManagerAddressToConfig(jobManagerAddress);
}
return new Client(jobManagerAddress, config, classLoader, maxSlots);

return new Client(config, classLoader, maxSlots);
}

// --------------------------------------------------------------------------------------------
@@ -992,7 +1015,14 @@ public static void main(String[] args) {
// Miscellaneous Utilities
// --------------------------------------------------------------------------------------------

private static InetSocketAddress parseJobManagerAddress(String hostAndPort) {
/**
* Parses a given host port address of the format URL:PORT and returns an {@link InetSocketAddress}
*
* @param hostAndPort host port string to be parsed
* @return InetSocketAddress object containing the parsed host port information
*/
private static InetSocketAddress parseHostPortAddress(String hostAndPort) {
// code taken from http://stackoverflow.com/questions/2345063/java-common-way-to-validate-and-convert-hostport-to-inetsocketaddress
URI uri;
try {
uri = new URI("my:https://" + hostAndPort);
@@ -109,6 +109,7 @@ public void start() throws Exception {
}
// start it up
this.flink = new LocalFlinkMiniCluster(configuration, true);
this.flink.start();
} else {
throw new IllegalStateException("The local executor was already started.");
}
@@ -168,7 +169,7 @@ public JobExecutionResult executePlan(Plan plan) throws Exception {
}

try {
Optimizer pc = new Optimizer(new DataStatistics(), this.flink.getConfiguration());
Optimizer pc = new Optimizer(new DataStatistics(), this.flink.configuration());
OptimizedPlan op = pc.compile(plan);

JobGraphGenerator jgg = new JobGraphGenerator();
@@ -251,7 +252,7 @@ public static String optimizerPlanAsJSON(Plan plan) throws Exception {
LocalExecutor exec = new LocalExecutor();
try {
exec.start();
Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.getConfiguration());
Optimizer pc = new Optimizer(new DataStatistics(), exec.flink.configuration());
OptimizedPlan op = pc.compile(plan);
PlanJSONDumpGenerator gen = new PlanJSONDumpGenerator();
