Skip to content

Commit

Permalink
[hotfix] Add unit test for checkpoint failure.
Browse files Browse the repository at this point in the history
  • Loading branch information
becketqin authored and pnowojski committed Sep 8, 2020
1 parent 977454e commit 99cd44f
Showing 1 changed file with 126 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
Expand All @@ -50,6 +53,9 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore;
import org.apache.flink.util.ExceptionUtils;
Expand Down Expand Up @@ -2436,6 +2442,126 @@ public Integer deserialize(int version, byte[] serialized) throws IOException {
checkpointCoordinator.shutdown(JobStatus.FINISHED);
}

@Test
public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Exception {
final JobID jobId = new JobID();

// create some mock Execution vertices that receive the checkpoint trigger messages
final ExecutionAttemptID attemptID1 = new ExecutionAttemptID();
final ExecutionAttemptID attemptID2 = new ExecutionAttemptID();
ExecutionVertex vertex1 = mockExecutionVertex(attemptID1,
(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});
ExecutionVertex vertex2 = mockExecutionVertex(attemptID2,
(executionAttemptID, jid, checkpointId, timestamp, checkpointOptions, advanceToEndOfEventTime) -> {});

OperatorID opID1 = OperatorID.fromJobVertexID(vertex1.getJobvertexId());
OperatorID opID2 = OperatorID.fromJobVertexID(vertex2.getJobvertexId());
TaskStateSnapshot taskOperatorSubtaskStates1 = new TaskStateSnapshot();
TaskStateSnapshot taskOperatorSubtaskStates2 = new TaskStateSnapshot();
OperatorSubtaskState subtaskState1 = new OperatorSubtaskState();
OperatorSubtaskState subtaskState2 = new OperatorSubtaskState();
taskOperatorSubtaskStates1.putSubtaskStateByOperatorID(opID1, subtaskState1);
taskOperatorSubtaskStates2.putSubtaskStateByOperatorID(opID2, subtaskState2);

// Create a mock OperatorCoordinatorCheckpointContext which completes the checkpoint immediately.
AtomicBoolean coordCheckpointDone = new AtomicBoolean(false);
OperatorCoordinatorCheckpointContext coordinatorCheckpointContext =
new CheckpointCoordinatorTestingUtils.MockOperatorCheckpointCoordinatorContextBuilder()
.setOnCallingCheckpointCoordinator((checkpointId, result) -> {
coordCheckpointDone.set(true);
result.complete(new byte[0]);
})
.setOperatorID(opID1)
.build();

// set up the coordinator and validate the initial state
CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorBuilder()
.setJobId(jobId)
.setTasks(new ExecutionVertex[]{ vertex1, vertex2 })
.setCheckpointCoordinatorConfiguration(
CheckpointCoordinatorConfiguration.builder().setMaxConcurrentCheckpoints(Integer.MAX_VALUE).build())
.setTimer(manuallyTriggeredScheduledExecutor)
.setCoordinatorsToCheckpoint(Collections.singleton(coordinatorCheckpointContext))
.setStateBackEnd(new MemoryStateBackend() {
private static final long serialVersionUID = 8134582566514272546L;

// Throw exception when finalizing the checkpoint.
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
return new MemoryBackendCheckpointStorage(jobId, null, null, 100) {
@Override
public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
return new NonPersistentMetadataCheckpointStorageLocation(1000) {
@Override
public CheckpointMetadataOutputStream createMetadataOutputStream() throws IOException {
throw new IOException("Artificial Exception");
}
};
}
};
}
})
.build();
AtomicReference<Long> checkpointIdRef = new AtomicReference<>();

// Add a master hook which triggers and acks the task checkpoint immediately.
// In this case the task checkpoints would complete before the job master checkpoint completes.
checkpointCoordinator.addMasterHook(new MasterTriggerRestoreHook<Integer>() {
@Override
public String getIdentifier() {
return "anything";
}

@Override
@Nullable
public CompletableFuture<Integer> triggerCheckpoint(long checkpointId, long timestamp, Executor executor) throws Exception {
assertTrue("The coordinator checkpoint should have finished.", coordCheckpointDone.get());
// Acknowledge the checkpoint in the master hooks so the task snapshots complete before
// the master state snapshot completes.
checkpointIdRef.set(checkpointId);
AcknowledgeCheckpoint acknowledgeCheckpoint1 = new AcknowledgeCheckpoint(
jobId, attemptID1, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates1);
AcknowledgeCheckpoint acknowledgeCheckpoint2 = new AcknowledgeCheckpoint(
jobId, attemptID2, checkpointId, new CheckpointMetrics(), taskOperatorSubtaskStates2);
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint1, TASK_MANAGER_LOCATION_INFO);
checkpointCoordinator.receiveAcknowledgeMessage(acknowledgeCheckpoint2, TASK_MANAGER_LOCATION_INFO);
return null;
}

@Override
public void restoreCheckpoint(long checkpointId, Integer checkpointData) throws Exception {

}

@Override
public SimpleVersionedSerializer<Integer> createCheckpointDataSerializer() {
return new SimpleVersionedSerializer<Integer>() {
@Override
public int getVersion() {
return 0;
}

@Override
public byte[] serialize(Integer obj) throws IOException {
return new byte[0];
}

@Override
public Integer deserialize(int version, byte[] serialized) throws IOException {
return 1;
}
};
}
});

// trigger the first checkpoint. this should succeed
final CompletableFuture<CompletedCheckpoint> checkpointFuture = checkpointCoordinator.triggerCheckpoint(false);
manuallyTriggeredScheduledExecutor.triggerAll();

assertTrue(checkpointFuture.isCompletedExceptionally());
assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
}

private CheckpointCoordinator getCheckpointCoordinator(
JobID jobId,
ExecutionVertex vertex1,
Expand Down

0 comments on commit 99cd44f

Please sign in to comment.