Skip to content

Commit

Permalink
[FLINK-17994][checkpointing] Fix the race condition between Checkpoin…
Browse files Browse the repository at this point in the history
…tBarrierUnaligner#processBarrier and #notifyBarrierReceived

The race condition happens as following:
1. CheckpointBarrierUnaligner#notifyBarrierReceived triggers an async checkpoint(ch1) in mailbox by netty thread.
2. CheckpointBarrierUnaligner#processBarrier also triggers a sync checkpoint(ch2) by task thread and executes immediately.
3. When ch1 is taken from mailbox by task thread to execute, it will cause illegal argument exception because it is smaller than the previous executed ch2.

For async checkpoint action, before it is actual executing, we can compare its id with previous executed checkpoint id. If it is not larger than the previous
one, we should ignore it to exit directly.

This closes apache#12406.
  • Loading branch information
zhijiangW committed Jun 4, 2020
1 parent 9add433 commit aa882c1
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,9 @@ public boolean isBlocked(int channelIndex) {
}

/**
* We still need to trigger checkpoint while reading the first barrier from one channel, because
* this might happen earlier than the previous async trigger via mailbox by netty thread.
* We still need to trigger checkpoint via {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}
* while reading the first barrier from one channel, because this might happen
* earlier than the previous async trigger via mailbox by netty thread.
*
* <p>Note this is also suitable for the trigger case of local input channel.
*/
Expand Down Expand Up @@ -256,8 +257,20 @@ int getNumOpenChannels() {
return threadSafeUnaligner.getNumOpenChannels();
}

@VisibleForTesting
ThreadSafeUnaligner getThreadSafeUnaligner() {
return threadSafeUnaligner;
}

private void notifyCheckpoint(CheckpointBarrier barrier) throws IOException {
// ignore the previous triggered checkpoint by netty thread if it was already canceled or aborted before.
if (barrier.getId() >= threadSafeUnaligner.getCurrentCheckpointId()) {
super.notifyCheckpoint(barrier, 0);
}
}

@ThreadSafe
private static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {
static class ThreadSafeUnaligner implements BufferReceivedListener, Closeable {

/**
* Tag the state of which input channel has not received the barrier, such that newly arriving buffers need
Expand All @@ -280,7 +293,6 @@ private static class ThreadSafeUnaligner implements BufferReceivedListener, Clos
*/
private long currentReceivedCheckpointId = -1L;

/** The number of open channels. */
private int numOpenChannels;

private final ChannelStateWriter channelStateWriter;
Expand All @@ -300,7 +312,7 @@ public synchronized void notifyBarrierReceived(CheckpointBarrier barrier, InputC

if (currentReceivedCheckpointId < barrierId) {
handleNewCheckpoint(barrier);
handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier, 0), "notifyCheckpoint");
handler.executeInTaskThread(() -> handler.notifyCheckpoint(barrier), "notifyCheckpoint");
}

int channelIndex = handler.getFlattenedChannelIndex(channelInfo);
Expand Down Expand Up @@ -396,5 +408,9 @@ synchronized void setCurrentReceivedCheckpointId(long currentReceivedCheckpointI
synchronized int getNumOpenChannels() {
return numOpenChannels;
}

synchronized long getCurrentCheckpointId() {
return currentReceivedCheckpointId;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -548,7 +548,8 @@ public final void invoke() throws Exception {
cleanUpInvoke();
}

protected boolean runMailboxStep() throws Exception {
@VisibleForTesting
public boolean runMailboxStep() throws Exception {
return mailboxProcessor.runMailboxStep();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.checkpoint.channel.RecordingChannelStateWriter;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
import org.apache.flink.runtime.io.network.NettyShuffleEnvironmentBuilder;
Expand All @@ -37,6 +39,9 @@
import org.apache.flink.runtime.io.network.util.TestBufferFactory;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.streaming.runtime.io.CheckpointBarrierUnaligner.ThreadSafeUnaligner;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
import org.apache.flink.util.function.ThrowingRunnable;

import org.junit.After;
Expand All @@ -48,6 +53,9 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
Expand Down Expand Up @@ -79,11 +87,15 @@ public void setUp() {

@After
public void ensureEmpty() throws Exception {
assertFalse(inputGate.pollNext().isPresent());
assertTrue(inputGate.isFinished());
if (inputGate != null) {
assertFalse(inputGate.pollNext().isPresent());
assertTrue(inputGate.isFinished());
inputGate.close();
}

channelStateWriter.close();
inputGate.close();
if (channelStateWriter != null) {
channelStateWriter.close();
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -463,6 +475,43 @@ public void testMultiChannelAbortCheckpoint() throws Exception {
assertInflightData();
}

/**
* Tests the race condition between {@link CheckpointBarrierUnaligner#processBarrier(CheckpointBarrier, int)}
* and {@link ThreadSafeUnaligner#notifyBarrierReceived(CheckpointBarrier, InputChannelInfo)}. The barrier
* notification will trigger an async checkpoint (ch1) via mailbox, and meanwhile the barrier processing will
* execute the next checkpoint (ch2) directly in advance. When the ch1 action is taken from mailbox to execute,
* it should be exit because it is smaller than the finished ch2.
*/
@Test
public void testConcurrentProcessBarrierAndNotifyBarrierReceived() throws Exception {
final ValidatingCheckpointInvokable invokable = new ValidatingCheckpointInvokable();
final CheckpointBarrierUnaligner handler = new CheckpointBarrierUnaligner(new int[] { 1 }, ChannelStateWriter.NO_OP, "test", invokable);
final InputChannelInfo channelInfo = new InputChannelInfo(0, 0);
final ExecutorService executor = Executors.newFixedThreadPool(1);

try {
// Enqueue the checkpoint (ch0) action into the mailbox of invokable because it is triggered by other thread.
Callable<Void> notifyTask = () -> {
handler.getThreadSafeUnaligner().notifyBarrierReceived(buildCheckpointBarrier(0), channelInfo);
return null;
};
Future<Void> result = executor.submit(notifyTask);
result.get();

// Execute the checkpoint (ch1) directly because it is triggered by main thread.
handler.processBarrier(buildCheckpointBarrier(1), 0);

// Run the previous queued mailbox action to execute ch0.
invokable.runMailboxStep();

// ch0 will not be executed finally because it is smaller than the previously executed ch1.
assertEquals(1, invokable.getTriggeredCheckpointId());
assertEquals(1, invokable.getTotalTriggeredCheckpoints());
} finally {
executor.shutdown();
}
}

// ------------------------------------------------------------------------
// Utils
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -571,6 +620,10 @@ private List<Object> getIds(Collection<BufferOrEvent> buffers) {
.collect(Collectors.toList());
}

private CheckpointBarrier buildCheckpointBarrier(int id) {
return new CheckpointBarrier(id, 0, CheckpointOptions.forCheckpointWithDefaultLocation());
}

// ------------------------------------------------------------------------
// Testing Mocks
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -639,4 +692,43 @@ public long getLastCanceledCheckpointId() {
return lastCanceledCheckpointId;
}
}

/**
* Specific {@link AbstractInvokable} implementation to record and validate which checkpoint
* id is executed and how many checkpoints are executed.
*/
private static final class ValidatingCheckpointInvokable extends StreamTask {

private long expectedCheckpointId;

private int totalNumCheckpoints;

ValidatingCheckpointInvokable() throws Exception {
super(new DummyEnvironment("test", 1, 0));
}

@Override
public void init() {
}

@Override
protected void processInput(MailboxDefaultAction.Controller controller) {
}

public void triggerCheckpointOnBarrier(
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) {
expectedCheckpointId = checkpointMetaData.getCheckpointId();
totalNumCheckpoints++;
}

long getTriggeredCheckpointId() {
return expectedCheckpointId;
}

int getTotalTriggeredCheckpoints() {
return totalNumCheckpoints;
}
}
}

0 comments on commit aa882c1

Please sign in to comment.