Skip to content

Commit

Permalink
[FLINK-14287] Decouple leader address from LeaderContender
Browse files Browse the repository at this point in the history
Change LeaderElectionService#confirmLeadership to accept leader address so that
the LeaderContender does not need to know the address of the potential leader
before gaining leadership. This makes it possible to decouple the leader election from the
actual leader component.

This closes apache#9813.
  • Loading branch information
tillrohrmann committed Oct 10, 2019
1 parent 11c2610 commit 7e7ee28
Show file tree
Hide file tree
Showing 17 changed files with 85 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
recoveredJobsFuture,
BiFunctionWithException.unchecked((Boolean confirmLeadership, Collection<JobGraph> recoveredJobs) -> {
if (confirmLeadership) {
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
} else {
for (JobGraph recoveredJob : recoveredJobs) {
jobGraphStore.releaseJobGraph(recoveredJob.getJobID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,10 @@ private void removeContender(EmbeddedLeaderElectionService service) {
/**
* Callback from leader contenders when they confirm a leader grant.
*/
private void confirmLeader(final EmbeddedLeaderElectionService service, final UUID leaderSessionId) {
private void confirmLeader(
final EmbeddedLeaderElectionService service,
final UUID leaderSessionId,
final String leaderAddress) {
synchronized (lock) {
// if the service was shut down in the meantime, ignore this confirmation
if (!service.running || shutdown) {
Expand All @@ -246,16 +249,15 @@ private void confirmLeader(final EmbeddedLeaderElectionService service, final UU
try {
// check if the confirmation is for the same grant, or whether it is a stale grant
if (service == currentLeaderProposed && currentLeaderSessionId.equals(leaderSessionId)) {
final String address = service.contender.getAddress();
LOG.info("Received confirmation of leadership for leader {} , session={}", address, leaderSessionId);
LOG.info("Received confirmation of leadership for leader {} , session={}", leaderAddress, leaderSessionId);

// mark leadership
currentLeaderConfirmed = service;
currentLeaderAddress = address;
currentLeaderAddress = leaderAddress;
currentLeaderProposed = null;

// notify all listeners
notifyAllListeners(address, leaderSessionId);
notifyAllListeners(leaderAddress, leaderSessionId);
}
else {
LOG.debug("Received confirmation of leadership for a stale leadership grant. Ignoring.");
Expand Down Expand Up @@ -434,9 +436,10 @@ public void stop() throws Exception {
}

@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID);
confirmLeader(this, leaderSessionID);
checkNotNull(leaderAddress);
confirmLeader(this, leaderSessionID, leaderAddress);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ public void stop() {
}

@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
checkNotNull(leaderSessionID, "leaderSessionID");
checkArgument(leaderSessionID.equals(leaderId), "confirmed wrong leader session id");

Expand All @@ -151,14 +151,13 @@ public void confirmLeaderSessionID(UUID leaderSessionID) {
checkState(leader == null, "leader already confirmed");

// accept the confirmation
final String address = proposedLeader.getAddress();
leaderAddress = address;
this.leaderAddress = leaderAddress;
leader = proposedLeader;

// notify all listeners
for (EmbeddedLeaderRetrievalService listener : listeners) {
notificationExecutor.execute(
new NotifyOfLeaderCall(address, leaderId, listener.listener, LOG));
new NotifyOfLeaderCall(leaderAddress, leaderId, listener.listener, LOG));
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,10 @@ private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {

final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
return startFuture.thenAcceptAsync(
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
(Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(
leaderSessionId,
jobMasterService.getAddress(),
currentLeaderGatewayFuture),
executor);
}

Expand All @@ -358,10 +361,14 @@ private CompletableFuture<JobSchedulingStatus> getJobSchedulingStatus() {
}
}

private void confirmLeaderSessionIdIfStillLeader(UUID leaderSessionId, CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {
private void confirmLeaderSessionIdIfStillLeader(
UUID leaderSessionId,
String leaderAddress,
CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture) {

if (leaderElectionService.hasLeadership(leaderSessionId)) {
currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
leaderElectionService.confirmLeaderSessionID(leaderSessionId);
leaderElectionService.confirmLeadership(leaderSessionId, leaderAddress);
} else {
log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@
* to instantiate its own leader election service.
*
* Once a contender has been granted leadership, it has to confirm the received leader session ID
* by calling the method confirmLeaderSessionID. This will notify the leader election service, that
* the contender has received the new leader session ID and that it can now be published for
* leader retrieval services.
* by calling the method {@link #confirmLeadership(UUID, String)}. This will notify the leader election
* service that the contender has accepted the specified leadership and that the leader session ID as
* well as the leader address can now be published for leader retrieval services.
*/
public interface LeaderElectionService {

Expand All @@ -51,16 +51,18 @@ public interface LeaderElectionService {
void stop() throws Exception;

/**
* Confirms that the new leader session ID has been successfully received by the new leader.
* This method is usually called by the newly appointed {@link LeaderContender}.
* Confirms that the {@link LeaderContender} has accepted the leadership identified by the
* given leader session id. It also publishes the leader address under which the leader is
* reachable.
*
* The rational behind this method is to establish an order between setting the new leader
* session ID in the {@link LeaderContender} and publishing the new leader session ID to the
* leader retrieval services.
* <p>The rationale behind this method is to establish an order between setting the new leader
* session ID in the {@link LeaderContender} and publishing the new leader session ID as well
* as the leader address to the leader retrieval services.
*
* @param leaderSessionID The new leader session ID
* @param leaderAddress The address of the new leader
*/
void confirmLeaderSessionID(UUID leaderSessionID);
void confirmLeadership(UUID leaderSessionID, String leaderAddress);

/**
* Returns true if the {@link LeaderContender} with which the service has been started owns
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public void stop() {
}

@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {}
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {}

@Override
public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ public class ZooKeeperLeaderElectionService implements LeaderElectionService, Le

private volatile UUID confirmedLeaderSessionID;

private volatile String confirmedLeaderAddress;

/** The leader contender which applies for leadership. */
private volatile LeaderContender leaderContender;

Expand Down Expand Up @@ -100,6 +102,7 @@ public ZooKeeperLeaderElectionService(CuratorFramework client, String latchPath,

issuedLeaderSessionID = null;
confirmedLeaderSessionID = null;
confirmedLeaderAddress = null;
leaderContender = null;

running = false;
Expand Down Expand Up @@ -177,12 +180,12 @@ public void stop() throws Exception{
}

@Override
public void confirmLeaderSessionID(UUID leaderSessionID) {
public void confirmLeadership(UUID leaderSessionID, String leaderAddress) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Confirm leader session ID {} for leader {}.",
leaderSessionID,
leaderContender.getAddress());
leaderAddress);
}

Preconditions.checkNotNull(leaderSessionID);
Expand All @@ -192,8 +195,8 @@ public void confirmLeaderSessionID(UUID leaderSessionID) {
synchronized (lock) {
if (running) {
if (leaderSessionID.equals(this.issuedLeaderSessionID)) {
confirmedLeaderSessionID = leaderSessionID;
writeLeaderInformation(confirmedLeaderSessionID);
confirmLeaderInformation(leaderSessionID, leaderAddress);
writeLeaderInformation();
}
} else {
LOG.debug("Ignoring the leader session Id {} confirmation, since the " +
Expand All @@ -206,6 +209,11 @@ public void confirmLeaderSessionID(UUID leaderSessionID) {
}
}

/**
* Stores the given leader session ID and leader address as the confirmed leader information.
*
* <p>The caller visible in this class, {@code confirmLeadership}, invokes this method while
* holding {@code lock} and only after the session ID matched the issued leader session ID.
*
* @param leaderSessionID session ID to record as confirmed
* @param leaderAddress address to record as confirmed
*/
private void confirmLeaderInformation(UUID leaderSessionID, String leaderAddress) {
confirmedLeaderSessionID = leaderSessionID;
confirmedLeaderAddress = leaderAddress;
}

@Override
public boolean hasLeadership(@Nonnull UUID leaderSessionId) {
return leaderLatch.hasLeadership() && leaderSessionId.equals(issuedLeaderSessionID);
Expand All @@ -216,7 +224,7 @@ public void isLeader() {
synchronized (lock) {
if (running) {
issuedLeaderSessionID = UUID.randomUUID();
confirmedLeaderSessionID = null;
clearConfirmedLeaderInformation();

if (LOG.isDebugEnabled()) {
LOG.debug(
Expand All @@ -233,16 +241,23 @@ public void isLeader() {
}
}

/**
* Resets the confirmed leader session ID and the confirmed leader address to {@code null}.
*
* <p>Invoked from {@code isLeader()} and {@code notLeader()} inside their
* {@code synchronized (lock)} sections.
*/
private void clearConfirmedLeaderInformation() {
confirmedLeaderSessionID = null;
confirmedLeaderAddress = null;
}

@Override
public void notLeader() {
synchronized (lock) {
if (running) {
issuedLeaderSessionID = null;
confirmedLeaderSessionID = null;
LOG.debug(
"Revoke leadership of {} ({}@{}).",
leaderContender,
confirmedLeaderSessionID,
confirmedLeaderAddress);

if (LOG.isDebugEnabled()) {
LOG.debug("Revoke leadership of {}.", leaderContender.getAddress());
}
issuedLeaderSessionID = null;
clearConfirmedLeaderInformation();

leaderContender.revokeLeadership();
} else {
Expand Down Expand Up @@ -275,7 +290,7 @@ public void nodeChanged() throws Exception {
"Writing leader information into empty node by {}.",
leaderContender.getAddress());
}
writeLeaderInformation(confirmedLeaderSessionID);
writeLeaderInformation();
} else {
byte[] data = childData.getData();

Expand All @@ -286,23 +301,23 @@ public void nodeChanged() throws Exception {
"Writing leader information into node with empty data field by {}.",
leaderContender.getAddress());
}
writeLeaderInformation(confirmedLeaderSessionID);
writeLeaderInformation();
} else {
ByteArrayInputStream bais = new ByteArrayInputStream(data);
ObjectInputStream ois = new ObjectInputStream(bais);

String leaderAddress = ois.readUTF();
UUID leaderSessionID = (UUID) ois.readObject();

if (!leaderAddress.equals(this.leaderContender.getAddress()) ||
if (!leaderAddress.equals(confirmedLeaderAddress) ||
(leaderSessionID == null || !leaderSessionID.equals(confirmedLeaderSessionID))) {
// the data field does not correspond to the expected leader information
if (LOG.isDebugEnabled()) {
LOG.debug(
"Correcting leader information by {}.",
leaderContender.getAddress());
}
writeLeaderInformation(confirmedLeaderSessionID);
writeLeaderInformation();
}
}
}
Expand All @@ -320,24 +335,22 @@ public void nodeChanged() throws Exception {

/**
* Writes the confirmed leader address as well as the confirmed leader session ID to ZooKeeper.
*
* @param leaderSessionID Leader session ID which is written to ZooKeeper
*/
protected void writeLeaderInformation(UUID leaderSessionID) {
protected void writeLeaderInformation() {
// this method does not have to be synchronized because the curator framework client
// is thread-safe
try {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Write leader information: Leader={}, session ID={}.",
leaderContender.getAddress(),
leaderSessionID);
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);

oos.writeUTF(leaderContender.getAddress());
oos.writeObject(leaderSessionID);
oos.writeUTF(confirmedLeaderAddress);
oos.writeObject(confirmedLeaderSessionID);

oos.close();

Expand Down Expand Up @@ -381,8 +394,8 @@ protected void writeLeaderInformation(UUID leaderSessionID) {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Successfully wrote leader information: Leader={}, session ID={}.",
leaderContender.getAddress(),
leaderSessionID);
confirmedLeaderAddress,
confirmedLeaderSessionID);
}
} catch (Exception e) {
leaderContender.handleError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -901,7 +901,7 @@ public void grantLeadership(final UUID newLeaderSessionID) {
(acceptLeadership) -> {
if (acceptLeadership) {
// confirming the leader session ID might be blocking,
leaderElectionService.confirmLeaderSessionID(newLeaderSessionID);
leaderElectionService.confirmLeadership(newLeaderSessionID, getAddress());
}
},
getRpcService().getExecutor());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -710,7 +710,7 @@ protected CompletableFuture<Void> shutDownInternal() {
@Override
public void grantLeadership(final UUID leaderSessionID) {
log.info("{} was granted leadership with leaderSessionID={}", getRestBaseUrl(), leaderSessionID);
leaderElectionService.confirmLeaderSessionID(leaderSessionID);
leaderElectionService.confirmLeadership(leaderSessionID, getRestBaseUrl());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ public void testJobManagerLeaderRetrieval() throws Exception {

final UUID leaderId = leaderIdArgumentCaptor.getValue();

leaderElectionService.confirmLeaderSessionID(leaderId);
leaderElectionService.confirmLeadership(leaderId, address);

verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
}
Expand All @@ -163,7 +163,7 @@ public void testResourceManagerLeaderRetrieval() throws Exception {

final UUID leaderId = leaderIdArgumentCaptor.getValue();

leaderElectionService.confirmLeaderSessionID(leaderId);
leaderElectionService.confirmLeadership(leaderId, address);

verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
}
Expand Down Expand Up @@ -191,8 +191,8 @@ public void testConcurrentLeadershipOperations() throws Exception {

assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), is(true));

dispatcherLeaderElectionService.confirmLeaderSessionID(oldLeaderSessionId);
dispatcherLeaderElectionService.confirmLeaderSessionID(newLeaderSessionId);
dispatcherLeaderElectionService.confirmLeadership(oldLeaderSessionId, leaderContender.getAddress());
dispatcherLeaderElectionService.confirmLeadership(newLeaderSessionId, leaderContender.getAddress());

assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), is(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ private static LeaderContender mockContender(final LeaderElectionService service
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final UUID uuid = (UUID) invocation.getArguments()[0];
service.confirmLeaderSessionID(uuid);
service.confirmLeadership(uuid, address);
return null;
}
}).when(mockContender).grantLeadership(any(UUID.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testHasLeadership() throws Exception {
assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));

leaderElectionService.confirmLeaderSessionID(leaderSessionId);
leaderElectionService.confirmLeadership(leaderSessionId, manualLeaderContender.getAddress());

assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void grantLeadership(UUID leaderSessionID) {

this.leaderSessionID = leaderSessionID;

leaderElectionService.confirmLeaderSessionID(leaderSessionID);
leaderElectionService.confirmLeadership(leaderSessionID, getAddress());

leader = true;

Expand Down
Loading

0 comments on commit 7e7ee28

Please sign in to comment.