Skip to content

Commit

Permalink
[FLINK-20331][checkpointing][task] Don't fail the task if unaligned c…
Browse files Browse the repository at this point in the history
…heckpoint was subsumed
  • Loading branch information
rkhachatryan authored and AHeise committed Nov 26, 2020
1 parent 54ac81a commit 50af0b1
Show file tree
Hide file tree
Showing 9 changed files with 39 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.annotation.Internal;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;

Expand All @@ -38,7 +39,7 @@ public interface CheckpointableInput {

int getNumberOfInputChannels();

void checkpointStarted(CheckpointBarrier barrier);
void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException;

void checkpointStopped(long cancelledCheckpointId);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.io.network.api.CheckpointBarrier;

Expand All @@ -30,7 +31,7 @@ public abstract class IndexedInputGate extends InputGate implements Checkpointab
public abstract int getGateIndex();

@Override
public void checkpointStarted(CheckpointBarrier barrier) {
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
for (int index = 0, numChannels = getNumberOfInputChannels(); index < numChannels; index++) {
getChannel(index).checkpointStarted(barrier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.execution.CancelTaskException;
Expand Down Expand Up @@ -173,7 +174,7 @@ protected void notifyBufferAvailable(int numAvailableBuffers) throws IOException
/**
* Called by task thread when checkpointing is started (e.g., any input channel received barrier).
*/
public void checkpointStarted(CheckpointBarrier barrier) {
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.event.TaskEvent;
Expand All @@ -36,7 +38,6 @@
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.PrioritizedDeque;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.util.Preconditions;

import org.apache.flink.shaded.guava18.com.google.common.collect.Iterators;

Expand Down Expand Up @@ -507,7 +508,7 @@ private SequenceBuffer announce(SequenceBuffer sequenceBuffer) throws IOExceptio
* Spills all queued buffers on checkpoint start. If barrier has already been received (and reordered), spill only
* the overtaken buffers.
*/
public void checkpointStarted(CheckpointBarrier barrier) {
public void checkpointStarted(CheckpointBarrier barrier) throws CheckpointException {
synchronized (receivedBuffers) {
channelStatePersister.startPersisting(
barrier.getId(),
Expand All @@ -526,7 +527,7 @@ public void checkpointStopped(long checkpointId) {
}

@VisibleForTesting
List<Buffer> getInflightBuffers(long checkpointId) {
List<Buffer> getInflightBuffers(long checkpointId) throws CheckpointException {
synchronized (receivedBuffers) {
return getInflightBuffersUnsafe(checkpointId);
}
Expand All @@ -535,13 +536,16 @@ List<Buffer> getInflightBuffers(long checkpointId) {
/**
* Returns a list of buffers, checking the first n non-priority buffers, and skipping all events.
*/
private List<Buffer> getInflightBuffersUnsafe(long checkpointId) {
private List<Buffer> getInflightBuffersUnsafe(long checkpointId) throws CheckpointException {
assert Thread.holdsLock(receivedBuffers);

if (checkpointId < lastBarrierId) {
throw new CheckpointException(
String.format("Sequence number for checkpoint %d is not known (it was likely been overwritten by a newer checkpoint %d)", checkpointId, lastBarrierId),
CheckpointFailureReason.CHECKPOINT_SUBSUMED); // currently, at most one active unaligned checkpoint is possible
}

final List<Buffer> inflightBuffers = new ArrayList<>();
Preconditions.checkState(
checkpointId >= lastBarrierId,
"Sequence number for checkpoint %s is not known (it was likely been overwritten by a newer checkpoint %s)", checkpointId, lastBarrierId);
Iterator<SequenceBuffer> iterator = receivedBuffers.iterator();
// skip all priority events (only buffers are stored anyways)
Iterators.advance(iterator, receivedBuffers.getNumPriorityElements());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.io.network.partition.consumer;

import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.ExecutionState;
Expand Down Expand Up @@ -1254,7 +1255,7 @@ private void send(RemoteInputChannel channel, int sequenceNumber, Buffer buffer)
channel.checkError();
}

private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ...bufferSizes) {
private void assertInflightBufferSizes(RemoteInputChannel channel, Integer ...bufferSizes) throws CheckpointException {
assertEquals(Arrays.asList(bufferSizes), toBufferSizes(channel.getInflightBuffers(CHECKPOINT_ID)));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barr
@Override
public boolean preProcessFirstBarrier(
InputChannelInfo channelInfo,
CheckpointBarrier barrier) throws IOException {
CheckpointBarrier barrier) throws IOException, CheckpointException {
activeController = chooseController(barrier);
return activeController.preProcessFirstBarrier(channelInfo, barrier);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ public interface CheckpointBarrierBehaviourController {
* {@link #barrierReceived(InputChannelInfo, CheckpointBarrier)} for that given checkpoint.
* @return {@code true} if checkpoint should be triggered.
*/
boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException;
boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException;

/**
* Invoked once per checkpoint, after the last invocation of
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,12 +118,17 @@ public void processBarrier(CheckpointBarrier barrier, InputChannelInfo channelIn
currentCheckpointId = barrierId;
numBarriersReceived = 0;
allBarriersReceivedFuture = new CompletableFuture<>();
if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.",
taskName,
barrier.getId(),
barrier.getTimestamp());
notifyCheckpoint(barrier);
try {
if (controller.preProcessFirstBarrier(channelInfo, barrier)) {
LOG.debug("{}: Triggering checkpoint {} on the first barrier at {}.",
taskName,
barrier.getId(),
barrier.getTimestamp());
notifyCheckpoint(barrier);
}
} catch (CheckpointException e) {
abortInternal(barrier.getId(), e);
return;
}
}

Expand Down Expand Up @@ -158,14 +163,16 @@ public void processBarrierAnnouncement(
@Override
public void processCancellationBarrier(CancelCheckpointMarker cancelBarrier) throws IOException {
final long cancelledId = cancelBarrier.getCheckpointId();
if (currentCheckpointId > cancelledId || (currentCheckpointId == cancelledId && numBarriersReceived == 0)) {
return;
if (cancelledId > currentCheckpointId || (cancelledId == currentCheckpointId && numBarriersReceived > 0)) {
abortInternal(cancelledId, new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER));
}
}

private void abortInternal(long cancelledId, CheckpointException exception) throws IOException {
// by setting the currentCheckpointId to this checkpoint while keeping the numBarriers
// at zero means that no checkpoint barrier can start a new alignment
currentCheckpointId = cancelledId;
currentCheckpointId = Math.max(cancelledId, currentCheckpointId);
numBarriersReceived = 0;
CheckpointException exception = new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED_ON_CANCELLATION_BARRIER);
controller.abortPendingCheckpoint(cancelledId, exception);
allBarriersReceivedFuture.completeExceptionally(exception);
notifyAbort(cancelledId, exception);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ public void barrierReceived(InputChannelInfo channelInfo, CheckpointBarrier barr
}

@Override
public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException {
public boolean preProcessFirstBarrier(InputChannelInfo channelInfo, CheckpointBarrier barrier) throws IOException, CheckpointException {
checkpointCoordinator.initCheckpoint(barrier.getId(), barrier.getCheckpointOptions());
for (final CheckpointableInput input : inputs) {
input.checkpointStarted(barrier);
Expand Down

0 comments on commit 50af0b1

Please sign in to comment.