diff --git a/docs/dev/stream/state/checkpointing.md b/docs/dev/stream/state/checkpointing.md
index eca8d5c1ede95..fab1a494edde9 100644
--- a/docs/dev/stream/state/checkpointing.md
+++ b/docs/dev/stream/state/checkpointing.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:
   - *fail/continue task on checkpoint errors*: This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure. This is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
+  - *prefer checkpoint for recovery*: This determines whether a job will fall back to the latest checkpoint, even when more recent savepoints are available, which can potentially reduce recovery time.
+
 {% highlight java %}
@@ -100,6 +102,9 @@ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
 // enable externalized checkpoints which are retained after job cancellation
 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
+
+// allow job recovery to fall back to a checkpoint when there is a more recent savepoint
+env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
 {% endhighlight %}
diff --git a/docs/dev/stream/state/checkpointing.zh.md b/docs/dev/stream/state/checkpointing.zh.md
index eca8d5c1ede95..bcd32e0c4ec68 100644
--- a/docs/dev/stream/state/checkpointing.zh.md
+++ b/docs/dev/stream/state/checkpointing.zh.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:
   - *fail/continue task on checkpoint errors*: This determines if a task will be failed if an error occurs in the execution of the task's checkpoint procedure. This is the default behaviour. Alternatively, when this is disabled, the task will simply decline the checkpoint to the checkpoint coordinator and continue running.
+  - *prefer checkpoint for recovery*: This determines whether a job will fall back to the latest checkpoint, even when more recent savepoints are available, which can potentially reduce recovery time.
+
 {% highlight java %}
@@ -125,6 +127,9 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)
 // allow only one checkpoint to be in progress at the same time
 env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)
+
+// allow job recovery to fall back to a checkpoint when there is a more recent savepoint
+env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
 {% endhighlight %}
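The selection rule backing this option is the `CompletedCheckpointStore#getLatestCheckpoint(boolean)` default method added further down in this patch: it recovers from the most recently added entry by default, and when the flag is set and the store holds more than one entry, it scans backwards from the second-newest entry for the first non-savepoint checkpoint. The following is a small, self-contained sketch of that rule under stated assumptions — `Snapshot` and `pickForRecovery` are illustrative stand-ins, not Flink classes.

{% highlight java %}
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

public class PreferCheckpointSelectionSketch {

	/** Illustrative stand-in for a retained completed checkpoint or savepoint. */
	static final class Snapshot {
		final long id;
		final boolean isSavepoint;

		Snapshot(long id, boolean isSavepoint) {
			this.id = id;
			this.isSavepoint = isSavepoint;
		}
	}

	/** Mirrors the selection logic of getLatestCheckpoint(boolean) on the stand-in type. */
	static Snapshot pickForRecovery(List<Snapshot> retained, boolean preferCheckpoint) {
		if (retained.isEmpty()) {
			return null;
		}
		// Default behaviour: recover from the most recently added entry.
		Snapshot candidate = retained.get(retained.size() - 1);
		if (preferCheckpoint && retained.size() > 1) {
			// Scan backwards, starting at the second-newest entry, for a plain checkpoint.
			ListIterator<Snapshot> it = retained.listIterator(retained.size() - 1);
			while (it.hasPrevious()) {
				Snapshot prev = it.previous();
				if (!prev.isSavepoint) {
					candidate = prev;
					break;
				}
			}
		}
		return candidate;
	}

	public static void main(String[] args) {
		// Oldest to newest: checkpoint 7, then savepoint 8 (the scenario of the new coordinator test).
		List<Snapshot> retained = Arrays.asList(new Snapshot(7, false), new Snapshot(8, true));

		System.out.println(pickForRecovery(retained, false).id); // 8 -> latest savepoint
		System.out.println(pickForRecovery(retained, true).id);  // 7 -> latest checkpoint
	}
}
{% endhighlight %}

The real default method additionally logs when it substitutes a checkpoint for a newer savepoint and wraps any failure of `getAllCheckpoints()` in a `FlinkRuntimeException`, as shown in the interface change below.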
diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 036796c582d45..b946896a651a9 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -100,7 +100,8 @@ public void testJobManagerJMXMetricAccess() throws Exception { 50, 5, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true), + true, + false), null)); ClusterClient client = MINI_CLUSTER_RESOURCE.getClusterClient(); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index c7f59a7ad01a9..7da924c66e85c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -183,6 +183,8 @@ public class CheckpointCoordinator { /** Registry that tracks state which is shared across (incremental) checkpoints. */ private SharedStateRegistry sharedStateRegistry; + private boolean isPreferCheckpointForRecovery; + // -------------------------------------------------------------------------------------------- public CheckpointCoordinator( @@ -199,7 +201,8 @@ public CheckpointCoordinator( CompletedCheckpointStore completedCheckpointStore, StateBackend checkpointStateBackend, Executor executor, - SharedStateRegistryFactory sharedStateRegistryFactory) { + SharedStateRegistryFactory sharedStateRegistryFactory, + boolean isPreferCheckpointForRecovery) { // sanity checks checkNotNull(checkpointStateBackend); @@ -233,6 +236,7 @@ public CheckpointCoordinator( this.executor = checkNotNull(executor); this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); + this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery; this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS); this.masterHooks = new HashMap<>(); @@ -1080,7 +1084,7 @@ public boolean restoreLatestCheckpointedState( LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry); // Restore from the latest checkpoint - CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); + CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery); if (latest == null) { if (errorIfNoCheckpoint) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index 82193b5f08da9..1cda1313708a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -19,18 +19,25 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.runtime.jobgraph.JobStatus; +import org.apache.flink.util.FlinkRuntimeException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.List; +import java.util.ListIterator; /** * A bounded 
LIFO-queue of {@link CompletedCheckpoint} instances. */ public interface CompletedCheckpointStore { + Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class); + /** * Recover available {@link CompletedCheckpoint} instances. * - *

<p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest + * <p>
After a call to this method, {@link #getLatestCheckpoint(boolean)} returns the latest * available checkpoint. */ void recover() throws Exception; @@ -47,7 +54,33 @@ public interface CompletedCheckpointStore { * Returns the latest {@link CompletedCheckpoint} instance or null if none was * added. */ - CompletedCheckpoint getLatestCheckpoint() throws Exception; + default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception { + if (getAllCheckpoints().isEmpty()) { + return null; + } + + CompletedCheckpoint candidate = getAllCheckpoints().get(getAllCheckpoints().size() - 1); + if (isPreferCheckpointForRecovery && getAllCheckpoints().size() > 1) { + List allCheckpoints; + try { + allCheckpoints = getAllCheckpoints(); + ListIterator listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1); + while (listIterator.hasPrevious()) { + CompletedCheckpoint prev = listIterator.previous(); + if (!prev.getProperties().isSavepoint()) { + candidate = prev; + LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!"); + break; + } + } + } catch (Exception e) { + LOG.error("Method getAllCheckpoints caused exception : ", e); + throw new FlinkRuntimeException(e); + } + } + + return candidate; + } /** * Shuts down the store. diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java index 63e7468ebca74..eca7e92bb01e9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStore.java @@ -76,11 +76,6 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { } } - @Override - public CompletedCheckpoint getLatestCheckpoint() { - return checkpoints.isEmpty() ? 
null : checkpoints.getLast(); - } - @Override public List getAllCheckpoints() { return new ArrayList<>(checkpoints); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java index 12cc8eb66abad..e4001b139299c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStore.java @@ -243,16 +243,6 @@ private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoin } } - @Override - public CompletedCheckpoint getLatestCheckpoint() { - if (completedCheckpoints.isEmpty()) { - return null; - } - else { - return completedCheckpoints.peekLast(); - } - } - @Override public List getAllCheckpoints() throws Exception { List checkpoints = new ArrayList<>(completedCheckpoints); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 6a4af57f6ef5d..a82586061eb47 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -524,7 +524,8 @@ public void enableCheckpointing( CheckpointIDCounter checkpointIDCounter, CompletedCheckpointStore checkpointStore, StateBackend checkpointStateBackend, - CheckpointStatsTracker statsTracker) { + CheckpointStatsTracker statsTracker, + boolean isPreferCheckpointForRecovery) { // simple sanity checks checkArgument(interval >= 10, "checkpoint interval must not be below 10ms"); @@ -554,7 +555,8 @@ public void enableCheckpointing( checkpointStore, checkpointStateBackend, ioExecutor, - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + isPreferCheckpointForRecovery); // register the master hooks on the checkpoint coordinator for (MasterTriggerRestoreHook hook : masterHooks) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index fc63e4b1a0f12..117b7b203ecd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -301,7 +301,8 @@ public static ExecutionGraph buildGraph( checkpointIdCounter, completedCheckpoints, rootBackend, - checkpointStatsTracker); + checkpointStatsTracker, + chkConfig.isPreferCheckpointForRecovery()); } // create all the metrics for the Execution Graph diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index 4ecbda57b287f..9e57d129c8861 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -54,13 +54,16 @@ public class CheckpointCoordinatorConfiguration implements Serializable { */ private final boolean isExactlyOnce; + private final boolean 
isPreferCheckpointForRecovery; + public CheckpointCoordinatorConfiguration( long checkpointInterval, long checkpointTimeout, long minPauseBetweenCheckpoints, int maxConcurrentCheckpoints, CheckpointRetentionPolicy checkpointRetentionPolicy, - boolean isExactlyOnce) { + boolean isExactlyOnce, + boolean isPerfetCheckpointForRecovery) { // sanity checks if (checkpointInterval < 1 || checkpointTimeout < 1 || @@ -74,6 +77,7 @@ public CheckpointCoordinatorConfiguration( this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy); this.isExactlyOnce = isExactlyOnce; + this.isPreferCheckpointForRecovery = isPerfetCheckpointForRecovery; } public long getCheckpointInterval() { @@ -100,6 +104,10 @@ public boolean isExactlyOnce() { return isExactlyOnce; } + public boolean isPreferCheckpointForRecovery() { + return isPreferCheckpointForRecovery; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -114,7 +122,8 @@ public boolean equals(Object o) { minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints && maxConcurrentCheckpoints == that.maxConcurrentCheckpoints && isExactlyOnce == that.isExactlyOnce && - checkpointRetentionPolicy == that.checkpointRetentionPolicy; + checkpointRetentionPolicy == that.checkpointRetentionPolicy && + isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery; } @Override @@ -125,7 +134,8 @@ public int hashCode() { minPauseBetweenCheckpoints, maxConcurrentCheckpoints, checkpointRetentionPolicy, - isExactlyOnce); + isExactlyOnce, + isPreferCheckpointForRecovery); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 32b32cfb3d6e4..aac54b2be6002 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -80,7 +80,8 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception { new FailingCompletedCheckpointStore(), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.triggerCheckpoint(triggerTimestamp, false); @@ -142,7 +143,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception { } @Override - public CompletedCheckpoint getLatestCheckpoint() throws Exception { + public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception { throw new UnsupportedOperationException("Not implemented."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index f644c01caf50e..949947af733eb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -206,7 +206,7 @@ public void testHooksAreCalledOnTrigger() throws Exception { assertEquals(0, cc.getNumberOfPendingCheckpoints()); assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints()); - final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(); + 
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(false); final Collection masterStates = chk.getMasterHookStates(); assertEquals(2, masterStates.size()); @@ -435,7 +435,8 @@ private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid, new StandaloneCompletedCheckpointStore(10), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); } private static T mockGeneric(Class clazz) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index e1f8f9c885b2b..5af7cf6f2c996 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -55,14 +55,19 @@ import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; +import org.apache.flink.util.SerializableObject; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.mockito.Mockito; +import org.mockito.hamcrest.MockitoHamcrest; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import org.mockito.verification.VerificationMode; @@ -77,6 +82,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Random; import java.util.UUID; import java.util.concurrent.BlockingQueue; @@ -143,7 +149,8 @@ public void testCheckpointAbortsIfTriggerTasksAreNotExecuted() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -204,7 +211,8 @@ public void testCheckpointAbortsIfTriggerTasksAreFinished() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -256,7 +264,8 @@ public void testCheckpointAbortsIfAckTasksAreNotExecuted() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // nothing should be happening assertEquals(0, coord.getNumberOfPendingCheckpoints()); @@ -309,7 +318,8 @@ public void testTriggerAndDeclineCheckpointSimple() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -412,7 +422,8 @@ public void testTriggerAndDeclineCheckpointComplex() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - 
SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -532,7 +543,8 @@ public void testTriggerAndConfirmSimpleCheckpoint() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -700,7 +712,8 @@ public void testMultipleConcurrentCheckpoints() { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -831,7 +844,8 @@ public void testSuccessfulCheckpointSubsumesUnsuccessful() { new StandaloneCompletedCheckpointStore(10), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -996,7 +1010,8 @@ public void testCheckpointTimeoutIsolated() { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // trigger a checkpoint, partially acknowledged assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1074,7 +1089,8 @@ public void testHandleMessagesForNonExistingCheckpoints() { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1138,7 +1154,8 @@ public void testStateCleanupForLateOrUnknownMessages() throws Exception { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -1271,7 +1288,8 @@ public Void answer(InvocationOnMock invocation) throws Throwable { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.startCheckpointScheduler(); @@ -1361,7 +1379,8 @@ public void testMinTimeBetweenCheckpointsInterval() throws Exception { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); try { coord.startCheckpointScheduler(); @@ -1435,7 +1454,8 @@ public void testTriggerAndConfirmSimpleSavepoint() throws Exception { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertEquals(0, coord.getNumberOfPendingCheckpoints()); assertEquals(0, coord.getNumberOfRetainedSuccessfulCheckpoints()); @@ -1587,7 +1607,8 @@ public void testSavepointsAreNotSubsumed() throws Exception { new StandaloneCompletedCheckpointStore(10), new 
MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -1681,7 +1702,8 @@ private void testMaxConcurrentAttempts(int maxConcurrentAttempts) { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.startCheckpointScheduler(); @@ -1755,7 +1777,8 @@ public void testMaxConcurrentAttempsWithSubsumption() { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.startCheckpointScheduler(); @@ -1832,7 +1855,8 @@ public void testPeriodicSchedulingWithInactiveTasks() { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.startCheckpointScheduler(); @@ -1885,7 +1909,8 @@ public void testConcurrentSavepoints() throws Exception { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); List> savepointFutures = new ArrayList<>(); @@ -1939,7 +1964,8 @@ public void testMinDelayBetweenSavepoints() throws Exception { new StandaloneCompletedCheckpointStore(2), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); String savepointDir = tmpFolder.newFolder().getAbsolutePath(); @@ -2002,7 +2028,8 @@ public void testRestoreLatestCheckpointedState() throws Exception { store, new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2117,7 +2144,8 @@ public void testRestoreLatestCheckpointFailureWhenMaxParallelismChanges() throws new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2196,6 +2224,149 @@ public void testRestoreLatestCheckpointedStateScaleOut() throws Exception { testRestoreLatestCheckpointedStateWithChangingParallelism(true); } + @Test + public void testRestoreLatestCheckpointWhenPreferCheckpoint() throws Exception { + testRestoreLatestCheckpointIsPreferSavepoint(true); + } + + @Test + public void testRestoreLatestCheckpointWhenPreferSavepoint() throws Exception { + testRestoreLatestCheckpointIsPreferSavepoint(false); + } + + private void testRestoreLatestCheckpointIsPreferSavepoint(boolean isPreferCheckpoint) { + try { + final JobID jid = new JobID(); + long timestamp = System.currentTimeMillis(); + StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); + + final JobVertexID statefulId = new JobVertexID(); + final JobVertexID statelessId = new JobVertexID(); + + Execution statefulExec1 = mockExecution(); + Execution statelessExec1 = mockExecution(); + + ExecutionVertex stateful1 = mockExecutionVertex(statefulExec1, statefulId, 0, 1); + ExecutionVertex stateless1 = mockExecutionVertex(statelessExec1, statelessId, 0, 1); + + ExecutionJobVertex 
stateful = mockExecutionJobVertex(statefulId, + new ExecutionVertex[] { stateful1 }); + ExecutionJobVertex stateless = mockExecutionJobVertex(statelessId, + new ExecutionVertex[] { stateless1 }); + + Map map = new HashMap(); + map.put(statefulId, stateful); + map.put(statelessId, stateless); + + CompletedCheckpointStore store = new RecoverableCompletedCheckpointStore(2); + + CheckpointCoordinator coord = new CheckpointCoordinator( + jid, + 600000, + 600000, + 0, + Integer.MAX_VALUE, + CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + new ExecutionVertex[] { stateful1, stateless1 }, + new ExecutionVertex[] { stateful1, stateless1 }, + new ExecutionVertex[] { stateful1, stateless1 }, + checkpointIDCounter, + store, + new MemoryStateBackend(), + Executors.directExecutor(), + SharedStateRegistry.DEFAULT_FACTORY, + isPreferCheckpoint); + + //trigger a checkpoint and wait to become a completed checkpoint + assertTrue(coord.triggerCheckpoint(timestamp, false)); + + long checkpointId = checkpointIDCounter.getLast(); + + KeyGroupRange keyGroupRange = KeyGroupRange.of(0,0); + List testStates = Collections.singletonList(new SerializableObject()); + KeyedStateHandle serializedKeyGroupStates = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, testStates); + + TaskStateSnapshot subtaskStatesForCheckpoint = new TaskStateSnapshot(); + + subtaskStatesForCheckpoint.putSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(statefulId), + new OperatorSubtaskState( + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.singleton(serializedKeyGroupStates), + StateObjectCollection.empty())); + + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForCheckpoint)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId)); + + CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); + assertEquals(jid, success.getJobId()); + + // trigger a savepoint and wait it to be finished + String savepointDir = tmpFolder.newFolder().getAbsolutePath(); + timestamp = System.currentTimeMillis(); + CompletableFuture savepointFuture = coord.triggerSavepoint(timestamp, savepointDir); + + + KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1); + List testStatesForSavepoint = Collections.singletonList(new SerializableObject()); + KeyedStateHandle serializedKeyGroupStatesForSavepoint = CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint); + + TaskStateSnapshot subtaskStatesForSavepoint = new TaskStateSnapshot(); + + subtaskStatesForSavepoint.putSubtaskStateByOperatorID( + OperatorID.fromJobVertexID(statefulId), + new OperatorSubtaskState( + StateObjectCollection.empty(), + StateObjectCollection.empty(), + StateObjectCollection.singleton(serializedKeyGroupStatesForSavepoint), + StateObjectCollection.empty())); + + checkpointId = checkpointIDCounter.getLast(); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statefulExec1.getAttemptId(), checkpointId, new CheckpointMetrics(), subtaskStatesForSavepoint)); + coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, statelessExec1.getAttemptId(), checkpointId)); + + assertTrue(savepointFuture.isDone()); + + //restore and jump the latest savepoint + coord.restoreLatestCheckpointedState(map, true, false); + + //compare and see if it used the checkpoint's subtaskStates + BaseMatcher matcher = new 
BaseMatcher() { + @Override + public boolean matches(Object o) { + if (o instanceof JobManagerTaskRestore) { + JobManagerTaskRestore taskRestore = (JobManagerTaskRestore) o; + if (isPreferCheckpoint) { + return Objects.equals(taskRestore.getTaskStateSnapshot(), subtaskStatesForCheckpoint); + } else { + return Objects.equals(taskRestore.getTaskStateSnapshot(), subtaskStatesForSavepoint); + } + } + return false; + } + + @Override + public void describeTo(Description description) { + if (isPreferCheckpoint) { + description.appendValue(subtaskStatesForCheckpoint); + } else { + description.appendValue(subtaskStatesForSavepoint); + } + } + }; + + verify(statefulExec1, times(1)).setInitialState(MockitoHamcrest.argThat(matcher)); + verify(statelessExec1, times(0)).setInitialState(Mockito.any()); + + coord.shutdown(JobStatus.FINISHED); + } + catch (Exception e) { + e.printStackTrace(); + fail(e.getMessage()); + } + } + @Test public void testStateRecoveryWhenTopologyChangeOut() throws Exception { testStateRecoveryWithTopologyChange(0); @@ -2264,7 +2435,8 @@ private void testRestoreLatestCheckpointedStateWithChangingParallelism(boolean s new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // trigger the checkpoint coord.triggerCheckpoint(timestamp, false); @@ -2532,7 +2704,7 @@ public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), new TestCompletedCheckpointStorageLocation()); - when(standaloneCompletedCheckpointStore.getLatestCheckpoint()).thenReturn(completedCheckpoint); + when(standaloneCompletedCheckpointStore.getLatestCheckpoint(false)).thenReturn(completedCheckpoint); // set up the coordinator and validate the initial state CheckpointCoordinator coord = new CheckpointCoordinator( @@ -2549,7 +2721,8 @@ public void testStateRecoveryWithTopologyChange(int scaleType) throws Exception standaloneCompletedCheckpointStore, new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); coord.restoreLatestCheckpointedState(tasks, false, true); @@ -2701,7 +2874,8 @@ public void testExternalizedCheckpoints() throws Exception { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); assertTrue(coord.triggerCheckpoint(timestamp, false)); @@ -3136,7 +3310,8 @@ public void testStopPeriodicScheduler() throws Exception { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // Periodic try { @@ -3357,7 +3532,8 @@ public void testCheckpointStatsTrackerPendingCheckpointCallback() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); CheckpointStatsTracker tracker = mock(CheckpointStatsTracker.class); coord.setCheckpointStatsTracker(tracker); @@ -3396,7 +3572,8 @@ public void testCheckpointStatsTrackerRestoreCallback() throws Exception { store, new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); 
store.addCheckpoint(new CompletedCheckpoint( new JobID(), @@ -3463,7 +3640,8 @@ public void testSharedStateRegistrationOnRestore() throws Exception { SharedStateRegistry instance = new SharedStateRegistry(deleteExecutor); createdSharedStateRegistries.add(instance); return instance; - }); + }, + false); final int numCheckpoints = 3; @@ -3654,4 +3832,36 @@ private void performIncrementalCheckpoint( coord.receiveAcknowledgeMessage(acknowledgeCheckpoint); } } + + private Execution mockExecution() { + Execution mock = mock(Execution.class); + when(mock.getAttemptId()).thenReturn(new ExecutionAttemptID()); + when(mock.getState()).thenReturn(ExecutionState.RUNNING); + return mock; + } + + private ExecutionVertex mockExecutionVertex(Execution execution, JobVertexID vertexId, int subtask, int parallelism) { + ExecutionVertex mock = mock(ExecutionVertex.class); + when(mock.getJobvertexId()).thenReturn(vertexId); + when(mock.getParallelSubtaskIndex()).thenReturn(subtask); + when(mock.getCurrentExecutionAttempt()).thenReturn(execution); + when(mock.getTotalNumberOfParallelSubtasks()).thenReturn(parallelism); + when(mock.getMaxParallelism()).thenReturn(parallelism); + return mock; + } + + private ExecutionJobVertex mockExecutionJobVertex(JobVertexID id, ExecutionVertex[] vertices) { + ExecutionJobVertex vertex = mock(ExecutionJobVertex.class); + when(vertex.getParallelism()).thenReturn(vertices.length); + when(vertex.getMaxParallelism()).thenReturn(vertices.length); + when(vertex.getJobVertexId()).thenReturn(id); + when(vertex.getTaskVertices()).thenReturn(vertices); + when(vertex.getOperatorIDs()).thenReturn(Collections.singletonList(OperatorID.fromJobVertexID(id))); + when(vertex.getUserDefinedOperatorIDs()).thenReturn(Collections.singletonList(null)); + + for (ExecutionVertex v : vertices) { + when(v.getJobVertex()).thenReturn(vertex); + } + return vertex; + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 780c0fe358fd0..bf438aada0d98 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -90,7 +90,8 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception 0L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true), + true, + false), new SerializedValue(new CustomStateBackend(outOfClassPath)), serHooks); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 236717ff43465..c441309d73894 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -109,7 +109,8 @@ public void testSetState() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // create ourselves a checkpoint with state final long timestamp = 34623786L; @@ -187,7 +188,8 @@ public void testNoCheckpointAvailable() { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), 
Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); try { coord.restoreLatestCheckpointedState(new HashMap(), true, false); @@ -245,7 +247,8 @@ public void testNonRestoredState() throws Exception { new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), Executors.directExecutor(), - SharedStateRegistry.DEFAULT_FACTORY); + SharedStateRegistry.DEFAULT_FACTORY, + false); // --- (2) Checkpoint misses state for a jobVertex (should work) --- Map checkpointTaskStates = new HashMap<>(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index 82dcd023f9bf3..fce0f3aa9995c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -64,6 +64,7 @@ public void testGetSnapshottingSettings() throws Exception { 191929L, 123, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + false, false ), null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index c4d89030dc335..bb4350a7b13d2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -79,11 +79,11 @@ public void testAddAndGetLatestCheckpoint() throws Exception { // Add and get latest checkpoints.addCheckpoint(expected[0]); assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); - verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint()); + verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint(false)); checkpoints.addCheckpoint(expected[1]); assertEquals(2, checkpoints.getNumberOfRetainedCheckpoints()); - verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint()); + verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint(false)); } /** @@ -119,7 +119,7 @@ public void testAddCheckpointMoreThanMaxRetained() throws Exception { /** * Tests that *

@@ -128,7 +128,7 @@ public void testAddCheckpointMoreThanMaxRetained() throws Exception { public void testEmptyState() throws Exception { CompletedCheckpointStore checkpoints = createCompletedCheckpoints(1); - assertNull(checkpoints.getLatestCheckpoint()); + assertNull(checkpoints.getLatestCheckpoint(false)); assertEquals(0, checkpoints.getAllCheckpoints().size()); assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints()); } @@ -179,7 +179,7 @@ public void testDiscardAllCheckpoints() throws Exception { checkpoints.shutdown(JobStatus.FINISHED); // Empty state - assertNull(checkpoints.getLatestCheckpoint()); + assertNull(checkpoints.getLatestCheckpoint(false)); assertEquals(0, checkpoints.getAllCheckpoints().size()); assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java index f9f50fe6dcb0f..bdb9975fdbc32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ExecutionGraphCheckpointCoordinatorTest.java @@ -166,7 +166,8 @@ private ExecutionGraph createExecutionGraphAndEnableCheckpointing( counter, store, new MemoryStateBackend(), - CheckpointStatsTrackerTest.createTestTracker()); + CheckpointStatsTrackerTest.createTestTracker(), + false); JobVertex jobVertex = new JobVertex("MockVertex"); jobVertex.setInvokableClass(AbstractInvokable.class); @@ -222,7 +223,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) { } @Override - public CompletedCheckpoint getLatestCheckpoint() { + public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) { throw new UnsupportedOperationException("Not implemented."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index c0c18773e6f30..af6d860e88a94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -119,7 +119,7 @@ public void testRecover() throws Exception { assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); - assertEquals(expected[2], checkpoints.getLatestCheckpoint()); + assertEquals(expected[2], checkpoints.getLatestCheckpoint(false)); List expectedCheckpoints = new ArrayList<>(3); expectedCheckpoints.add(expected[1]); @@ -193,7 +193,7 @@ public void testSuspendKeepsCheckpoints() throws Exception { sharedStateRegistry.close(); store.recover(); - CompletedCheckpoint recovered = store.getLatestCheckpoint(); + CompletedCheckpoint recovered = store.getLatestCheckpoint(false); assertEquals(checkpoint, recovered); } @@ -220,7 +220,7 @@ public void testLatestCheckpointRecovery() throws Exception { sharedStateRegistry.close(); checkpointStore.recover(); - CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(); + CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(false); assertEquals(checkpoints.get(checkpoints.size() -1), latestCheckpoint); } @@ 
-251,7 +251,7 @@ public void testConcurrentCheckpointOperations() throws Exception { sharedStateRegistry = new SharedStateRegistry(); zkCheckpointStore2.recover(); - CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); + CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(false); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index c8fcfb90711d7..44437b010d5af 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -164,7 +164,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable { zooKeeperCompletedCheckpointStore.recover(); - CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); + CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false); // check that we return the latest retrievable checkpoint // this should remove the latest checkpoint because it is broken @@ -190,6 +190,130 @@ public Void answer(InvocationOnMock invocation) throws Throwable { verify(failingRetrievableStateHandle, never()).discardState(); } + /** + * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper + * and ignores those which cannot be retrieved via their state handles. + * + *

We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation. + */ + @Test(timeout = 50000) + public void testCheckpointRecoveryPreferCheckpoint() throws Exception { + final JobID jobID = new JobID(); + final long checkpoint1Id = 1L; + final long checkpoint2Id = 2; + final List, String>> checkpointsInZooKeeper = new ArrayList<>(4); + + final Collection expectedCheckpointIds = new HashSet<>(2); + expectedCheckpointIds.add(1L); + expectedCheckpointIds.add(2L); + + final RetrievableStateHandle failingRetrievableStateHandle = mock(RetrievableStateHandle.class); + when(failingRetrievableStateHandle.retrieveState()).thenThrow(new IOException("Test exception")); + + final RetrievableStateHandle retrievableStateHandle1 = mock(RetrievableStateHandle.class); + when(retrievableStateHandle1.retrieveState()).then( + (invocation) -> new CompletedCheckpoint( + jobID, + checkpoint1Id, + 1L, + 1L, + new HashMap<>(), + null, + CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION), + new TestCompletedCheckpointStorageLocation())); + + final RetrievableStateHandle retrievableStateHandle2 = mock(RetrievableStateHandle.class); + when(retrievableStateHandle2.retrieveState()).then( + (invocation -> new CompletedCheckpoint( + jobID, + checkpoint2Id, + 2L, + 2L, + new HashMap<>(), + null, + CheckpointProperties.forSavepoint(), + new TestCompletedCheckpointStorageLocation()))); + + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing1")); + checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2")); + checkpointsInZooKeeper.add(Tuple2.of(failingRetrievableStateHandle, "/failing2")); + + final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); + final RetrievableStateStorageHelper storageHelperMock = mock(RetrievableStateStorageHelper.class); + + ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock)); + doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock(); + + final int numCheckpointsToRetain = 1; + + // Mocking for the delete operation on the CuratorFramework client + // It assures that the callback is executed synchronously + + final EnsurePath ensurePathMock = mock(EnsurePath.class); + final CuratorEvent curatorEventMock = mock(CuratorEvent.class); + when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE); + when(curatorEventMock.getResultCode()).thenReturn(0); + when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock); + + when( + client + .delete() + .inBackground(any(BackgroundCallback.class), any(Executor.class)) + ).thenAnswer(new Answer>() { + @Override + public ErrorListenerPathable answer(InvocationOnMock invocation) throws Throwable { + final BackgroundCallback callback = (BackgroundCallback) invocation.getArguments()[0]; + + ErrorListenerPathable result = mock(ErrorListenerPathable.class); + + when(result.forPath(anyString())).thenAnswer(new Answer() { + @Override + public Void answer(InvocationOnMock invocation) throws Throwable { + + callback.processResult(client, curatorEventMock); + + return null; + } + }); + + return result; + } + }); + + ZooKeeperCompletedCheckpointStore zooKeeperCompletedCheckpointStore = new ZooKeeperCompletedCheckpointStore( + numCheckpointsToRetain, + zooKeeperStateHandleStoreMock, + Executors.directExecutor()); + + 
zooKeeperCompletedCheckpointStore.recover(); + + CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(true); + + // check that we return the latest retrievable checkpoint + // this should remove the latest checkpoint because it is broken + assertEquals(checkpoint1Id, latestCompletedCheckpoint.getCheckpointID()); + + // this should remove the second broken checkpoint because we're iterating over all checkpoints + List completedCheckpoints = zooKeeperCompletedCheckpointStore.getAllCheckpoints(); + + Collection actualCheckpointIds = new HashSet<>(completedCheckpoints.size()); + + for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { + actualCheckpointIds.add(completedCheckpoint.getCheckpointID()); + } + + assertEquals(expectedCheckpointIds, actualCheckpointIds); + + // check that we did not discard any of the state handles + verify(retrievableStateHandle1, never()).discardState(); + verify(retrievableStateHandle2, never()).discardState(); + + // Make sure that we also didn't discard any of the broken handles. Only when checkpoints + // are subsumed should they be discarded. + verify(failingRetrievableStateHandle, never()).discardState(); + } + /** * Tests that the checkpoint does not exist in the store when we fail to add * it into the store (i.e., there exists an exception thrown by the method). diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 98a1e16787a42..40986436b3587 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -140,7 +140,8 @@ public static void setupExecutionGraph() throws Exception { new StandaloneCheckpointIDCounter(), new StandaloneCompletedCheckpointStore(1), new MemoryStateBackend(), - statsTracker); + statsTracker, + false); runtimeGraph.setJsonPlan("{}"); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java index 29915734d71bd..2002b2913a158 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ConcurrentFailoverStrategyExecutionGraphTest.java @@ -362,7 +362,8 @@ public void testLocalFailureFailsPendingCheckpoints() throws Exception { 1L, 3, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true); + true, + false); final ExecutionGraph graph = createSampleGraph( jid, @@ -401,7 +402,8 @@ protected void performExecutionVertexRestart( 1, allVertices, checkpointCoordinatorConfiguration, - UnregisteredMetricGroups.createUnregisteredTaskMetricGroup())); + UnregisteredMetricGroups.createUnregisteredTaskMetricGroup()), + false); final CheckpointCoordinator checkpointCoordinator = graph.getCheckpointCoordinator(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 09ef74b3616cd..df80d3e6b4f0f 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -801,6 +801,7 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws 0, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, + false, false), null)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java index 55890d7c7b00e..f52eab36d5e09 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/FailoverRegionTest.java @@ -114,7 +114,7 @@ public void testSingleRegionFailover() throws Exception { // verify checkpoint has been completed successfully. assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints()); - assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID()); + assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint(false).getCheckpointID()); ev.getCurrentExecutionAttempt().fail(new Exception("Test Exception")); assertEquals(JobStatus.CANCELLING, strategy.getFailoverRegion(ev).getState()); @@ -535,7 +535,7 @@ private void verifyCheckpointRestoredAsExpected(ExecutionGraph eg) throws Except // verify checkpoint has been restored successfully. assertEquals(1, eg.getCheckpointCoordinator().getCheckpointStore().getNumberOfRetainedCheckpoints()); - assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint().getCheckpointID()); + assertEquals(checkpointId, eg.getCheckpointCoordinator().getCheckpointStore().getLatestCheckpoint(false).getCheckpointID()); } private ExecutionGraph createSingleRegionExecutionGraph(RestartStrategy restartStrategy) throws Exception { @@ -622,7 +622,8 @@ private static void enableCheckpointing(ExecutionGraph eg) { 0, jobVertices, mock(CheckpointCoordinatorConfiguration.class), - new UnregisteredMetricsGroup())); + new UnregisteredMetricsGroup()), + false); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index adb6f5e4a0504..d778cf0baf84c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -346,7 +346,8 @@ private static JobCheckpointingSettings createCheckpointSettingsWithInterval(fin Long.MAX_VALUE, Integer.MAX_VALUE, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true); + true, + false); return new JobCheckpointingSettings( Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java index 51e7fec122397..4665a636a58d4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java @@ -49,6 +49,7 @@ public void testIsJavaSerializable() throws Exception { 112, 12, 
CheckpointRetentionPolicy.RETAIN_ON_FAILURE, + false, false), new SerializedValue<>(new MemoryStateBackend())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index ff9d6ed78311d..6712570822cb6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -490,7 +490,7 @@ public void testRestoringFromSavepoint() throws Exception { try { // starting the JobMaster should have read the savepoint - final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false); assertThat(savepointCheckpoint, Matchers.notNullValue()); @@ -551,7 +551,7 @@ public void testRestoringModifiedJobFromSavepoint() throws Exception { try { // starting the JobMaster should have read the savepoint - final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false); assertThat(savepointCheckpoint, Matchers.notNullValue()); @@ -602,7 +602,7 @@ public void testCheckpointPrecedesSavepointRecovery() throws Exception { try { // starting the JobMaster should have read the savepoint - final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(); + final CompletedCheckpoint savepointCheckpoint = completedCheckpointStore.getLatestCheckpoint(false); assertThat(savepointCheckpoint, Matchers.notNullValue()); @@ -1837,7 +1837,8 @@ private JobGraph createJobGraphFromJobVerticesWithCheckpointing(SavepointRestore 1000L, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true); + true, + false); final JobCheckpointingSettings checkpointingSettings = new JobCheckpointingSettings( Collections.emptyList(), Collections.emptyList(), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java index 037ecd17cf268..a80b13788c1ac 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java @@ -74,11 +74,6 @@ public void removeOldestCheckpoint() throws Exception { checkpointToSubsume.discardOnSubsume(); } - @Override - public CompletedCheckpoint getLatestCheckpoint() throws Exception { - return checkpoints.isEmpty() ? 
null : checkpoints.getLast(); - } - @Override public void shutdown(JobStatus jobStatus) throws Exception { if (jobStatus.isGloballyTerminalState()) { diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 87c800d2130d0..8b9ad5506035e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -72,6 +72,9 @@ public class CheckpointConfig implements java.io.Serializable { /** Determines if a tasks are failed or not if there is an error in their checkpointing. Default: true */ private boolean failOnCheckpointingErrors = true; + /** Determines if a job will fallback to checkpoint when there is a more recent savepoint. **/ + private boolean preferCheckpointForRecovery = false; + // ------------------------------------------------------------------------ /** @@ -285,6 +288,24 @@ public boolean isExternalizedCheckpointsEnabled() { return externalizedCheckpointCleanup != null; } + /** + * Returns whether a job recovery should fallback to checkpoint when there is a more recent savepoint. + * + * @return true if a job recovery should fallback to checkpoint. + */ + @PublicEvolving + public boolean isPreferCheckpointForRecovery() { + return preferCheckpointForRecovery; + } + + /** + * Sets whether a job recovery should fallback to checkpoint when there is a more recent savepoint. + */ + @PublicEvolving + public void setPreferCheckpointForRecovery(boolean preferCheckpointForRecovery) { + this.preferCheckpointForRecovery = preferCheckpointForRecovery; + } + /** * Returns the cleanup behaviour for externalized checkpoints. 
* diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 05d06265109d3..fdb38f5f3f323 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -700,7 +700,8 @@ private void configureCheckpointing() { cfg.getMinPauseBetweenCheckpoints(), cfg.getMaxConcurrentCheckpoints(), retentionAfterTermination, - isExactlyOnce), + isExactlyOnce, + cfg.isPreferCheckpointForRecovery()), serializedStateBackend, serializedHooks); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java index f3d904efb3d86..986e4108677a5 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointIT.java @@ -244,7 +244,8 @@ private void setUpJobGraph( 10, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true), + true, + false), null)); clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader()); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index 75e9ed18483b4..6635e318ac056 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -106,7 +106,8 @@ private void setUpWithCheckpointInterval(long checkpointInterval) throws Excepti 10, 1, CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, - true), + true, + false), null)); clusterClient.submitJob(jobGraph, ClassLoader.getSystemClassLoader());
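For completeness, a minimal end-to-end job that opts in to the new behaviour through the public `CheckpointConfig` API touched by this patch might look as follows; the pipeline body is illustrative only, and only `setPreferCheckpointForRecovery(true)` is specific to this change.

{% highlight java %}
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class PreferCheckpointForRecoveryExample {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// checkpoint every 10 seconds
		env.enableCheckpointing(10_000);

		// on recovery, fall back to the latest checkpoint even if a more recent savepoint is retained
		env.getCheckpointConfig().setPreferCheckpointForRecovery(true);

		env.fromElements(1, 2, 3).print();

		env.execute("prefer-checkpoint-for-recovery example");
	}
}
{% endhighlight %}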