Skip to content

Commit

Permalink
[FLINK-18336][checkpointing] Ignore failures of past checkpoints in C…
Browse files Browse the repository at this point in the history
…heckpointFailureManager

Past checkpoints are subsumed checkpoints and savepoints.
  • Loading branch information
rkhachatryan authored and zhijiangW committed Jul 2, 2020
1 parent c0be7a4 commit 06af98a
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public class CheckpointFailureManager {
private final FailJobCallback failureCallback;
private final AtomicInteger continuousFailureCounter;
private final Set<Long> countedCheckpointIds;
private long lastSucceededCheckpointId = Long.MIN_VALUE;

public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback failureCallback) {
checkArgument(tolerableCpFailureNumber >= 0,
Expand Down Expand Up @@ -82,10 +83,12 @@ public void handleTaskLevelCheckpointException(
}

private void handleCheckpointException(CheckpointException exception, long checkpointId, Consumer<FlinkRuntimeException> errorHandler) {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
errorHandler.accept(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
if (checkpointId > lastSucceededCheckpointId) {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
errorHandler.accept(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
}
}
}

Expand Down Expand Up @@ -146,8 +149,11 @@ public void checkFailureCounter(
* @param checkpointId the failed checkpoint id used to count the continuous failure number based on
* checkpoint id sequence.
*/
public void handleCheckpointSuccess(@SuppressWarnings("unused") long checkpointId) {
clearCount();
public void handleCheckpointSuccess(long checkpointId) {
if (checkpointId > lastSucceededCheckpointId) {
lastSucceededCheckpointId = checkpointId;
clearCount();
}
}

private void clearCount() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,27 @@

import org.junit.Test;

import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_EXPIRED;
import static org.junit.Assert.assertEquals;

/**
* Tests for the checkpoint failure manager.
*/
public class CheckpointFailureManagerTest extends TestLogger {

@Test
public void testIgnoresPastCheckpoints() {
TestFailJobCallback callback = new TestFailJobCallback();
CheckpointFailureManager failureManager = new CheckpointFailureManager(2, callback);
failureManager.handleJobLevelCheckpointException(new CheckpointException(CHECKPOINT_EXPIRED), 1L);
failureManager.handleJobLevelCheckpointException(new CheckpointException(CHECKPOINT_EXPIRED), 2L);
failureManager.handleCheckpointSuccess(2L);
failureManager.handleJobLevelCheckpointException(new CheckpointException(CHECKPOINT_EXPIRED), 1L);
failureManager.handleJobLevelCheckpointException(new CheckpointException(CHECKPOINT_EXPIRED), 3L);
failureManager.handleJobLevelCheckpointException(new CheckpointException(CHECKPOINT_EXPIRED), 4L);
assertEquals(0, callback.getInvokeCounter());
}

@Test
public void testContinuousFailure() {
TestFailJobCallback callback = new TestFailJobCallback();
Expand Down Expand Up @@ -64,7 +78,7 @@ public void testBreakContinuousFailure() {
failureManager.handleCheckpointSuccess(4);

failureManager.handleJobLevelCheckpointException(
new CheckpointException(CheckpointFailureReason.CHECKPOINT_EXPIRED), 5);
new CheckpointException(CHECKPOINT_EXPIRED), 5);
assertEquals(0, callback.getInvokeCounter());
}

Expand Down

0 comments on commit 06af98a

Please sign in to comment.