Skip to content

Commit

Permalink
[FLINK-34371][runtime] Start checkpoint only after tasks with blocking edge finished
Browse files Browse the repository at this point in the history
  • Loading branch information
yunfengzhou-hub authored and xintongsong committed Feb 28, 2024
1 parent 5fe0930 commit 8d45cd9
Show file tree
Hide file tree
Showing 6 changed files with 98 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2039,8 +2039,13 @@ public void startCheckpointScheduler() {
isPeriodicCheckpointingConfigured(),
"Can not start checkpoint scheduler, if no periodic checkpointing is configured");

// make sure all prior timers are cancelled
stopCheckpointScheduler();
if (isPeriodicCheckpointingStarted()) {
// cancel previously scheduled checkpoints and spare savepoints.
// TODO: Introduce a more general solution to the race condition
// between different checkpoint scheduling triggers.
// https://issues.apache.org/jira/browse/FLINK-34519
stopCheckpointScheduler();
}

periodicScheduling = true;
scheduleTriggerWithDelay(clock.relativeTimeMillis(), getRandomInitDelay());
Expand Down Expand Up @@ -2133,14 +2138,15 @@ private void restoreStateToCoordinators(
// job status listener that schedules / cancels periodic checkpoints
// ------------------------------------------------------------------------

public JobStatusListener createActivatorDeactivator() {
public JobStatusListener createActivatorDeactivator(boolean allTasksOutputNonBlocking) {
synchronized (lock) {
if (shutdown) {
throw new IllegalArgumentException("Checkpoint coordinator is shut down");
}

if (jobStatusListener == null) {
jobStatusListener = new CheckpointCoordinatorDeActivator(this);
jobStatusListener =
new CheckpointCoordinatorDeActivator(this, allTasksOutputNonBlocking);
}

return jobStatusListener;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
public class CheckpointCoordinatorDeActivator implements JobStatusListener {

private final CheckpointCoordinator coordinator;
private final boolean allTasksOutputNonBlocking;

public CheckpointCoordinatorDeActivator(CheckpointCoordinator coordinator) {
public CheckpointCoordinatorDeActivator(
CheckpointCoordinator coordinator, boolean allTasksOutputNonBlocking) {
this.coordinator = checkNotNull(coordinator);
this.allTasksOutputNonBlocking = allTasksOutputNonBlocking;
}

@Override
public void jobStatusChanges(JobID jobId, JobStatus newJobStatus, long timestamp) {
if (newJobStatus == JobStatus.RUNNING) {
// start the checkpoint scheduler
if (newJobStatus == JobStatus.RUNNING && allTasksOutputNonBlocking) {
// start the checkpoint scheduler if there is no blocking edge
coordinator.startCheckpointScheduler();
} else {
// anything else should stop the trigger for now
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,8 +515,13 @@ public void failJobDueToTaskFailure(

if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
// the periodic checkpoint scheduler is activated and deactivated as a result of
// job status changes (running -> on, all other states -> off)
registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
// job status and topology changes (running & all edges non-blocking -> on, all
// other states -> off)
boolean allTasksOutputNonBlocking =
tasks.values().stream()
.noneMatch(vertex -> vertex.getJobVertex().isAnyOutputBlocking());
registerJobStatusListener(
checkpointCoordinator.createActivatorDeactivator(allTasksOutputNonBlocking));
}

this.stateBackendName = checkpointStateBackend.getName();
Expand Down Expand Up @@ -1376,6 +1381,13 @@ private boolean updateStateInternal(
return attempt.switchToInitializing();

case RUNNING:
if (!isAnyOutputBlocking()
&& checkpointCoordinator != null
&& checkpointCoordinator.isPeriodicCheckpointingConfigured()
&& !checkpointCoordinator.isPeriodicCheckpointingStarted()) {
checkpointCoordinator.startCheckpointScheduler();
}

return attempt.switchToRunning();

case FINISHED:
Expand Down Expand Up @@ -1413,6 +1425,11 @@ private boolean updateStateInternal(
}
}

/**
 * Checks whether any execution currently tracked in {@code currentExecutions} belongs to a job
 * vertex that is flagged as having a blocking output.
 *
 * @return {@code true} if at least one such execution exists, {@code false} otherwise
 */
private boolean isAnyOutputBlocking() {
    return currentExecutions.values().stream()
            // walk from the execution attempt up to the JobVertex it was created from
            .map(execution -> execution.getVertex().getJobVertex().getJobVertex())
            .anyMatch(jobVertex -> jobVertex.isAnyOutputBlocking());
}

private void maybeReleasePartitionGroupsFor(final Execution attempt) {
final ExecutionVertexID finishedExecutionVertex = attempt.getVertex().getID();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,8 @@ public class JobVertex implements java.io.Serializable {
*/
private boolean supportsConcurrentExecutionAttempts = true;

/**
 * Flag that {@code getOrCreateResultDataSet} flips to {@code true} for blocking or
 * blocking-persistent partition types; exposed through {@code isAnyOutputBlocking()}.
 */
private boolean anyOutputBlocking = false;

private boolean parallelismConfigured = false;

// --------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -496,6 +498,7 @@ public void updateCoLocationGroup(CoLocationGroupImpl group) {
// --------------------------------------------------------------------------------------------
/**
 * Returns the result data set registered under {@code id}, creating and registering it with the
 * given partition type if none is registered yet.
 *
 * <p>The {@code anyOutputBlocking} flag is updated only when a data set is actually created. If
 * a data set is already registered under {@code id}, it is returned unchanged and {@code
 * partitionType} is ignored, so it must not influence the flag either.
 *
 * @param id identifier of the result data set
 * @param partitionType partition type used if the data set has to be created
 * @return the existing or newly created data set for {@code id}
 */
public IntermediateDataSet getOrCreateResultDataSet(
        IntermediateDataSetID id, ResultPartitionType partitionType) {
    return this.results.computeIfAbsent(
            id,
            key -> {
                // Record blocking outputs on creation only, so the flag reflects data sets
                // that really exist on this vertex.
                anyOutputBlocking |=
                        partitionType.isBlockingOrBlockingPersistentResultPartition();
                return new IntermediateDataSet(id, partitionType, this);
            });
}
Expand Down Expand Up @@ -557,6 +560,10 @@ public boolean isSupportsConcurrentExecutionAttempts() {
return supportsConcurrentExecutionAttempts;
}

/**
 * Returns the current value of the {@code anyOutputBlocking} flag of this vertex.
 *
 * @return {@code true} if this vertex has been flagged as having a blocking output
 */
public boolean isAnyOutputBlocking() {
    return this.anyOutputBlocking;
}

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1801,6 +1801,53 @@ void testJobStatusHookWithJobFinished() throws Exception {
commonJobStatusHookTest(ExecutionState.FINISHED, JobStatus.FINISHED);
}

/**
 * Verifies that the periodic checkpoint scheduler is not started while a vertex with a blocking
 * output is still running, and is started once only the downstream vertex is running.
 *
 * <p>Topology: source --PIPELINED--> map --BLOCKING--> sink.
 */
@Test
void testStartCheckpointOnlyAfterVertexWithBlockingEdgeFinished() {
final JobVertex source = new JobVertex("source");
source.setInvokableClass(NoOpInvokable.class);
final JobVertex map = new JobVertex("map");
map.setInvokableClass(NoOpInvokable.class);
final JobVertex sink = new JobVertex("sink");
sink.setInvokableClass(NoOpInvokable.class);

// map -> sink is the blocking edge under test; source -> map stays pipelined
sink.connectNewDataSetAsInput(
map, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
map.connectNewDataSetAsInput(
source, DistributionPattern.POINTWISE, ResultPartitionType.PIPELINED);

final JobGraph jobGraph = JobGraphTestUtils.streamingJobGraph(source, map, sink);
// NOTE(review): Long.MAX_VALUE - 1 is presumably the checkpoint interval, chosen so that no
// checkpoint actually fires during the test — confirm against enableCheckpointing's signature
enableCheckpointing(jobGraph, null, null, Long.MAX_VALUE - 1, true);

final DefaultScheduler scheduler = createSchedulerAndStartScheduling(jobGraph);
final CheckpointCoordinator checkpointCoordinator = scheduler.getCheckpointCoordinator();
// scheduling alone must not start periodic checkpointing for this job
assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isFalse();

// NOTE(review): assumes the vertices are iterated in the order source, map, sink — confirm
final Iterator<ArchivedExecutionVertex> iterator =
scheduler
.requestJob()
.getArchivedExecutionGraph()
.getAllExecutionVertices()
.iterator();
final ExecutionAttemptID sourceAttemptId =
iterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionAttemptID mapAttemptId =
iterator.next().getCurrentExecutionAttempt().getAttemptId();
final ExecutionAttemptID sinkAttemptId =
iterator.next().getCurrentExecutionAttempt().getAttemptId();
assertThat(iterator).isExhausted();

// with source and map running (map has the blocking output), checkpointing stays off
transitionToRunning(scheduler, sourceAttemptId);
transitionToRunning(scheduler, mapAttemptId);
assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isFalse();

// finish both upstream tasks, then start the sink; checkpointing is expected to be on now
scheduler.updateTaskExecutionState(
new TaskExecutionState(sourceAttemptId, ExecutionState.FINISHED));
scheduler.updateTaskExecutionState(
new TaskExecutionState(mapAttemptId, ExecutionState.FINISHED));
transitionToRunning(scheduler, sinkAttemptId);
assertThat(checkpointCoordinator.isPeriodicCheckpointingStarted()).isTrue();
}

private void commonJobStatusHookTest(
ExecutionState expectedExecutionState, JobStatus expectedJobStatus) throws Exception {
final JobGraph jobGraph = singleNonParallelJobVertexJobGraph();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2346,6 +2346,10 @@ void testOutputOnlyAfterEndOfStream() {
assertHasOutputPartitionType(
vertexMap.get("transform -> Map"), ResultPartitionType.BLOCKING);

assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("transform -> Map").isAnyOutputBlocking()).isTrue();
assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();

env.disableOperatorChaining();
jobGraph = StreamingJobGraphGenerator.createJobGraph(env.getStreamGraph(false));
vertexMap = new HashMap<>();
Expand All @@ -2357,6 +2361,11 @@ void testOutputOnlyAfterEndOfStream() {
vertexMap.get("Source: source"), ResultPartitionType.PIPELINED_BOUNDED);
assertHasOutputPartitionType(vertexMap.get("transform"), ResultPartitionType.BLOCKING);
assertHasOutputPartitionType(vertexMap.get("Map"), ResultPartitionType.PIPELINED_BOUNDED);

assertThat(vertexMap.get("Source: source").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("transform").isAnyOutputBlocking()).isTrue();
assertThat(vertexMap.get("Map").isAnyOutputBlocking()).isFalse();
assertThat(vertexMap.get("sink: Writer").isAnyOutputBlocking()).isFalse();
}

private static void testWhetherOutputFormatSupportsConcurrentExecutionAttempts(
Expand Down

0 comments on commit 8d45cd9

Please sign in to comment.