Skip to content

Commit

Permalink
Revert "[FLINK-21178][Runtime/Checkpointing] Task failure should trig…
Browse files Browse the repository at this point in the history
…ger master hook's reset() (apache#14890)"

This reverts commit 816ce96
  • Loading branch information
wuchong committed Mar 3, 2021
1 parent 57decce commit 9588c5d
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1536,13 +1536,13 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
throw new IllegalStateException("No completed checkpoint available");
}

LOG.debug("Resetting the master hooks.");
MasterHooks.reset(masterHooks.values(), LOG);

if (operatorCoordinatorRestoreBehavior
== OperatorCoordinatorRestoreBehavior.RESTORE_OR_RESET) {
// we let the JobManager-side components know that there was a recovery,
// even if there was no checkpoint to recover from, yet
LOG.debug("Resetting the master hooks.");
MasterHooks.reset(masterHooks.values(), LOG);

LOG.info("Resetting the Operator Coordinators to an empty state.");
restoreStateToCoordinators(
OperatorCoordinator.NO_CHECKPOINT, Collections.emptyMap());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3350,26 +3350,6 @@ public Integer deserialize(int version, byte[] serialized)
assertTrue(checkpointCoordinator.getSuccessfulCheckpoints().isEmpty());
}

@Test
public void testResetCalledInRegionRecovery() throws Exception {
final JobID jobId = new JobID();

// set up the coordinator
CheckpointCoordinator checkpointCoordinator =
new CheckpointCoordinatorBuilder()
.setJobId(jobId)
.setTimer(manuallyTriggeredScheduledExecutor)
.build();

TestResetHook hook = new TestResetHook("id");

// Add a master hook
checkpointCoordinator.addMasterHook(hook);
assertFalse(hook.resetCalled);
checkpointCoordinator.restoreLatestCheckpointedStateToSubtasks(Collections.emptySet());
assertTrue(hook.resetCalled);
}

@Test
public void testNotifyCheckpointAbortionInOperatorCoordinator() throws Exception {
JobVertexID jobVertexID = new JobVertexID();
Expand Down Expand Up @@ -3612,42 +3592,4 @@ public int getInvokeCounter() {
return invokeCounter;
}
}

private static class TestResetHook implements MasterTriggerRestoreHook<String> {

private final String id;
boolean resetCalled;

TestResetHook(String id) {
this.id = id;
this.resetCalled = false;
}

@Override
public String getIdentifier() {
return id;
}

@Override
public void reset() throws Exception {
resetCalled = true;
}

@Override
public CompletableFuture<String> triggerCheckpoint(
long checkpointId, long timestamp, Executor executor) {
throw new UnsupportedOperationException();
}

@Override
public void restoreCheckpoint(long checkpointId, @Nullable String checkpointData)
throws Exception {
throw new UnsupportedOperationException();
}

@Override
public SimpleVersionedSerializer<String> createCheckpointDataSerializer() {
throw new UnsupportedOperationException();
}
}
}

0 comments on commit 9588c5d

Please sign in to comment.