Skip to content

Commit

Permalink
[FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work wit…
Browse files Browse the repository at this point in the history
…h Adaptive Scheduler.

The test previously relied on an implicit contract that instances of OperatorCoordinators are never recreated
on the same JobManager. That implicit contract is no longer true with the Adaptive Scheduler.

This change adjusts the test to no longer make that assumption.

This closes apache#15739
  • Loading branch information
StephanEwen committed Apr 25, 2021
1 parent bddcbc4 commit e676442
Showing 1 changed file with 86 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,13 @@
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
Expand Down Expand Up @@ -169,6 +171,9 @@ public static void shutdownMiniCluster() throws Exception {

@Test
public void test() throws Exception {
// this captures variables communicated across instances, recoveries, etc.
TestScript.reset();

final int numEvents1 = 200;
final int numEvents2 = 5;
final int delay1 = 1;
Expand Down Expand Up @@ -296,26 +301,33 @@ private static final class EventSendingCoordinator implements OperatorCoordinato

private final int delay;
private final int maxNumber;
private final int failAtMessage;
private int nextNumber;

private CompletableFuture<byte[]> requestedCheckpoint;
private CompletableFuture<byte[]> nextToComplete;

private final int failAtMessage;
private boolean failedBefore;

private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new ArrayDeque<>();

private SubtaskGateway subtaskGateway;
private boolean workLoopRunning;

/**
* This contains all variables that are necessary to track the progress of the test, and
* which need to be tracked across instances of this coordinator (some scheduler
* implementations may re-instantiate the ExecutionGraph and the coordinators around global
* failures).
*/
private final TestScript testScript;

private EventSendingCoordinator(Context context, String name, int numEvents, int delay) {
checkArgument(delay > 0);
checkArgument(numEvents >= 3);

this.context = context;
this.maxNumber = numEvents;
this.delay = delay;

this.testScript = TestScript.getForOperator(name);

this.mailboxExecutor =
Executors.newSingleThreadExecutor(
new DispatcherThreadFactory(
Expand Down Expand Up @@ -349,17 +361,12 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc
String.format("Don't recognize event '%s' from task %d.", event, subtask));
}

// We complete all events that were enqueued. We may need to complete
// multiple ones here, because it can happen that after a failure no real recovery
// happens that results in an event being sent (and this method being called), but that
// immediately another failure comes, triggered by the other operator coordinator (or
// its task).
synchronized (recoveredTaskRunning) {
for (CountDownLatch latch : recoveredTaskRunning) {
latch.countDown();
}
recoveredTaskRunning.clear();
}
// this unblocks all the delayed actions that where kicked off while the previous
// task was still running (if there was a previous task). this is part of simulating
// the extreme race where the coordinator thread stalls for so long that a new
// task execution attempt gets deployed before the last events targeted at the old task
// where sent.
testScript.signalRecoveredTaskReady();

// first, we hand this over to the mailbox thread, so we preserve order on operations,
// even if the action is only to do a thread safe scheduling into the scheduledExecutor
Expand All @@ -375,13 +382,13 @@ public void handleEventFromOperator(int subtask, OperatorEvent event) throws Exc

@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
// we need to create and enqueue this outside the mailbox, so that the
// enqueuing is strictly ordered with the completion (which also happens outside
// the mail box executor).
// we need to create and register this outside the mailbox so that the
// registration is not affected by the artificial stall on the mailbox, but happens
// strictly before the tasks are restored and the operator events are received (to
// trigger the latches) which also happens outside the mailbox.

final CountDownLatch successorIsRunning = new CountDownLatch(1);
synchronized (recoveredTaskRunning) {
recoveredTaskRunning.addLast(successorIsRunning);
}
testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning);

// simulate a heavy thread race here: the mailbox has a last enqueued action before the
// cancellation is processed. But through a race, the mailbox freezes for a while and in
Expand Down Expand Up @@ -483,7 +490,12 @@ private void executeSingleAction() {
System.exit(-1);
}

// schedule the next step
// schedule the next step. we do this here, after the previous step concluded, rather
// than scheduling a periodic action. Otherwise, the periodic task would enqueue many
// actions while the mailbox stalls and process them all instantaneously after the
// un-stalling. That wouldn't break the test, but it voids the differences in event
// sending delays between the different coordinators, which are part of provoking the
// situation that requires checkpoint alignment between the coordinators' event streams.
scheduleSingleAction();
}

Expand Down Expand Up @@ -515,8 +527,8 @@ private void sendNextEvent() {
}

private void checkWhetherToTriggerFailure() {
if (nextNumber >= failAtMessage && !failedBefore) {
failedBefore = true;
if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed()) {
testScript.recordHasFailed();
context.failJob(new Exception("test failure"));
}
}
Expand Down Expand Up @@ -622,6 +634,54 @@ private void restoreState(List<Integer> target) throws Exception {
}
}

// ------------------------------------------------------------------------
// dedicated class to hold the "test script"
// ------------------------------------------------------------------------

private static final class TestScript {

private static final Map<String, TestScript> MAP_FOR_OPERATOR = new HashMap<>();

static TestScript getForOperator(String operatorName) {
return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript());
}

static void reset() {
MAP_FOR_OPERATOR.clear();
}

private final Collection<CountDownLatch> recoveredTaskRunning = new ArrayList<>();
private boolean failedBefore;

void recordHasFailed() {
this.failedBefore = true;
}

boolean hasAlreadyFailed() {
return failedBefore;
}

void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
synchronized (recoveredTaskRunning) {
recoveredTaskRunning.add(latch);
}
}

void signalRecoveredTaskReady() {
// We complete all latches that were registered. We may need to complete
// multiple ones here, because it can happen that after a previous failure, the next
// executions fails immediately again, before even registering at the coordinator.
// in that case, we have multiple latches from multiple failure notifications waiting
// to be completed.
synchronized (recoveredTaskRunning) {
for (CountDownLatch latch : recoveredTaskRunning) {
latch.countDown();
}
recoveredTaskRunning.clear();
}
}
}

// ------------------------------------------------------------------------
// serialization shenannigans
// ------------------------------------------------------------------------
Expand Down

0 comments on commit e676442

Please sign in to comment.