Skip to content

Commit

Permalink
[FLINK-14298] Replace LeaderContender#getAddress with #getDescription
Browse files Browse the repository at this point in the history
This commit changes the LeaderContender to only require implementations to
report a description of the contender used for logging purposes instead of
the actual leader address.

This closes apache#9821.
  • Loading branch information
tillrohrmann committed Oct 11, 2019
1 parent eea0561 commit 4813a23
Show file tree
Hide file tree
Showing 14 changed files with 78 additions and 84 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -299,8 +299,7 @@ private CompletableFuture<Void> updateLeader() {
currentLeaderProposed = leaderService;
currentLeaderProposed.isLeader = true;

LOG.info("Proposing leadership to contender {} @ {}",
leaderService.contender, leaderService.contender.getAddress());
LOG.info("Proposing leadership to contender {}", leaderService.contender.getDescription());

return execute(new GrantLeadershipCall(leaderService.contender, leaderSessionId, LOG));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,7 @@ private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID

private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, jobMasterService.getAddress());

try {
runningJobsRegistry.setJobRunning(jobGraph.getJobID());
Expand Down Expand Up @@ -370,7 +370,7 @@ private void confirmLeaderSessionIdIfStillLeader(
currentLeaderGatewayFuture.complete(jobMasterService.getGateway());
leaderElectionService.confirmLeadership(leaderSessionId, leaderAddress);
} else {
log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getAddress());
log.debug("Ignoring confirmation of leader session id because {} is no longer the leader.", getDescription());
}
}

Expand All @@ -394,8 +394,8 @@ public void revokeLeadership() {
}

private CompletableFuture<Void> revokeJobMasterLeadership() {
log.info("JobManager for job {} ({}) was revoked leadership at {}.",
jobGraph.getName(), jobGraph.getJobID(), getAddress());
log.info("JobManager for job {} ({}) at {} was revoked leadership.",
jobGraph.getName(), jobGraph.getJobID(), jobMasterService.getAddress());

setNewLeaderGatewayFuture();

Expand Down Expand Up @@ -431,7 +431,7 @@ private void setNewLeaderGatewayFuture() {
}

@Override
public String getAddress() {
public String getDescription() {
return jobMasterService.getAddress();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,19 +41,20 @@ public interface LeaderContender {
*/
void revokeLeadership();

/**
* Returns the address of the {@link LeaderContender} under which other instances can connect
* to it.
*
* @return Address of this contender.
*/
String getAddress();

/**
* Callback method which is called by {@link LeaderElectionService} in case of an error in the
* service thread.
*
* @param exception Caught exception
*/
void handleError(Exception exception);

/**
* Returns the description of the {@link LeaderContender} for logging purposes.
*
* @return Description of this contender.
*/
default String getDescription() {
return "LeaderContender: " + getClass().getSimpleName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,7 @@ public void isLeader() {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Grant leadership to contender {} with session ID {}.",
leaderContender.getAddress(),
leaderContender.getDescription(),
issuedLeaderSessionID);
}

Expand All @@ -252,7 +252,7 @@ public void notLeader() {
if (running) {
LOG.debug(
"Revoke leadership of {} ({}@{}).",
leaderContender,
leaderContender.getDescription(),
confirmedLeaderSessionID,
confirmedLeaderAddress);

Expand All @@ -277,7 +277,7 @@ public void nodeChanged() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Leader node changed while {} is the leader with session ID {}.",
leaderContender.getAddress(),
leaderContender.getDescription(),
confirmedLeaderSessionID);
}

Expand All @@ -288,7 +288,7 @@ public void nodeChanged() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into empty node by {}.",
leaderContender.getAddress());
leaderContender.getDescription());
}
writeLeaderInformation();
} else {
Expand All @@ -299,7 +299,7 @@ public void nodeChanged() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Writing leader information into node with empty data field by {}.",
leaderContender.getAddress());
leaderContender.getDescription());
}
writeLeaderInformation();
} else {
Expand All @@ -315,7 +315,7 @@ public void nodeChanged() throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug(
"Correcting leader information by {}.",
leaderContender.getAddress());
leaderContender.getDescription());
}
writeLeaderInformation();
}
Expand Down Expand Up @@ -410,15 +410,15 @@ protected void handleStateChange(ConnectionState newState) {
LOG.debug("Connected to ZooKeeper quorum. Leader election can start.");
break;
case SUSPENDED:
LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getAddress()
LOG.warn("Connection to ZooKeeper suspended. The contender " + leaderContender.getDescription()
+ " no longer participates in the leader election.");
break;
case RECONNECTED:
LOG.info("Connection to ZooKeeper was reconnected. Leader election can be restarted.");
break;
case LOST:
// Maybe we have to throw an exception here to terminate the JobManager
LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getAddress()
LOG.warn("Connection to ZooKeeper lost. The contender " + leaderContender.getDescription()
+ " no longer participates in the leader election.");
break;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class LeaderConnectionInfo {

private final String address;

LeaderConnectionInfo(UUID leaderSessionId, String address) {
public LeaderConnectionInfo(UUID leaderSessionId, String address) {
this.leaderSessionId = leaderSessionId;
this.address = address;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -719,7 +719,7 @@ public void revokeLeadership() {
}

@Override
public String getAddress() {
public String getDescription() {
return getRestBaseUrl();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.leaderelection.LeaderContender;
import org.apache.flink.runtime.leaderelection.LeaderElectionService;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener;
import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
import org.apache.flink.runtime.util.LeaderConnectionInfo;
import org.apache.flink.runtime.util.LeaderRetrievalUtils;
import org.apache.flink.util.TestLogger;

import org.junit.After;
Expand All @@ -37,17 +38,17 @@
import static org.hamcrest.Matchers.is;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.atLeast;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
* Tests for the {@link EmbeddedHaServices}.
*/
public class EmbeddedHaServicesTest extends TestLogger {

private static final String ADDRESS = "foobar";

private EmbeddedHaServices embeddedHaServices;

@Before
Expand Down Expand Up @@ -120,52 +121,40 @@ public void testResourceManagerLeaderElection() throws Exception {
*/
@Test
public void testJobManagerLeaderRetrieval() throws Exception {
final String address = "foobar";
JobID jobId = new JobID();
LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
LeaderContender leaderContender = mock(LeaderContender.class);
when(leaderContender.getAddress()).thenReturn(address);

LeaderElectionService leaderElectionService = embeddedHaServices.getJobManagerLeaderElectionService(jobId);
LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getJobManagerLeaderRetriever(jobId);

runLeaderRetrievalTest(leaderElectionService, leaderRetrievalService);
}

private void runLeaderRetrievalTest(LeaderElectionService leaderElectionService, LeaderRetrievalService leaderRetrievalService) throws Exception {
LeaderRetrievalUtils.LeaderConnectionInfoListener leaderRetrievalListener = new LeaderRetrievalUtils.LeaderConnectionInfoListener();
TestingLeaderContender leaderContender = new TestingLeaderContender();

leaderRetrievalService.start(leaderRetrievalListener);
leaderElectionService.start(leaderContender);

ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());
final UUID leaderId = leaderContender.getLeaderSessionFuture().get();

final UUID leaderId = leaderIdArgumentCaptor.getValue();
leaderElectionService.confirmLeadership(leaderId, ADDRESS);

leaderElectionService.confirmLeadership(leaderId, address);
final LeaderConnectionInfo leaderConnectionInfo = leaderRetrievalListener.getLeaderConnectionInfoFuture().get();

verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
assertThat(leaderConnectionInfo.getAddress(), is(ADDRESS));
assertThat(leaderConnectionInfo.getLeaderSessionId(), is(leaderId));
}

/**
* Tests the ResourceManager leader retrieval for a given job.
*/
@Test
public void testResourceManagerLeaderRetrieval() throws Exception {
final String address = "foobar";
LeaderRetrievalListener leaderRetrievalListener = mock(LeaderRetrievalListener.class);
LeaderContender leaderContender = mock(LeaderContender.class);
when(leaderContender.getAddress()).thenReturn(address);

LeaderElectionService leaderElectionService = embeddedHaServices.getResourceManagerLeaderElectionService();
LeaderRetrievalService leaderRetrievalService = embeddedHaServices.getResourceManagerLeaderRetriever();

leaderRetrievalService.start(leaderRetrievalListener);
leaderElectionService.start(leaderContender);

ArgumentCaptor<UUID> leaderIdArgumentCaptor = ArgumentCaptor.forClass(UUID.class);
verify(leaderContender).grantLeadership(leaderIdArgumentCaptor.capture());

final UUID leaderId = leaderIdArgumentCaptor.getValue();

leaderElectionService.confirmLeadership(leaderId, address);

verify(leaderRetrievalListener).notifyLeaderAddress(eq(address), eq(leaderId));
runLeaderRetrievalTest(leaderElectionService, leaderRetrievalService);
}

/**
Expand All @@ -191,8 +180,8 @@ public void testConcurrentLeadershipOperations() throws Exception {

assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), is(true));

dispatcherLeaderElectionService.confirmLeadership(oldLeaderSessionId, leaderContender.getAddress());
dispatcherLeaderElectionService.confirmLeadership(newLeaderSessionId, leaderContender.getAddress());
dispatcherLeaderElectionService.confirmLeadership(oldLeaderSessionId, ADDRESS);
dispatcherLeaderElectionService.confirmLeadership(newLeaderSessionId, ADDRESS);

assertThat(dispatcherLeaderElectionService.hasLeadership(newLeaderSessionId), is(true));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ public void revokeLeadership() {
}

@Override
public String getAddress() {
public String getDescription() {
return "foobar";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,19 @@
import org.apache.flink.util.StringUtils;

import org.junit.Test;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.Executor;

import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;

/**
* Tests for the {@link SingleLeaderElectionService}.
Expand Down Expand Up @@ -165,7 +169,7 @@ public void testImmediateShutdown() throws Exception {
service.shutdown();

final LeaderContender contender = mock(LeaderContender.class);

// should not be possible to start
try {
service.start(contender);
Expand Down Expand Up @@ -210,15 +214,10 @@ private static LeaderContender mockContender(final LeaderElectionService service
private static LeaderContender mockContender(final LeaderElectionService service, final String address) {
LeaderContender mockContender = mock(LeaderContender.class);

when(mockContender.getAddress()).thenReturn(address);

doAnswer(new Answer<Void>() {
@Override
public Void answer(InvocationOnMock invocation) throws Throwable {
final UUID uuid = (UUID) invocation.getArguments()[0];
service.confirmLeadership(uuid, address);
return null;
}
doAnswer((Answer<Void>) invocation -> {
final UUID uuid = (UUID) invocation.getArguments()[0];
service.confirmLeadership(uuid, address);
return null;
}).when(mockContender).grantLeadership(any(UUID.class));

return mockContender;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ public void testHasLeadership() throws Exception {
assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));
assertThat(leaderElectionService.hasLeadership(UUID.randomUUID()), is(false));

leaderElectionService.confirmLeadership(leaderSessionId, manualLeaderContender.getAddress());
leaderElectionService.confirmLeadership(leaderSessionId, "foobar");

assertThat(leaderElectionService.hasLeadership(leaderSessionId), is(true));

Expand Down Expand Up @@ -132,7 +132,7 @@ public void revokeLeadership() {
}

@Override
public String getAddress() {
public String getDescription() {
return "foobar";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ public void grantLeadership(UUID leaderSessionID) {

this.leaderSessionID = leaderSessionID;

leaderElectionService.confirmLeadership(leaderSessionID, getAddress());
leaderElectionService.confirmLeadership(leaderSessionID, address);

leader = true;

Expand All @@ -139,7 +139,7 @@ public void revokeLeadership() {
}

@Override
public String getAddress() {
public String getDescription() {
return address;
}

Expand Down
Loading

0 comments on commit 4813a23

Please sign in to comment.