[FLINK-2291] [runtime] Adds high availability support via ZooKeeper #1016

Merged: 1 commit, Aug 31, 2015


tillrohrmann
Contributor

Idea

This PR introduces cluster high availability via ZooKeeper. The idea is to use ZooKeeper for leader election among a group of registered JobManagers. The elected leader writes its Akka connection URL and its assigned leader session ID to ZooKeeper, from where the TaskManagers can retrieve them.

Activation

To use the high availability mode, one has to select Flink's ZooKeeper recovery mode and specify a valid ZooKeeper quorum. Both are configured in flink-conf.yaml by setting recovery.mode: zookeeper and ha.zookeeper.quorum: address1:2181[,...],addressX:2181, where the quorum addresses point to ZooKeeper servers.
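As a rough illustration of how these two keys interact, here is a minimal sketch that reads them from a plain map. The class and method names are hypothetical and not part of this PR; the sketch also assumes that the standalone mode is used when recovery.mode is absent. It only checks that a quorum is present when the ZooKeeper mode is selected.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

// Hypothetical helper, not Flink's actual configuration code.
final class RecoveryModeSketch {
    enum RecoveryMode { STANDALONE, ZOOKEEPER }

    /** Reads recovery.mode; falls back to standalone when the key is absent (assumption of this sketch). */
    static RecoveryMode recoveryMode(Map<String, String> conf) {
        String value = conf.getOrDefault("recovery.mode", "standalone").trim();
        // valueOf throws IllegalArgumentException for unknown modes.
        return RecoveryMode.valueOf(value.toUpperCase(Locale.ROOT));
    }

    /** Returns the configured quorum, or throws if ZooKeeper mode is selected without one. */
    static String requireQuorum(Map<String, String> conf) {
        String quorum = conf.get("ha.zookeeper.quorum");
        boolean missing = quorum == null || quorum.trim().isEmpty();
        if (recoveryMode(conf) == RecoveryMode.ZOOKEEPER && missing) {
            throw new IllegalStateException(
                "recovery.mode is zookeeper but ha.zookeeper.quorum is not set");
        }
        return quorum;
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("recovery.mode", "zookeeper");
        conf.put("ha.zookeeper.quorum", "address1:2181,address2:2181");
        System.out.println(recoveryMode(conf) + " with quorum " + requireQuorum(conf));
    }
}
```

A strict check like this would suit a regular cluster setup; the test mini cluster described below deliberately tolerates a missing quorum by starting its own ZooKeeper instance.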

Implementation

To support both ZooKeeper HA and the standalone recovery mode (no HA), this PR introduces the LeaderElectionService and the LeaderRetrievalService. Leader contenders use the former to take part in the election; the latter is used to obtain the address of the current leader. In standalone mode (StandaloneLeaderElectionService, StandaloneLeaderRetrievalService), these services simply return the JobManager address found in the Flink configuration. With ZooKeeper, the services use the Curator framework to connect to the ZooKeeper quorum, perform the leader election, and read the ZNode that contains the information about the current leader.
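The split between the two services can be sketched for the standalone case as follows. This is a simplified illustration, not the code in this PR: only grantLeadership is named in the PR; notifyLeaderAddress, the class names, the constructors, and the placeholder address are assumptions made for the example.

```java
import java.util.UUID;

// Simplified, hypothetical stand-ins for the services described above.
final class LeaderServicesSketch {
    /** A leader contender, e.g. a JobManager. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Someone who needs the current leader, e.g. a TaskManager. */
    interface LeaderRetrievalListener {
        void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
    }

    /** Standalone election: the single contender becomes leader immediately. */
    static final class StandaloneElection {
        private final UUID sessionId;
        StandaloneElection(UUID sessionId) { this.sessionId = sessionId; }
        void start(Contender contender) { contender.grantLeadership(sessionId); }
    }

    /** Standalone retrieval: always reports the address taken from the configuration. */
    static final class StandaloneRetrieval {
        private final String configuredAddress;
        private final UUID sessionId;
        StandaloneRetrieval(String configuredAddress, UUID sessionId) {
            this.configuredAddress = configuredAddress;
            this.sessionId = sessionId;
        }
        void start(LeaderRetrievalListener listener) {
            listener.notifyLeaderAddress(configuredAddress, sessionId);
        }
    }

    public static void main(String[] args) {
        UUID session = UUID.randomUUID();
        new StandaloneElection(session)
            .start(id -> System.out.println("Granted leadership, session " + id));
        // Placeholder address; the real value would come from the Flink configuration.
        new StandaloneRetrieval("jobmanager-address-from-config", session)
            .start((addr, id) -> System.out.println("Leader is " + addr + ", session " + id));
    }
}
```

A ZooKeeper-backed variant would keep the same two callback shapes but invoke them whenever the election result or the leader ZNode changes.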

As part of introducing these services, the FlinkMiniCluster was also adapted to support HA with ZooKeeper. The ForkableFlinkMiniCluster automatically starts a ZooKeeper TestingCluster if recovery.mode is set to zookeeper in the provided configuration and ha.zookeeper.quorum is not set. Due to the refactoring, it is now necessary to explicitly start a FlinkMiniCluster with the start() method after creating it.

A ListeningBehaviour for the SubmitJob message is introduced as well. The listening behaviour defines which events the JobClientActor listens for, that is, which messages it expects the JobManager to send. Three modes are currently supported:

  • DETACHED: Starts a job in detached mode. The JobManager will only send the Acknowledge message as the response to the SubmitJob message.
  • EXECUTION_RESULT: Starts a job in non-detached mode. The JobClientActor waits to receive the SerializedJobExecutionResult from the JobManager. The execution result is sent to the JobClientActor after the job has entered a terminal state.
  • EXECUTION_RESULT_AND_STATE_CHANGES: Starts a job in non-detached mode. In addition to the final SerializedJobExecutionResult, the JobClientActor wants to receive all intermediate state change notifications.
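The three modes can be summarized in a small sketch that maps each mode to the kinds of messages the client would expect. The enum constants come from the list above; the surrounding class and the expectedMessages helper are hypothetical. The sketch assumes the Acknowledge is sent in every mode, which the word "only" in the DETACHED description suggests but does not state outright.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative only; the message kinds are plain strings, not Flink's message classes.
final class ListeningBehaviourSketch {
    enum ListeningBehaviour { DETACHED, EXECUTION_RESULT, EXECUTION_RESULT_AND_STATE_CHANGES }

    /** Lists what the JobClientActor would expect to receive for a given mode. */
    static List<String> expectedMessages(ListeningBehaviour behaviour) {
        List<String> messages = new ArrayList<>();
        messages.add("Acknowledge"); // assumed to be sent in all modes
        if (behaviour == ListeningBehaviour.EXECUTION_RESULT_AND_STATE_CHANGES) {
            messages.add("intermediate state change notifications");
        }
        if (behaviour != ListeningBehaviour.DETACHED) {
            messages.add("SerializedJobExecutionResult");
        }
        return messages;
    }

    public static void main(String[] args) {
        for (ListeningBehaviour behaviour : ListeningBehaviour.values()) {
            System.out.println(behaviour + " -> " + expectedMessages(behaviour));
        }
    }
}
```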

Since hadoop-2.6.0 uses a version of org.apache.curator that is incompatible with ours, we additionally shade the org.apache.curator package in the flink-shaded-hadoop jar.

The current tooling to start a ZooKeeper quorum was extended so that the masters file not only specifies the hosts on which a JobManager shall be started, but also the port of the web UI. The format is the following: <host>:<webUIPort>.
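To make the <host>:<webUIPort> format concrete, here is a minimal parsing sketch. It is not the startup scripts' actual logic: the class name, the host names, and the choice to skip blank lines are assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical parser for masters file lines of the form <host>:<webUIPort>.
final class MastersFileSketch {
    static final class Master {
        final String host;
        final int webUiPort;
        Master(String host, int webUiPort) {
            this.host = host;
            this.webUiPort = webUiPort;
        }
    }

    /** Parses each non-blank line into a host and a web UI port. */
    static List<Master> parse(List<String> lines) {
        List<Master> masters = new ArrayList<>();
        for (String raw : lines) {
            String line = raw.trim();
            if (line.isEmpty()) {
                continue; // skipping blank lines is an assumption of this sketch
            }
            int colon = line.lastIndexOf(':');
            if (colon <= 0 || colon == line.length() - 1) {
                throw new IllegalArgumentException(
                    "Expected <host>:<webUIPort> but got '" + line + "'");
            }
            // parseInt throws NumberFormatException for a non-numeric port.
            int port = Integer.parseInt(line.substring(colon + 1));
            masters.add(new Master(line.substring(0, colon), port));
        }
        return masters;
    }

    public static void main(String[] args) {
        for (Master m : parse(Arrays.asList("master-a:8081", "master-b:8082"))) {
            System.out.println(m.host + " serves its web UI on port " + m.webUiPort);
        }
    }
}
```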

To also support automatic detection of the network interface to which the TaskManager should bind, the LeaderRetrievalUtils offer a method findConnectingAddress. It works like NetUtils.findConnectingAddress, except that it can react to changing leaders: after a leader change, the newly elected JobManager is used as the connection target for determining the correct network interface.

Limitations

Currently, in ZooKeeper HA mode, one web server is started per JobManager. The reason for not having a single dedicated web server is that parts of the information it requires are not serializable yet. Thus, each JobManager has a local web server showing its own state. Since each web server is tied to one JobManager, it does not know the current leader session ID. Therefore, it is not possible to cancel jobs via the web servers: a CancelJob message is a RequiresLeaderSessionMessage and must carry that ID.
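The effect of the missing leader session ID can be illustrated with a small filter. This is a hypothetical sketch of how a receiver might treat session-bound messages, not the JobManager's actual message handling; LeaderSessionMessage and accept are names invented for the example.

```java
import java.util.Objects;
import java.util.Optional;
import java.util.UUID;

// Hypothetical illustration of filtering messages by leader session ID.
final class LeaderSessionFilterSketch {
    /** A payload (e.g. a cancel request) tagged with the sender's view of the leader session. */
    static final class LeaderSessionMessage {
        final UUID leaderSessionId; // may be null if the sender does not know it
        final Object payload;
        LeaderSessionMessage(UUID leaderSessionId, Object payload) {
            this.leaderSessionId = leaderSessionId;
            this.payload = Objects.requireNonNull(payload);
        }
    }

    /** Unwraps the payload only if the message carries the current leader session ID. */
    static Optional<Object> accept(UUID currentLeaderSessionId, LeaderSessionMessage message) {
        Objects.requireNonNull(currentLeaderSessionId);
        if (currentLeaderSessionId.equals(message.leaderSessionId)) {
            return Optional.of(message.payload);
        }
        return Optional.empty(); // unknown or stale session: the message is dropped
    }

    public static void main(String[] args) {
        UUID current = UUID.randomUUID();
        System.out.println(accept(current, new LeaderSessionMessage(current, "CancelJob")).isPresent());
        // A sender without the session ID, like the per-JobManager web server, is rejected.
        System.out.println(accept(current, new LeaderSessionMessage(null, "CancelJob")).isPresent());
    }
}
```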

If an exception occurs in the LeaderElectionService or the LeaderRetrievalService, the Contender or the LeaderRetrievalListener, respectively, is informed about it. In the current implementation, the JobManager (implementing the Contender interface) and the TaskManager (implementing the LeaderRetrievalListener) log the exception and then kill themselves. This might be changed to a policy where the respective manager tries to restart the service.

The JobClientActor cannot handle changing leaders yet. If the current leader dies, the JobClientActor receives a Terminated message because it is watching the current leader. Upon receiving this message, the JobClientActor sends a Failure message to the submitter. In the future, when a currently executing job is restarted on a newly elected leader, the JobClientActor should automatically reconnect to the new leader and receive the status update messages and the job execution result from it.

This PR does not contain HA for YARN, even though leader election and address retrieval with ZooKeeper also work with it. The problem is the ApplicationMaster, which would have to be replicated as well. Alternatively, we could rely on YARN to simply start a new ApplicationMaster and a new JobManager once it detects that the current JobManager has died.

Since this PR touches a lot of files, a close review, as far as possible, would be helpful.

@StephanEwen
Contributor

Big piece of work.

I'd like to have a look at this, but it may take a few days...

@StephanEwen
Contributor

From a first glance this looks super nice! Very excited to get this in and try it out seriously on a cluster!

Good day, today :-)

@uce
Contributor

uce commented Aug 19, 2015

Very nice, Till. I will rebase my changes for FLINK-2354 on this. Your description in this PR and the docs were already very helpful in this regard.

I got delayed last week with FLINK-2354 and some other critical issues came up in the meantime. I will look at the critical issues first and then catch up and review this as well.

@StephanEwen
Contributor

Looks very good. Minor comments that we may address after this pull request:

  • The Flink mini cluster is becoming tricky, and its configurations ever more opaque. This could use a rework.
  • You shade curator in Hadoop, but not in Flink. Do we expect collisions with other systems that use Curator, like newer versions of the Kafka consumers? (IIRC 0.8.3 starts using Curator).

@StephanEwen
Contributor

+1 to merge, we should follow up on the Mini cluster and Curator shading separately

@tillrohrmann
Contributor Author

Ok, thanks for your feedback @StephanEwen and @uce. I'll rebase on the latest master and then merge it. I've created two issues [1, 2] to track the problems Stephan mentioned.

[1] https://issues.apache.org/jira/browse/FLINK-2592
[2] https://issues.apache.org/jira/browse/FLINK-2593

tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Aug 28, 2015
… set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers.

Refactors FlinkMiniCluster to support multiple JobManager

Adds proper remote address resolution for actors

Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock.

Adds ZooKeeper start option to TestBaseUtils.startCluster

Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID directly in the grantLeadership method. Let the LeaderElectionService select the leader session ID. Return leader session ID to LeaderRetrievalListeners.

Removes direct ActorRef interaction

Introduces LeaderRetrievalService for the Client and the CliFrontend.

Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution

Adds LeaderElection/Retrieval tests

Added test for exception forwarding from the CuratorFramework to a Contender

Adds test job submission with changing leaders

Adds new test cases for job cleanup after leader election change

Adds new LeaderChangeStateCleanup test case

Adds LeaderElectionRetrievalTestingCluster

Introduces ListeningBehaviour for job submissions

Relocation of org.apache.curator in flink-shaded-hadoop jar

Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files

Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis

Increased timeouts of ZooKeeperLeaderElectionTest for Travis

Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager

Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase

Adds configuration description for new ZooKeeper configuration values

Fixed port selection of JobManager at startup

Improves logging output

Extends masters file to also specify the webui ports

Adds proper network interface resolution by retrieving the current leader address

Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce.

Adds jobManagerURL to TriggerTaskManagerRegistration message

Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node.

Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM

Adjust travis log file to only log zookeeper errors

Updates high availability setup guide

Adds TestLogger to leader election tests

This closes apache#1016.
tillrohrmann added a commit to tillrohrmann/flink that referenced this pull request Aug 31, 2015
@tillrohrmann
Contributor Author

Failing test is a Yarn test. Locally Travis passed. Will merge it now.

@asfgit asfgit merged commit b9de4ed into apache:master Aug 31, 2015
nikste pushed a commit to nikste/flink that referenced this pull request Sep 29, 2015
pnowojski pushed a commit that referenced this pull request Jul 3, 2024
* [MATRIX-241] API changes for Gemini models

* spotless
5 participants