Skip to content

Commit

Permalink
[FLINK-22577][tests] Harden KubernetesLeaderElectionAndRetrievalITCase
Browse files Browse the repository at this point in the history
This commit introduces closing logic to the TestingLeaderElectionEventHandler which would
otherwise forward calls after the KubernetesLeaderElectionDriver is closed.

This closes apache#15849.
  • Loading branch information
tillrohrmann committed May 10, 2021
1 parent 56a7a11 commit fbf84ac
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ void runTest(RunnableWithException testMethod) throws Exception {
electionEventHandler.init(leaderElectionDriver);
testMethod.run();

electionEventHandler.close();
leaderElectionDriver.close();
leaderRetrievalDriver.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,10 @@ public void testLeaderElectionAndRetrieval() throws Exception {
KubernetesLeaderElectionDriver leaderElectionDriver = null;
KubernetesLeaderRetrievalDriver leaderRetrievalDriver = null;

final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(LEADER_INFORMATION);

try {
final TestingLeaderElectionEventHandler electionEventHandler =
new TestingLeaderElectionEventHandler(LEADER_INFORMATION);
leaderElectionDriver =
new KubernetesLeaderElectionDriver(
kubernetesResource.getFlinkKubeClient(),
Expand Down Expand Up @@ -88,6 +89,7 @@ public void testLeaderElectionAndRetrieval() throws Exception {
assertThat(
retrievalEventHandler.getAddress(), is(LEADER_INFORMATION.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
public class TestingLeaderElectionEventHandler extends TestingLeaderBase
implements LeaderElectionEventHandler {

private final Object lock = new Object();

private final LeaderInformation leaderInformation;

private final OneShotLatch initializationLatch;
Expand All @@ -40,6 +42,8 @@ public class TestingLeaderElectionEventHandler extends TestingLeaderBase

private LeaderInformation confirmedLeaderInformation = LeaderInformation.empty();

private boolean running = true;

public TestingLeaderElectionEventHandler(LeaderInformation leaderInformation) {
this.leaderInformation = leaderInformation;
this.initializationLatch = new OneShotLatch();
Expand All @@ -51,35 +55,53 @@ public void init(LeaderElectionDriver leaderElectionDriver) {
initializationLatch.trigger();
}

private void ifRunning(Runnable action) {
synchronized (lock) {
if (running) {
action.run();
}
}
}

@Override
public void onGrantLeadership() {
waitForInitialization(
leaderElectionDriver -> {
confirmedLeaderInformation = leaderInformation;
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
});
ifRunning(
() ->
waitForInitialization(
leaderElectionDriver -> {
confirmedLeaderInformation = leaderInformation;
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
}));
}

@Override
public void onRevokeLeadership() {
waitForInitialization(
(leaderElectionDriver) -> {
confirmedLeaderInformation = LeaderInformation.empty();
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
});
ifRunning(
() ->
waitForInitialization(
(leaderElectionDriver) -> {
confirmedLeaderInformation = LeaderInformation.empty();
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
leaderEventQueue.offer(confirmedLeaderInformation);
}));
}

@Override
public void onLeaderInformationChange(LeaderInformation leaderInformation) {
waitForInitialization(
leaderElectionDriver -> {
if (confirmedLeaderInformation.getLeaderSessionID() != null
&& !this.confirmedLeaderInformation.equals(leaderInformation)) {
leaderElectionDriver.writeLeaderInformation(confirmedLeaderInformation);
}
});
ifRunning(
() ->
waitForInitialization(
leaderElectionDriver -> {
if (confirmedLeaderInformation.getLeaderSessionID() != null
&& !this.confirmedLeaderInformation.equals(
leaderInformation)) {
leaderElectionDriver.writeLeaderInformation(
confirmedLeaderInformation);
}
}));
}

private void waitForInitialization(Consumer<? super LeaderElectionDriver> operation) {
Expand All @@ -94,6 +116,14 @@ private void waitForInitialization(Consumer<? super LeaderElectionDriver> operat
}

public LeaderInformation getConfirmedLeaderInformation() {
return confirmedLeaderInformation;
synchronized (lock) {
return confirmedLeaderInformation;
}
}

public void close() {
synchronized (lock) {
running = false;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@ public void testZooKeeperLeaderElectionRetrieval() throws Exception {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -401,6 +402,7 @@ public void testLeaderShouldBeCorrectedWhenOverwritten() throws Exception {
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -450,6 +452,7 @@ public void testExceptionForwarding() throws Exception {
.isPresent(),
is(true));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down Expand Up @@ -528,6 +531,7 @@ public void testEphemeralZooKeeperNodes() throws Exception {
// that was expected
}
} finally {
electionEventHandler.close();
if (leaderRetrievalDriver != null) {
leaderRetrievalDriver.close();
}
Expand Down Expand Up @@ -577,6 +581,7 @@ public void testNotLeaderShouldNotCleanUpTheLeaderInformation() throws Exception
is(TEST_LEADER.getLeaderSessionID()));
assertThat(retrievalEventHandler.getAddress(), is(TEST_LEADER.getLeaderAddress()));
} finally {
electionEventHandler.close();
if (leaderElectionDriver != null) {
leaderElectionDriver.close();
}
Expand Down

0 comments on commit fbf84ac

Please sign in to comment.