Skip to content

Commit

Permalink
[FLINK-21246] Fail fast if a failure occurs when triggering a checkpoint
Browse files Browse the repository at this point in the history
on a Task

Prior to the change we did not check if the request finished successfully. If it did not, we were
leaving the checkpoint to fail due to a subsequent checkpoint. Since the Tasks can now finish it may be a more common situation that we
end up with a failed checkpoint triggering. We should fail the
checkpoint fast if we realize we failed to trigger some tasks.

This closes apache#16493
  • Loading branch information
dawidwys committed Jul 16, 2021
1 parent 49a05d8 commit 10db8b1
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
Expand Down Expand Up @@ -62,6 +63,7 @@

import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
Expand Down Expand Up @@ -608,7 +610,7 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {

Preconditions.checkState(
checkpoint != null || throwable != null,
"Either the pending checkpoint needs to be created or an error must have been occurred.");
"Either the pending checkpoint needs to be created or an error must have occurred.");

if (throwable != null) {
// the initialization might not be finished yet
Expand All @@ -618,38 +620,8 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
onTriggerFailure(checkpoint, throwable);
}
} else {
if (checkpoint.isDisposed()) {
onTriggerFailure(
checkpoint,
new CheckpointException(
CheckpointFailureReason
.TRIGGER_CHECKPOINT_FAILURE,
checkpoint.getFailureCause()));
} else {
// no exception, no discarding, everything is OK
final long checkpointId =
checkpoint.getCheckpointId();
snapshotTaskState(
timestamp,
checkpointId,
checkpoint.getCheckpointStorageLocation(),
request.props,
checkpoint
.getCheckpointPlan()
.getTasksToTrigger());

coordinatorsToCheckpoint.forEach(
(ctx) ->
ctx.afterSourceBarrierInjection(
checkpointId));
// It is possible that the tasks has finished
// checkpointing at this point.
// So we need to complete this pending checkpoint.
if (!maybeCompleteCheckpoint(checkpoint)) {
return null;
}
onTriggerSuccess();
}
triggerCheckpointRequest(
request, timestamp, checkpoint);
}
return null;
},
Expand All @@ -672,6 +644,75 @@ private void startTriggeringCheckpoint(CheckpointTriggerRequest request) {
}
}

private void triggerCheckpointRequest(
CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
if (checkpoint.isDisposed()) {
onTriggerFailure(
checkpoint,
new CheckpointException(
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
checkpoint.getFailureCause()));
} else {
triggerTasks(request, timestamp, checkpoint)
.exceptionally(
failure -> {
final CheckpointException cause;
if (failure instanceof CheckpointException) {
cause = (CheckpointException) failure;
} else {
cause =
new CheckpointException(
CheckpointFailureReason
.TRIGGER_CHECKPOINT_FAILURE,
failure);
}
timer.execute(
() -> {
synchronized (lock) {
abortPendingCheckpoint(checkpoint, cause);
}
});
return null;
});

coordinatorsToCheckpoint.forEach(
(ctx) -> ctx.afterSourceBarrierInjection(checkpoint.getCheckpointID()));
// It is possible that the tasks has finished
// checkpointing at this point.
// So we need to complete this pending checkpoint.
if (maybeCompleteCheckpoint(checkpoint)) {
onTriggerSuccess();
}
}
}

private CompletableFuture<Void> triggerTasks(
CheckpointTriggerRequest request, long timestamp, PendingCheckpoint checkpoint) {
// no exception, no discarding, everything is OK
final long checkpointId = checkpoint.getCheckpointID();

final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
request.props.getCheckpointType(),
checkpoint.getCheckpointStorageLocation().getLocationReference(),
isExactlyOnceMode,
unalignedCheckpointsEnabled,
alignedCheckpointTimeout);

// send messages to the tasks to trigger their checkpoints
List<CompletableFuture<Acknowledge>> acks = new ArrayList<>();
for (Execution execution : checkpoint.getCheckpointPlan().getTasksToTrigger()) {
if (request.props.isSynchronous()) {
acks.add(
execution.triggerSynchronousSavepoint(
checkpointId, timestamp, checkpointOptions));
} else {
acks.add(execution.triggerCheckpoint(checkpointId, timestamp, checkpointOptions));
}
}
return FutureUtils.waitForAll(acks);
}

/**
* Initialize the checkpoint trigger asynchronously. It will expected to be executed in io
* thread due to it might be time-consuming.
Expand Down Expand Up @@ -805,40 +846,6 @@ private CompletableFuture<Void> snapshotMasterState(PendingCheckpoint checkpoint
return masterStateCompletableFuture;
}

/**
* Snapshot task state.
*
* @param timestamp the timestamp of this checkpoint reques
* @param checkpointID the checkpoint id
* @param checkpointStorageLocation the checkpoint location
* @param props the checkpoint properties
* @param tasksToTrigger the executions which should be triggered
*/
private void snapshotTaskState(
long timestamp,
long checkpointID,
CheckpointStorageLocation checkpointStorageLocation,
CheckpointProperties props,
List<Execution> tasksToTrigger) {

final CheckpointOptions checkpointOptions =
CheckpointOptions.forConfig(
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference(),
isExactlyOnceMode,
unalignedCheckpointsEnabled,
alignedCheckpointTimeout);

// send the messages to the tasks that trigger their checkpoint
for (Execution execution : tasksToTrigger) {
if (props.isSynchronous()) {
execution.triggerSynchronousSavepoint(checkpointID, timestamp, checkpointOptions);
} else {
execution.triggerCheckpoint(checkpointID, timestamp, checkpointOptions);
}
}
}

/** Trigger request is successful. NOTE, it must be invoked if trigger request is successful. */
private void onTriggerSuccess() {
isTriggering = false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -831,10 +831,11 @@ public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) {
* @param checkpointId of th checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
* @return Future acknowledge which is returned once the checkpoint has been triggered
*/
public void triggerCheckpoint(
public CompletableFuture<Acknowledge> triggerCheckpoint(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
return triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
}

/**
Expand All @@ -843,13 +844,14 @@ public void triggerCheckpoint(
* @param checkpointId of th checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
* @return Future acknowledge which is returned once the checkpoint has been triggered
*/
public void triggerSynchronousSavepoint(
public CompletableFuture<Acknowledge> triggerSynchronousSavepoint(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {
triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
return triggerCheckpointHelper(checkpointId, timestamp, checkpointOptions);
}

private void triggerCheckpointHelper(
private CompletableFuture<Acknowledge> triggerCheckpointHelper(
long checkpointId, long timestamp, CheckpointOptions checkpointOptions) {

final CheckpointType checkpointType = checkpointOptions.getCheckpointType();
Expand All @@ -864,12 +866,12 @@ private void triggerCheckpointHelper(
if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

taskManagerGateway.triggerCheckpoint(
return taskManagerGateway.triggerCheckpoint(
attemptId, getVertex().getJobId(), checkpointId, timestamp, checkpointOptions);
} else {
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is no longer running.");
}
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is no longer running.");
return CompletableFuture.completedFuture(Acknowledge.get());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ void notifyCheckpointAborted(
* @param checkpointId of the checkpoint to trigger
* @param timestamp of the checkpoint to trigger
* @param checkpointOptions of the checkpoint to trigger
* @return Future acknowledge which is returned once the checkpoint has been triggered
*/
void triggerCheckpoint(
CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,13 @@ public void notifyCheckpointAborted(
}

@Override
public void triggerCheckpoint(
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions) {
taskExecutorGateway.triggerCheckpoint(
return taskExecutorGateway.triggerCheckpoint(
executionAttemptID, checkpointId, timestamp, checkpointOptions);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,19 @@
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraphTestUtils;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.rpc.exceptions.RpcException;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
Expand Down Expand Up @@ -474,6 +480,79 @@ public void testCheckpointTriggeredAfterSomeTasksFinishedIfAllowed() throws Exce
}
}

@Test
public void testTasksFinishDuringTriggering() throws Exception {
JobVertexID jobVertexID1 = new JobVertexID();
JobVertexID jobVertexID2 = new JobVertexID();

ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.setTransitToRunning(false)
.addJobVertex(jobVertexID1, 1, 256)
.addJobVertex(jobVertexID2, 1, 256)
.build();
ExecutionJobVertex jobVertex1 = graph.getJobVertex(jobVertexID1);
ExecutionVertex taskVertex = jobVertex1.getTaskVertices()[0];
ExecutionJobVertex jobVertex2 = graph.getJobVertex(jobVertexID2);
ExecutionVertex taskVertex2 = jobVertex2.getTaskVertices()[0];

AtomicBoolean checkpointAborted = new AtomicBoolean(false);
LogicalSlot slot1 =
new TestingLogicalSlotBuilder()
.setTaskManagerGateway(
new SimpleAckingTaskManagerGateway() {
@Override
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp,
CheckpointOptions checkpointOptions) {
taskVertex.getCurrentExecutionAttempt().markFinished();
return FutureUtils.completedExceptionally(
new RpcException(""));
}
})
.createTestingLogicalSlot();
LogicalSlot slot2 =
new TestingLogicalSlotBuilder()
.setTaskManagerGateway(
new SimpleAckingTaskManagerGateway() {
@Override
public void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long timestamp) {
checkpointAborted.set(true);
}
})
.createTestingLogicalSlot();
ExecutionGraphTestUtils.setVertexResource(taskVertex, slot1);
taskVertex.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);
ExecutionGraphTestUtils.setVertexResource(taskVertex2, slot2);
taskVertex2.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setExecutionGraph(graph)
.setTimer(manuallyTriggeredScheduledExecutor)
.setAllowCheckpointsAfterTasksFinished(true)
.build();

// nothing should be happening
assertEquals(0, checkpointCoordinator.getNumberOfPendingCheckpoints());
assertEquals(0, checkpointCoordinator.getNumberOfRetainedSuccessfulCheckpoints());

// trigger the first checkpoint. this will not fail because we allow checkpointing even with
// finished tasks
final CompletableFuture<CompletedCheckpoint> checkpointFuture =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
assertTrue(checkpointFuture.isCompletedExceptionally());
assertTrue(checkpointAborted.get());
}

@Test
public void testTriggerAndDeclineCheckpointThenFailureManagerThrowsException()
throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
import org.apache.flink.runtime.jobmaster.LogicalSlot;
import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder;
import org.apache.flink.runtime.messages.Acknowledge;
import org.apache.flink.runtime.state.ChainedStateHandle;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -452,7 +453,7 @@ static class CheckpointRecorderTaskManagerGateway extends SimpleAckingTaskManage
new HashMap<>();

@Override
public void triggerCheckpoint(
public CompletableFuture<Acknowledge> triggerCheckpoint(
ExecutionAttemptID attemptId,
JobID jobId,
long checkpointId,
Expand All @@ -463,6 +464,7 @@ public void triggerCheckpoint(
.add(
new TriggeredCheckpoint(
jobId, checkpointId, timestamp, checkpointOptions));
return CompletableFuture.completedFuture(Acknowledge.get());
}

@Override
Expand Down
Loading

0 comments on commit 10db8b1

Please sign in to comment.