diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java index 3282c6d6eaef6..dc604a38ddc4c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierAligner.java @@ -134,7 +134,7 @@ public void processBarrier(CheckpointBarrier receivedBarrier, int channelIndex) // -- general code path for multiple input channels -- - if (numBarriersReceived > 0) { + if (isCheckpointPending()) { // this is only true if some alignment is already progress and was not canceled if (barrierId == currentCheckpointId) { @@ -243,7 +243,7 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) thr // -- general code path for multiple input channels -- - if (numBarriersReceived > 0) { + if (isCheckpointPending()) { // this is only true if some alignment is in progress and nothing was canceled if (barrierId == currentCheckpointId) { @@ -303,7 +303,7 @@ else if (barrierId > currentCheckpointId) { public void processEndOfPartition() throws Exception { numClosedChannels++; - if (numBarriersReceived > 0) { + if (isCheckpointPending()) { // let the task know we skip a checkpoint notifyAbort(currentCheckpointId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_INPUT_END_OF_STREAM)); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java index b2b1f2f2197f5..f98e83fe2ae39 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierUnaligner.java @@ -118,7 +118,7 @@ public class CheckpointBarrierUnaligner extends CheckpointBarrierHandler { @Override public void releaseBlocksAndResetBarriers() { - if (numBarrierConsumed > 0) { + if (isCheckpointPending()) { // make sure no additional data is persisted Arrays.fill(hasInflightBuffers, false); // the next barrier that comes must assume it is the first @@ -150,7 +150,7 @@ public void processBarrier( CheckpointBarrier receivedBarrier, int channelIndex) throws Exception { long barrierId = receivedBarrier.getId(); - if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && numBarrierConsumed == 0)) { + if (currentConsumedCheckpointId > barrierId || (currentConsumedCheckpointId == barrierId && !isCheckpointPending())) { // ignore old and cancelled barriers return; } @@ -173,11 +173,11 @@ public void processBarrier( public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws Exception { final long barrierId = cancelBarrier.getCheckpointId(); - if (currentConsumedCheckpointId >= barrierId && numBarrierConsumed == 0) { + if (currentConsumedCheckpointId >= barrierId && !isCheckpointPending()) { return; } - if (numBarrierConsumed > 0) { + if (isCheckpointPending()) { LOG.warn("{}: Received cancellation barrier for checkpoint {} before completing current checkpoint {}. " + "Skipping current checkpoint.", taskName, @@ -196,7 +196,7 @@ public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) thr public void processEndOfPartition() throws Exception { threadSafeUnaligner.onChannelClosed(); - if (numBarrierConsumed > 0) { + if (isCheckpointPending()) { // let the task know we skip a checkpoint notifyAbort( currentConsumedCheckpointId,