Skip to content

Commit

Permalink
[hotfix][checkpointing] Extract CheckpointFailureManager.handleCheckp…
Browse files Browse the repository at this point in the history
…ointException method
  • Loading branch information
rkhachatryan authored and zhijiangW committed Jul 2, 2020
1 parent 835d41e commit c0be7a4
Showing 1 changed file with 7 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -60,11 +61,7 @@ public CheckpointFailureManager(int tolerableCpFailureNumber, FailJobCallback fa
* latest generated checkpoint id as a special flag.
*/
public void handleJobLevelCheckpointException(CheckpointException exception, long checkpointId) {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
failureCallback.failJob(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
}
handleCheckpointException(exception, checkpointId, failureCallback::failJob);
}

/**
Expand All @@ -81,10 +78,14 @@ public void handleTaskLevelCheckpointException(
CheckpointException exception,
long checkpointId,
ExecutionAttemptID executionAttemptID) {
handleCheckpointException(exception, checkpointId, e -> failureCallback.failJobDueToTaskFailure(e, executionAttemptID));
}

private void handleCheckpointException(CheckpointException exception, long checkpointId, Consumer<FlinkRuntimeException> errorHandler) {
checkFailureCounter(exception, checkpointId);
if (continuousFailureCounter.get() > tolerableCpFailureNumber) {
clearCount();
failureCallback.failJobDueToTaskFailure(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."), executionAttemptID);
errorHandler.accept(new FlinkRuntimeException("Exceeded checkpoint tolerable failure threshold."));
}
}

Expand Down

0 comments on commit c0be7a4

Please sign in to comment.