From 50af0b161b1962d9db5c692aa965b310a4000da9 Mon Sep 17 00:00:00 2001 From: Roman Khachatryan Date: Wed, 25 Nov 2020 13:26:06 +0100 Subject: [PATCH] [FLINK-20331][checkpointing][task] Don't fail the task if unaligned checkpoint was subsumed --- .../consumer/CheckpointableInput.java | 3 ++- .../partition/consumer/IndexedInputGate.java | 3 ++- .../partition/consumer/InputChannel.java | 3 ++- .../consumer/RemoteInputChannel.java | 18 ++++++++----- .../consumer/RemoteInputChannelTest.java | 3 ++- .../runtime/io/AlternatingController.java | 2 +- .../CheckpointBarrierBehaviourController.java | 2 +- .../io/SingleCheckpointBarrierHandler.java | 27 ++++++++++++------- .../runtime/io/UnalignedController.java | 2 +- 9 files changed, 39 insertions(+), 24 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java index 04fb7d91ebea1..2316a90d5a207 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/CheckpointableInput.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -38,7 +39,7 @@ public interface CheckpointableInput { int getNumberOfInputChannels(); - void checkpointStarted(CheckpointBarrier barrier); + void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException; void checkpointStopped(long cancelledCheckpointId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java index af0f3a5b3776a..3b8ecb2b60a49 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/IndexedInputGate.java @@ -17,6 +17,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; @@ -30,7 +31,7 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab public abstract int getGateIndex(); @Override - public void checkpointStarted(CheckpointBarrier barrier) { + public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { for (int index = 0, numChannels = getNumberOfInputChannels(); index < numChannels; index++) { getChannel(index).checkpointStarted(barrier); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java index 02a05263a7808..792392b95fb0a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputChannel.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.event.TaskEvent; import org.apache.flink.runtime.execution.CancelTaskException; @@ -173,7 +174,7 @@ protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException /** * Called by task thread when checkpointing is started (e.g., any input channel received barrier). */ - public void checkpointStarted(CheckpointBarrier barrier) { + public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java index 85fbe4a0ff890..1992cdbd6579a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannel.java @@ -20,6 +20,8 @@ import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.metrics.Counter; +import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter; import org.apache.flink.runtime.event.AbstractEvent; import org.apache.flink.runtime.event.TaskEvent; @@ -36,7 +38,6 @@ import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException; import org.apache.flink.runtime.io.network.partition.PrioritizedDeque; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.util.Preconditions; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators; @@ -507,7 +508,7 @@ private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOExceptio * Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only * the overtaken buffers. */ - public void checkpointStarted(CheckpointBarrier barrier) { + public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException { synchronized (receivedBuffers) { channelStatePersister.startPersisting( barrier.getId(), @@ -526,7 +527,7 @@ public void checkpointStopped(long checkpointId) { } @VisibleForTesting - List getInflightBuffers(long checkpointId) { + List getInflightBuffers(long checkpointId) throws CheckpointException { synchronized (receivedBuffers) { return getInflightBuffersUnsafe(checkpointId); } @@ -535,13 +536,16 @@ List getInflightBuffers(long checkpointId) { /** * Returns a list of buffers, checking the first n non-priority buffers, and skipping all events. */ - private List getInflightBuffersUnsafe(long checkpointId) { + private List getInflightBuffersUnsafe(long checkpointId) throws CheckpointException { assert Thread.holdsLock(receivedBuffers); + if (checkpointId < lastBarrierId) { + throw new CheckpointException( + String.format("Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)", checkpointId, lastBarrierId), + CheckpointFailureReason.CHECKPOINT_SUBSUMED); // currently, at most one active unaligned checkpoint is possible + } + final List inflightBuffers = new ArrayList<>(); - Preconditions.checkState( - checkpointId >= lastBarrierId, - "Sequence number for checkpoint %s is not known (it was likely been overwritten by a newer checkpoint %s)", checkpointId, lastBarrierId); Iterator iterator = receivedBuffers.iterator(); // skip all priority events (only buffers are stored anyways) Iterators.advance(iterator, receivedBuffers.getNumPriorityElements()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java index 24334515c9f02..c74cf4e872418 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/RemoteInputChannelTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.io.network.partition.consumer; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.ExecutionState; @@ -1254,7 +1255,7 @@ private void send(RemoteInputChannel channel, int sequenceNumber, Buffer buffer) channel.checkError(); } - private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ...bufferSizes) { + private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ...bufferSizes) throws CheckpointException { assertEquals(Arrays.asList(bufferSizes), toBufferSizes(channel.getInflightBuffers(CHECKPOINT_ID))); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java index 3f241c1b7364e..eb302ceaf0222 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingController.java @@ -52,7 +52,7 @@ public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barr @Override public boolean preProcessFirstBarrier( InputChannelInfo channelInfo, - CheckpointBarrier barrier) throws IOException { + CheckpointBarrier barrier) throws IOException, CheckpointException { activeController = chooseController(barrier); return activeController.preProcessFirstBarrier(channelInfo, barrier); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java index 81159e2af6c54..f38d745f421e1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierBehaviourController.java @@ -41,7 +41,7 @@ public interface CheckpointBarrierBehaviourController { * {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint. * @return {@code true} if checkpoint should be triggered. */ - boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException; + boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException; /** * Invoked once per checkpoint, after the last invocation of diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java index 54f31a1193173..ef8eb459278ea 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/SingleCheckpointBarrierHandler.java @@ -118,12 +118,17 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn currentCheckpointId = barrierId; numBarriersReceived = 0; allBarriersReceivedFuture = new CompletableFuture<>(); - if (controller.preProcessFirstBarrier(channelInfo, barrier)) { - LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.", - taskName, - barrier.getId(), - barrier.getTimestamp()); - notifyCheckpoint(barrier); + try { + if (controller.preProcessFirstBarrier(channelInfo, barrier)) { + LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.", + taskName, + barrier.getId(), + barrier.getTimestamp()); + notifyCheckpoint(barrier); + } + } catch (CheckpointException e) { + abortInternal(barrier.getId(), e); + return; } } @@ -158,14 +163,16 @@ public void processBarrierAnnouncement( @Override public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException { final long cancelledId = cancelBarrier.getCheckpointId(); - if (currentCheckpointId > cancelledId || (currentCheckpointId == cancelledId && numBarriersReceived == 0)) { - return; + if (cancelledId > currentCheckpointId || (cancelledId == currentCheckpointId && numBarriersReceived > 0)) { + abortInternal(cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER)); } + } + + private void abortInternal(long cancelledId, CheckpointException exception) throws IOException { // by setting the currentCheckpointId to this checkpoint while keeping the numBarriers // at zero means that no checkpoint barrier can start a new alignment - currentCheckpointId = cancelledId; + currentCheckpointId = Math.max(cancelledId, currentCheckpointId); numBarriersReceived = 0; - CheckpointException exception = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER); controller.abortPendingCheckpoint(cancelledId, exception); allBarriersReceivedFuture.completeExceptionally(exception); notifyAbort(cancelledId, exception); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java index 7cc109af197c3..d53b4ecbf2f9e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/UnalignedController.java @@ -48,7 +48,7 @@ public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barr } @Override - public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException { + public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException { checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions()); for (final CheckpointableInput input : inputs) { input.checkpointStarted(barrier);