Skip to content

Commit

Permalink
[FLINK-21080][runtime][checkpoint] Report latest completed checkpoint…
Browse files Browse the repository at this point in the history
… id when notifying checkpoint abort

This closes apache#16633
  • Loading branch information
gaoyunhaii authored and dawidwys committed Aug 3, 2021
1 parent 0182cd3 commit 2f5655a
Show file tree
Hide file tree
Showing 22 changed files with 246 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1314,14 +1314,18 @@ private void sendAcknowledgeMessages(

private void sendAbortedMessages(
List<ExecutionVertex> tasksToAbort, long checkpointId, long timeStamp) {
assert (Thread.holdsLock(lock));
long latestCompletedCheckpointId = completedCheckpointStore.getLatestCheckpointId();

// send notification of aborted checkpoints asynchronously.
executor.execute(
() -> {
// send the "abort checkpoint" messages to necessary vertices.
for (ExecutionVertex ev : tasksToAbort) {
Execution ee = ev.getCurrentExecutionAttempt();
if (ee != null) {
ee.notifyCheckpointAborted(checkpointId, timeStamp);
ee.notifyCheckpointAborted(
checkpointId, latestCompletedCheckpointId, timeStamp);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,21 @@ default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRec
return lastCompleted;
}

/**
 * Returns the id of the latest completed checkpoint.
 *
 * <p>The id is taken from the last element of {@link #getAllCheckpoints()}. If that list is
 * empty, or if anything is thrown while looking the id up, {@code 0} is returned instead; a
 * failure is logged as a warning and not propagated to the caller.
 *
 * @return the id of the last checkpoint in the store, or {@code 0} as a fallback
 */
default long getLatestCheckpointId() {
    // Fallback value; only overwritten once an id has been read successfully.
    long latestId = 0;
    try {
        final List<CompletedCheckpoint> completed = getAllCheckpoints();
        if (!completed.isEmpty()) {
            latestId = completed.get(completed.size() - 1).getCheckpointID();
        }
    } catch (Throwable failure) {
        LOG.warn("Get the latest completed checkpoints failed", failure);
    }
    return latestId;
}

/**
* Shuts down the store.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -796,16 +796,22 @@ public void notifyCheckpointComplete(long checkpointId, long timestamp) {
* Notify the task of this execution about an aborted checkpoint.
*
* @param abortCheckpointId of the aborted checkpoint
* @param latestCompletedCheckpointId of the latest completed checkpoint
* @param timestamp of the aborted checkpoint
*/
public void notifyCheckpointAborted(long abortCheckpointId, long timestamp) {
public void notifyCheckpointAborted(
long abortCheckpointId, long latestCompletedCheckpointId, long timestamp) {
final LogicalSlot slot = assignedResource;

if (slot != null) {
final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();

taskManagerGateway.notifyCheckpointAborted(
attemptId, getVertex().getJobId(), abortCheckpointId, timestamp);
attemptId,
getVertex().getJobId(),
abortCheckpointId,
latestCompletedCheckpointId,
timestamp);
} else {
LOG.debug(
"The execution has no slot assigned. This indicates that the execution is "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -282,9 +282,11 @@ public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
* notification.
*
* @param checkpointId The ID of the checkpoint that is aborted.
* @param latestCompletedCheckpointId The ID of the latest completed checkpoint.
* @return future that completes when the notification has been processed by the task.
*/
public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
public Future<Void> notifyCheckpointAbortAsync(
long checkpointId, long latestCompletedCheckpointId) {
throw new UnsupportedOperationException(
String.format(
"notifyCheckpointAbortAsync not supported by %s",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,10 +102,15 @@ void notifyCheckpointComplete(
* @param executionAttemptID identifying the task
* @param jobId identifying the job to which the task belongs
* @param checkpointId of the aborted checkpoint
* @param latestCompletedCheckpointId of the latest completed checkpoint
* @param timestamp of the aborted checkpoint
*/
void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp);
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp);

/**
* Trigger for the given task a checkpoint.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,13 @@ public void notifyCheckpointComplete(

@Override
public void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID, JobID jobId, long checkpointId, long timestamp) {
taskExecutorGateway.abortCheckpoint(executionAttemptID, checkpointId, timestamp);
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp) {
taskExecutorGateway.abortCheckpoint(
executionAttemptID, checkpointId, latestCompletedCheckpointId, timestamp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1000,7 +1000,10 @@ public CompletableFuture<Acknowledge> confirmCheckpoint(

@Override
public CompletableFuture<Acknowledge> abortCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
ExecutionAttemptID executionAttemptID,
long checkpointId,
long latestCompletedCheckpointId,
long checkpointTimestamp) {
log.debug(
"Abort checkpoint {}@{} for {}.",
checkpointId,
Expand All @@ -1010,7 +1013,7 @@ public CompletableFuture<Acknowledge> abortCheckpoint(
final Task task = taskSlotTable.getTask(executionAttemptID);

if (task != null) {
task.notifyCheckpointAborted(checkpointId);
task.notifyCheckpointAborted(checkpointId, latestCompletedCheckpointId);

return CompletableFuture.completedFuture(Acknowledge.get());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,11 +156,15 @@ CompletableFuture<Acknowledge> confirmCheckpoint(
*
* @param executionAttemptID identifying the task
* @param checkpointId unique id for the checkpoint
* @param latestCompletedCheckpointId the id of the latest completed checkpoint
* @param checkpointTimestamp is the timestamp when the checkpoint has been initiated
* @return Future acknowledge if the checkpoint has been successfully confirmed
*/
CompletableFuture<Acknowledge> abortCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp);
ExecutionAttemptID executionAttemptID,
long checkpointId,
long latestCompletedCheckpointId,
long checkpointTimestamp);

/**
* Cancel the given task.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,9 +138,12 @@ public CompletableFuture<Acknowledge> confirmCheckpoint(

@Override
public CompletableFuture<Acknowledge> abortCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
ExecutionAttemptID executionAttemptID,
long checkpointId,
long latestCompletedCheckpointId,
long checkpointTimestamp) {
return originalGateway.abortCheckpoint(
executionAttemptID, checkpointId, checkpointTimestamp);
executionAttemptID, checkpointId, latestCompletedCheckpointId, checkpointTimestamp);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.TaskInfo;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.state.CheckpointListener;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.core.fs.FileSystemSafetyNet;
Expand Down Expand Up @@ -137,11 +136,7 @@
* <p>Each Task is run by one dedicated thread.
*/
public class Task
implements Runnable,
TaskSlotPayload,
TaskActions,
PartitionProducerStateProvider,
CheckpointListener {
implements Runnable, TaskSlotPayload, TaskActions, PartitionProducerStateProvider {

/** The class logger. */
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
Expand Down Expand Up @@ -1356,7 +1351,6 @@ public void triggerCheckpointBarrier(
}
}

@Override
public void notifyCheckpointComplete(final long checkpointID) {
final AbstractInvokable invokable = this.invokable;

Expand Down Expand Up @@ -1384,13 +1378,13 @@ public void notifyCheckpointComplete(final long checkpointID) {
}
}

@Override
public void notifyCheckpointAborted(final long checkpointID) {
public void notifyCheckpointAborted(
final long checkpointID, final long latestCompletedCheckpointId) {
final AbstractInvokable invokable = this.invokable;

if (executionState == ExecutionState.RUNNING && invokable != null) {
try {
invokable.notifyCheckpointAbortAsync(checkpointID);
invokable.notifyCheckpointAbortAsync(checkpointID, latestCompletedCheckpointId);
} catch (RejectedExecutionException ex) {
// This may happen if the mailbox is closed. It means that the task is shutting
// down, so we just ignore it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import static org.apache.flink.runtime.checkpoint.CheckpointFailureReason.CHECKPOINT_ASYNC_EXCEPTION;
Expand Down Expand Up @@ -523,6 +524,7 @@ public void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp) {
checkpointAborted.set(true);
}
Expand Down Expand Up @@ -3514,6 +3516,76 @@ public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception
}
}

/**
 * Verifies that the abort notification handed to the task manager gateway carries the id of
 * the checkpoint that completed before the aborted one.
 *
 * <p>The test completes one checkpoint, feeds the coordinator a decline message for a second
 * one, and then checks the {@code latestCompletedCheckpointId} argument captured by a stubbed
 * gateway.
 */
@Test
public void testReportLatestCompletedCheckpointIdWithAbort() throws Exception {
// Build a graph with one job vertex. setTransitToRunning(false): the state transition is done
// by hand further down, after the stubbed slot has been attached.
JobVertexID jobVertexID = new JobVertexID();
ExecutionGraph graph =
new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder()
.addJobVertex(jobVertexID)
.setTransitToRunning(false)
.build();

ExecutionVertex task = graph.getJobVertex(jobVertexID).getTaskVertices()[0];

// Holds the latestCompletedCheckpointId seen by the gateway; it stays at -1 if the gateway
// is never called.
AtomicLong reportedCheckpointId = new AtomicLong(-1);
LogicalSlot slot =
new TestingLogicalSlotBuilder()
.setTaskManagerGateway(
new SimpleAckingTaskManagerGateway() {
@Override
public void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp) {
// Only the reported latest completed id is of interest here.
reportedCheckpointId.set(latestCompletedCheckpointId);
}
})
.createTestingLogicalSlot();
// Attach the stubbed slot to the vertex, then move its execution to RUNNING manually.
ExecutionGraphTestUtils.setVertexResource(task, slot);
task.getCurrentExecutionAttempt().transitionState(ExecutionState.RUNNING);

CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setExecutionGraph(graph)
.setTimer(manuallyTriggeredScheduledExecutor)
.setAllowCheckpointsAfterTasksFinished(true)
.build();

// Trigger a checkpoint and acknowledge it so that it completes successfully.
CompletableFuture<CompletedCheckpoint> result =
checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
// Take the id of the first pending checkpoint, i.e. the one just triggered
// (assumed to be the only one pending at this point).
long completedCheckpointId =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
checkpointCoordinator.receiveAcknowledgeMessage(
new AcknowledgeCheckpoint(
graph.getJobID(),
task.getCurrentExecutionAttempt().getAttemptId(),
completedCheckpointId,
new CheckpointMetrics(),
new TaskStateSnapshot()),
"localhost");
assertTrue(result.isDone());
assertFalse(result.isCompletedExceptionally());

// Trigger a second checkpoint and decline it on behalf of the task; its future is expected
// to complete exceptionally.
result = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();
long abortedCheckpointId =
checkpointCoordinator.getPendingCheckpoints().entrySet().iterator().next().getKey();
checkpointCoordinator.receiveDeclineMessage(
new DeclineCheckpoint(
graph.getJobID(),
task.getCurrentExecutionAttempt().getAttemptId(),
abortedCheckpointId,
new CheckpointException(CHECKPOINT_EXPIRED)),
"localhost");
assertTrue(result.isCompletedExceptionally());

// The gateway must have been told the id of the first (completed) checkpoint.
assertEquals(completedCheckpointId, reportedCheckpointId.get());
}

private CheckpointCoordinator getCheckpointCoordinator(ExecutionGraph graph) throws Exception {
return new CheckpointCoordinatorBuilder()
.setExecutionGraph(graph)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -501,7 +501,11 @@ public void notifyCheckpointComplete(

@Override
public void notifyCheckpointAborted(
ExecutionAttemptID attemptId, JobID jobId, long checkpointId, long timestamp) {
ExecutionAttemptID attemptId,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp) {
notifiedAbortCheckpoints
.computeIfAbsent(attemptId, k -> new ArrayList<>())
.add(new NotifiedCheckpoint(jobId, checkpointId, timestamp));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,6 +198,21 @@ public void testDiscardAllCheckpoints() throws Exception {
}
}

/**
 * Checks that {@code getLatestCheckpointId()} reports {@code 0} before any checkpoint has been
 * added and afterwards reports the id of the checkpoint added most recently.
 */
@Test
public void testAcquireLatestCompletedCheckpointId() throws Exception {
    final SharedStateRegistry registry = new SharedStateRegistry();
    final CompletedCheckpointStore store = createCompletedCheckpoints(1);

    // Nothing has been added to the store yet.
    assertEquals(0, store.getLatestCheckpointId());

    // First checkpoint: its id becomes the latest one.
    store.addCheckpoint(createCheckpoint(2, registry), new CheckpointsCleaner(), () -> {});
    assertEquals(2, store.getLatestCheckpointId());

    // A later checkpoint replaces it as the latest one.
    store.addCheckpoint(createCheckpoint(4, registry), new CheckpointsCleaner(), () -> {});
    assertEquals(4, store.getLatestCheckpointId());
}

// ---------------------------------------------------------------------------------------------

public static TestCompletedCheckpoint createCheckpoint(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,7 @@ public void notifyCheckpointAborted(
ExecutionAttemptID executionAttemptID,
JobID jobId,
long checkpointId,
long latestCompletedCheckpointId,
long timestamp) {}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -604,7 +604,8 @@ public Future<Void> notifyCheckpointCompleteAsync(long checkpointId) {
}

@Override
public Future<Void> notifyCheckpointAbortAsync(long checkpointId) {
public Future<Void> notifyCheckpointAbortAsync(
long checkpointId, long latestCompletedCheckpointId) {
return CompletableFuture.completedFuture(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,10 @@ public CompletableFuture<Acknowledge> confirmCheckpoint(

@Override
public CompletableFuture<Acknowledge> abortCheckpoint(
ExecutionAttemptID executionAttemptID, long checkpointId, long checkpointTimestamp) {
ExecutionAttemptID executionAttemptID,
long checkpointId,
long latestCompletedCheckpointId,
long checkpointTimestamp) {
return CompletableFuture.completedFuture(Acknowledge.get());
}

Expand Down
Loading

0 comments on commit 2f5655a

Please sign in to comment.