-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-2291] [runtime] Adds high availability support via ZooKeeper #1016
Conversation
Big piece of work. I'd like to have a look at this, but it may take a few days... |
From a first glance this looks super nice! Very excited to get this in and try it out seriously on a cluster! Good day, today :-) |
Very nice, Till. I will rebase my changes for FLINK-2354 on this. Your description in this PR and the docs were already very helpful in this regard. I got delayed last week with FLINK-2354 and some other critical issues came up in the meantime. I will look at the critical issues first and then catch up and review this as well. |
Looks very good. Minor comments that we may address after this pull request:
|
+1 to merge, we should follow up on the Mini cluster and Curator shading separately |
Ok, thanks for your feedback @StephanEwen and @uce. I'll rebase on the latest master and then merge it. I've created two issues [1, 2] to track the problems Stephan mentioned. [1] https://issues.apache.org/jira/browse/FLINK-2592 |
682bf6f
to
4b0ce08
Compare
… set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers. Refactors FlinkMiniCluster to support multiple JobManager Adds proper remote address resolution for actors Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock. Adds ZooKeeper start option to TestBaseUtils.startCluster Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID directly in the grantLeadership method. Let the LeaderElectionService select the leader session I D. Return leader session ID to LeaderRetrievalListeners. Removes direct ActorRef interaction Introduces LeaderRetrievalService for the Client and the CliFrontend. Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution Adds LeaderElection/Retrieval tests Added test for exception forwarding from the CuratorFramework to a Contender Adds test job submission with changing leaders Adds new test cases for job cleanup after leader election change Adds new LeaderChangeStateCleanup test case Adds LeaderElectionRetrievalTestingCluster Introduces ListeningBehaviour for job submissions Relocation of org.apache.curator in flink-shaded-hadoop jar Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis Increased timeouts of ZooKeeperLeaderElectionTest for Travis Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase Adds configuration description for new ZooKeeper configuration values Fixed port selection of JobManager at startup Improves logging output Extends masters file to also specify the webui ports Adds proper network interface resolution by retrieving the current leader address Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce. Adds jobManagerURL to TriggerTaskManagerRegistration message Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node. Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM Adjust travis log file to only log zookeeper errors Updates high availability setup guide Adds TestLogger to leader election tests This closes apache#1016.
… set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers. Refactors FlinkMiniCluster to support multiple JobManager Adds proper remote address resolution for actors Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock. Adds ZooKeeper start option to TestBaseUtils.startCluster Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID directly in the grantLeadership method. Let the LeaderElectionService select the leader session I D. Return leader session ID to LeaderRetrievalListeners. Removes direct ActorRef interaction Introduces LeaderRetrievalService for the Client and the CliFrontend. Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution Adds LeaderElection/Retrieval tests Added test for exception forwarding from the CuratorFramework to a Contender Adds test job submission with changing leaders Adds new test cases for job cleanup after leader election change Adds new LeaderChangeStateCleanup test case Adds LeaderElectionRetrievalTestingCluster Introduces ListeningBehaviour for job submissions Relocation of org.apache.curator in flink-shaded-hadoop jar Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis Increased timeouts of ZooKeeperLeaderElectionTest for Travis Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase Adds configuration description for new ZooKeeper configuration values Fixed port selection of JobManager at startup Improves logging output Extends masters file to also specify the webui ports Adds proper network interface resolution by retrieving the current leader address Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce. Adds jobManagerURL to TriggerTaskManagerRegistration message Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node. Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM Adjust travis log file to only log zookeeper errors Updates high availability setup guide Adds TestLogger to leader election tests This closes apache#1016.
… set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers. Refactors FlinkMiniCluster to support multiple JobManager Adds proper remote address resolution for actors Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock. Adds ZooKeeper start option to TestBaseUtils.startCluster Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID directly in the grantLeadership method. Let the LeaderElectionService select the leader session I D. Return leader session ID to LeaderRetrievalListeners. Removes direct ActorRef interaction Introduces LeaderRetrievalService for the Client and the CliFrontend. Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution Adds LeaderElection/Retrieval tests Added test for exception forwarding from the CuratorFramework to a Contender Adds test job submission with changing leaders Adds new test cases for job cleanup after leader election change Adds new LeaderChangeStateCleanup test case Adds LeaderElectionRetrievalTestingCluster Introduces ListeningBehaviour for job submissions Relocation of org.apache.curator in flink-shaded-hadoop jar Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis Increased timeouts of ZooKeeperLeaderElectionTest for Travis Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase Adds configuration description for new ZooKeeper configuration values Fixed port selection of JobManager at startup Improves logging output Extends masters file to also specify the webui ports Adds proper network interface resolution by retrieving the current leader address Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce. Adds jobManagerURL to TriggerTaskManagerRegistration message Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node. Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM Adjust travis log file to only log zookeeper errors Updates high availability setup guide Adds TestLogger to leader election tests This closes apache#1016.
Failing test is a Yarn test. Locally Travis passed. Will merge it now. |
… set of JobManager. The leader will then be retrieved from ZooKeeper by the TaskManagers. Refactors FlinkMiniCluster to support multiple JobManager Adds proper remote address resolution for actors Clean up of LeaderElection and LeaderRetrievalService. Removes synchronization to avoid deadlock. Adds ZooKeeper start option to TestBaseUtils.startCluster Removes registration session IDs, using the leader session IDs instead. Sets the leader session ID directly in the grantLeadership method. Let the LeaderElectionService select the leader session I D. Return leader session ID to LeaderRetrievalListeners. Removes direct ActorRef interaction Introduces LeaderRetrievalService for the Client and the CliFrontend. Make ApplicationClient to use the LeaderRetrievalService for JobManager resolution Adds LeaderElection/Retrieval tests Added test for exception forwarding from the CuratorFramework to a Contender Adds test job submission with changing leaders Adds new test cases for job cleanup after leader election change Adds new LeaderChangeStateCleanup test case Adds LeaderElectionRetrievalTestingCluster Introduces ListeningBehaviour for job submissions Relocation of org.apache.curator in flink-shaded-hadoop jar Adds Apache ZooKeeper and Apache Curator to LICENSE and NOTICE files Increases zookeeper connection timeout to 20000 ms for the KafkaITCase to fix failing tests on Travis Increased timeouts of ZooKeeperLeaderElectionTest for Travis Makes the WebInfoServer and the WebRuntimeMonitor to use the LeaderRetrievalService to retrieve the current leading JobManager Adds proper synchronization to ZooKeeperLeaderElectionService. Fixes StateCheckpointedITCase and PartitionedStateCheckpointingITCase Adds configuration description for new ZooKeeper configuration values Fixed port selection of JobManager at startup Improves logging output Extends masters file to also specify the webui ports Adds proper network interface resolution by retrieving the current leader address Makes the ZooKeeperLeaderElectionService write the leader information in ephemeral nodes so that the information is deleted once the leader has terminated. Fixes a bug in the TaskManager due to call by name semantics of scheduler.scheduleOnce. Adds jobManagerURL to TriggerTaskManagerRegistration message Enables findConnectingAddress to use the ZooKeeperLeaderRetrievalService. This allows to test the connection to a possibly changing master node. Changes startup scripts to respect the recovery mode instead of the ZK_QUORUM Adjust travis log file to only log zookeeper errors Updates high availability setup guide Adds TestLogger to leader election tests This closes apache#1016.
* [MATRIX-241] API changes for Gemini models * spotless
Idea
This PR introduces cluster high availability via ZooKeeper. The idea is to use ZooKeeper to do leader election among a group of registered
JobManagers
. The elected leader writes his akka connection URL and his assigned leader session ID to ZooKeeper from where theTaskManagers
can retrieve it.Activation
In order to use the high availability mode, one has to select Flinks zookeeper recovery mode and specify a valid ZK quorum. Both is done in the
flink-conf.yaml
by settingrecovery.mode: zookeeper
andha.zookeeper.quorum: address1:2181[,...],addressX:2181
where the zk quorum addresses point to ZooKeeper servers.Implementation
In order to support HA ZK and also the standalone recovery mode (no HA), this PR introduces the
LeaderElectionService
and theLeaderRetrievalService
. The former service is used by leader contenders to be elected as the leader. The latter is used to obtain the address of the current leader. In standalone mode (StandaloneLeaderElectionService
,StandaloneLeaderRetrievalService
), these services just return theJobManager
address which was found in the Flink configuration. With ZooKeeper, the services use the Curator framework to connect to the ZooKeeper quorum to do leader election and to read the ZkNode which contains the information of the current leader.In the wake of introducing these services, the
FlinkMiniCluster
was also adapted to support HA with ZooKeeper. TheForkableFlinkMiniCluster
starts automatically a ZK TestingCluster ifrecovery.mode
was set to zookeeper in the provided configuration and if theha.zookeeper.quorum
was not set. Due to the refactorings it is now necessary to explicitly start aFlinkMiniCluster
with thestart()
method after creating it.A
ListeningBehaviour
for theSubmitJob
message is introduced, too. The listening behaviour defines to what events theJobClientActor
will be listening/what he expects from theJobManager
to be sent. There are currently three different modes supported:JobManager
will only send theAcknowledge
message as the response to theSubmitJob
message.JobClientActor
waits to receive theSerializedJobExecutionResult
from theJobManager
. The execution result is sent to theJobClientActor
after the job has entered a terminal state.SerializedJobExecutionResult
, theJobClientActor
wants to receive all intermediate state change notifications.Since
hadoop-2.6.0
uses an incompatible org.apache.curator version than we do, we additionally shade theorg.apache.curator
package in the flink-shaded-hadoop jar.The current tooling to start a ZooKeeper quorum was extended so that the
masters
file not only specifies the hosts on which a JobManager shall be started, but also the port of the web UI. The format is the following:<host>:<webUIPort>
.In order to also support the automatic network interface retrieval to which the
TaskManager
should bind, theLeaderRetrievalUtils
offer a methodfindConnectingAddress
. It works similarly to theNetUtils.findConnectingAddress
with the difference that it can react to changing leaders. In case of a leader change, the newly electedJobManager
will be used as the connection target to retrieve the correct network interface.Limitations
Currently in HA ZK mode, one web server is started per
JobManager
. The reason for not having a dedicated web server is that parts of the information it requires are not serializable yet. Thus, eachJobManager
has a local web server showing its own state. Since the web servers are running dedicatedly for oneJobManager
, they don't know about the current leader session ID. Therefore, it is not possible to cancel jobs via the web servers. This is because aCancelJob
message is aRequiresLeaderSessionMessage
.In case of an exception in the
LeaderElectionService
and theLeaderRetrievalService
, theContender
or theLeaderRetrievalListener
will be informed about the occurred exception. In the current implementation, theJobManager
(implementing theContender
interface) and theTaskManager
(implementing theLeaderRetrievalListener
) will log the exception and then kill themselves. This might be changed to a policy where the respective manager tries to restart the service.The
JobClientActor
cannot handle changing leaders, yet. In case that the current leader dies, theJobClientActor
will receive aTerminated
message because it is watching the current leader. Upon receiving this message, theJobClientActor
sends aFailure
message to the submitter. In the future, when a currently executed job is restarted on a newly elected leader, theJobClientActor
should automatically reconnect to the new leader and receive the status update messages and the job execution result from it.This PR, does not contain HA for YARN, even though leader election and address retrieval with ZooKeeper also works with it. The problem is the
ApplicationMaster
which would have to replicated as well. Alternatively, we could also rely on YARN to simply start a newApplicationMaster
and a newJobManager
once it detects that the currentJobManager
has died.Since this PR touches a lot of files, a close review, as far as possible, would be helpful.