diff --git a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md index e7e53e660df63..bdfaa00d775a5 100644 --- a/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content.zh/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -152,9 +152,6 @@ env.get_checkpoint_config().set_max_concurrent_checkpoints(1) # 开启在 job 中止后仍然保留的 externalized checkpoints env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) - -# 允许在有更近 savepoint 时回退到 checkpoint -env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True) ``` {{< /tab >}} {{< /tabs >}} diff --git a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md index a40691d50dd86..048e137c0212e 100644 --- a/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md +++ b/docs/content/docs/dev/datastream/fault-tolerance/checkpointing.md @@ -173,9 +173,6 @@ env.get_checkpoint_config().set_max_concurrent_checkpoints(1) # enable externalized checkpoints which are retained after job cancellation env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION) -# allow job recovery fallback to checkpoint when there is a more recent savepoint -env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True) - # enables the experimental unaligned checkpoints env.get_checkpoint_config().enable_unaligned_checkpoints() ``` diff --git a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java index 179d6d40a59e0..67d265fb14473 100644 --- a/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java +++ b/flink-metrics/flink-metrics-jmx/src/test/java/org/apache/flink/runtime/jobmanager/JMXJobManagerMetricTest.java @@ -100,7 +100,6 @@ public void testJobManagerJMXMetricAccess() throws Exception { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0), null); diff --git a/flink-python/pyflink/datastream/checkpoint_config.py b/flink-python/pyflink/datastream/checkpoint_config.py index e6f253ded80a0..8ab29186a7f8e 100644 --- a/flink-python/pyflink/datastream/checkpoint_config.py +++ b/flink-python/pyflink/datastream/checkpoint_config.py @@ -255,28 +255,6 @@ def is_externalized_checkpoints_enabled(self) -> bool: """ return self._j_checkpoint_config.isExternalizedCheckpointsEnabled() - def is_prefer_checkpoint_for_recovery(self) -> bool: - """ - Returns whether a job recovery should fallback to checkpoint when there is a more recent - savepoint. - - :return: ``True`` if a job recovery should fallback to checkpoint, false otherwise. - """ - return self._j_checkpoint_config.isPreferCheckpointForRecovery() - - def set_prefer_checkpoint_for_recovery( - self, - prefer_checkpoint_for_recovery: bool) -> 'CheckpointConfig': - """ - Sets whether a job recovery should fallback to checkpoint when there is a more recent - savepoint. - - :param prefer_checkpoint_for_recovery: ``True`` if a job recovery should fallback to - checkpoint, false otherwise. - """ - self._j_checkpoint_config.setPreferCheckpointForRecovery(prefer_checkpoint_for_recovery) - return self - def get_externalized_checkpoint_cleanup(self) -> Optional['ExternalizedCheckpointCleanup']: """ Returns the cleanup behaviour for externalized checkpoints. diff --git a/flink-python/pyflink/datastream/tests/test_check_point_config.py b/flink-python/pyflink/datastream/tests/test_check_point_config.py index 6358514561abd..d4fcb9ccfbf6b 100644 --- a/flink-python/pyflink/datastream/tests/test_check_point_config.py +++ b/flink-python/pyflink/datastream/tests/test_check_point_config.py @@ -129,14 +129,6 @@ def test_get_set_externalized_checkpoints_cleanup(self): self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(), ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION) - def test_get_set_prefer_checkpoint_for_recovery(self): - - self.assertFalse(self.checkpoint_config.is_prefer_checkpoint_for_recovery()) - - self.checkpoint_config.set_prefer_checkpoint_for_recovery(True) - - self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery()) - def test_is_unaligned_checkpointing_enabled(self): self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled()) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 7e6d7b2a8b4c1..694f064fe276f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -210,8 +210,6 @@ public class CheckpointCoordinator { /** Registry that tracks state which is shared across (incremental) checkpoints. */ private SharedStateRegistry sharedStateRegistry; - private boolean isPreferCheckpointForRecovery; - /** Id of checkpoint for which in-flight data should be ignored on recovery. */ private final long checkpointIdOfIgnoredInFlightData; @@ -310,7 +308,6 @@ public CheckpointCoordinator( this.checkpointsCleaner = checkNotNull(checkpointsCleaner); this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory); this.sharedStateRegistry = sharedStateRegistryFactory.create(executor); - this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery(); this.failureManager = checkNotNull(failureManager); this.checkpointPlanCalculator = checkNotNull(checkpointPlanCalculator); this.attemptMappingProvider = checkNotNull(attemptMappingProvider); @@ -1513,8 +1510,7 @@ private OptionalLong restoreLatestCheckpointedStateInternal( sharedStateRegistry); // Restore from the latest checkpoint - CompletedCheckpoint latest = - completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery); + CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(); if (latest == null) { LOG.info("No checkpoint found during restore."); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java index ad3cb757313f5..785d9db3d21cb 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStore.java @@ -25,7 +25,6 @@ import org.slf4j.LoggerFactory; import java.util.List; -import java.util.ListIterator; /** A bounded LIFO-queue of {@link CompletedCheckpoint} instances. */ public interface CompletedCheckpointStore { @@ -48,33 +47,13 @@ void addCheckpoint( * Returns the latest {@link CompletedCheckpoint} instance or null if none was * added. */ - default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) - throws Exception { + default CompletedCheckpoint getLatestCheckpoint() throws Exception { List allCheckpoints = getAllCheckpoints(); if (allCheckpoints.isEmpty()) { return null; } - CompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1); - - if (isPreferCheckpointForRecovery - && allCheckpoints.size() > 1 - && lastCompleted.getProperties().isSavepoint()) { - ListIterator listIterator = - allCheckpoints.listIterator(allCheckpoints.size() - 1); - while (listIterator.hasPrevious()) { - CompletedCheckpoint prev = listIterator.previous(); - if (!prev.getProperties().isSavepoint()) { - LOG.info( - "Found a completed checkpoint ({}) before the latest savepoint, will use it to recover!", - prev); - return prev; - } - } - LOG.info("Did not find earlier checkpoint, using latest savepoint to recover."); - } - - return lastCompleted; + return allCheckpoints.get(allCheckpoints.size() - 1); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java index aea3795ca1891..84075184443f3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCoordinatorConfiguration.java @@ -58,8 +58,6 @@ public class CheckpointCoordinatorConfiguration implements Serializable { */ private final boolean isExactlyOnce; - private final boolean isPreferCheckpointForRecovery; - private final boolean isUnalignedCheckpointsEnabled; private final long alignedCheckpointTimeout; @@ -79,7 +77,6 @@ public CheckpointCoordinatorConfiguration( CheckpointRetentionPolicy checkpointRetentionPolicy, boolean isExactlyOnce, boolean isUnalignedCheckpoint, - boolean isPreferCheckpointForRecovery, int tolerableCpFailureNumber, long checkpointIdOfIgnoredInFlightData) { this( @@ -89,7 +86,6 @@ public CheckpointCoordinatorConfiguration( maxConcurrentCheckpoints, checkpointRetentionPolicy, isExactlyOnce, - isPreferCheckpointForRecovery, tolerableCpFailureNumber, isUnalignedCheckpoint, 0, @@ -104,7 +100,6 @@ private CheckpointCoordinatorConfiguration( int maxConcurrentCheckpoints, CheckpointRetentionPolicy checkpointRetentionPolicy, boolean isExactlyOnce, - boolean isPreferCheckpointForRecovery, int tolerableCpFailureNumber, boolean isUnalignedCheckpointsEnabled, long alignedCheckpointTimeout, @@ -129,7 +124,6 @@ private CheckpointCoordinatorConfiguration( this.maxConcurrentCheckpoints = maxConcurrentCheckpoints; this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy); this.isExactlyOnce = isExactlyOnce; - this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery; this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber; this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled; this.alignedCheckpointTimeout = alignedCheckpointTimeout; @@ -161,10 +155,6 @@ public boolean isExactlyOnce() { return isExactlyOnce; } - public boolean isPreferCheckpointForRecovery() { - return isPreferCheckpointForRecovery; - } - public int getTolerableCheckpointFailureNumber() { return tolerableCheckpointFailureNumber; } @@ -202,7 +192,6 @@ public boolean equals(Object o) { && isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled && alignedCheckpointTimeout == that.alignedCheckpointTimeout && checkpointRetentionPolicy == that.checkpointRetentionPolicy - && isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery && tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber && checkpointIdOfIgnoredInFlightData == that.checkpointIdOfIgnoredInFlightData && enableCheckpointsAfterTasksFinish == that.enableCheckpointsAfterTasksFinish; @@ -219,7 +208,6 @@ public int hashCode() { isExactlyOnce, isUnalignedCheckpointsEnabled, alignedCheckpointTimeout, - isPreferCheckpointForRecovery, tolerableCheckpointFailureNumber, checkpointIdOfIgnoredInFlightData, enableCheckpointsAfterTasksFinish); @@ -244,8 +232,6 @@ public String toString() { + isUnalignedCheckpointsEnabled + ", alignedCheckpointTimeout=" + alignedCheckpointTimeout - + ", isPreferCheckpointForRecovery=" - + isPreferCheckpointForRecovery + ", tolerableCheckpointFailureNumber=" + tolerableCheckpointFailureNumber + ", checkpointIdOfIgnoredInFlightData=" @@ -268,7 +254,6 @@ public static class CheckpointCoordinatorConfigurationBuilder { private CheckpointRetentionPolicy checkpointRetentionPolicy = CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; private boolean isExactlyOnce = true; - private boolean isPreferCheckpointForRecovery = true; private int tolerableCheckpointFailureNumber; private boolean isUnalignedCheckpointsEnabled; private long alignedCheckpointTimeout = 0; @@ -283,7 +268,6 @@ public CheckpointCoordinatorConfiguration build() { maxConcurrentCheckpoints, checkpointRetentionPolicy, isExactlyOnce, - isPreferCheckpointForRecovery, tolerableCheckpointFailureNumber, isUnalignedCheckpointsEnabled, alignedCheckpointTimeout, @@ -326,12 +310,6 @@ public CheckpointCoordinatorConfigurationBuilder setExactlyOnce(boolean exactlyO return this; } - public CheckpointCoordinatorConfigurationBuilder setPreferCheckpointForRecovery( - boolean preferCheckpointForRecovery) { - isPreferCheckpointForRecovery = preferCheckpointForRecovery; - return this; - } - public CheckpointCoordinatorConfigurationBuilder setTolerableCheckpointFailureNumber( int tolerableCheckpointFailureNumber) { this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java index 2424100db6573..ec1572aadd641 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorFailureTest.java @@ -264,8 +264,7 @@ public void addCheckpoint( } @Override - public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) - throws Exception { + public CompletedCheckpoint getLatestCheckpoint() throws Exception { throw new UnsupportedOperationException("Not implemented."); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java index e9878890cfe29..6c52fc7c42138 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorMasterHooksTest.java @@ -231,7 +231,7 @@ public void testHooksAreCalledOnTrigger() throws Exception { assertEquals(0, cc.getNumberOfPendingCheckpoints()); assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints()); - final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(false); + final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(); final Collection masterStates = chk.getMasterHookStates(); assertEquals(2, masterStates.size()); @@ -473,7 +473,6 @@ private CheckpointCoordinator instantiateCheckpointCoordinator( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0); Executor executor = Executors.directExecutor(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java index c7dff7e87efc3..51579a0a93f44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorRestoringTest.java @@ -27,13 +27,11 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; -import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.ChainedStateHandle; @@ -46,7 +44,6 @@ import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.CommonTestUtils; import org.apache.flink.util.FlinkRuntimeException; -import org.apache.flink.util.SerializableObject; import org.apache.flink.util.TestLogger; import org.apache.flink.shaded.guava30.com.google.common.collect.Iterables; @@ -69,7 +66,6 @@ import java.util.Objects; import java.util.Random; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -85,8 +81,6 @@ import static org.apache.flink.runtime.checkpoint.StateHandleDummyUtil.createNewResultSubpartitionStateHandle; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -284,161 +278,6 @@ public void testRestoreLatestCheckpointedStateScaleOut() throws Exception { testRestoreLatestCheckpointedStateWithChangingParallelism(true); } - @Test - public void testRestoreLatestCheckpointWhenPreferCheckpoint() throws Exception { - testRestoreLatestCheckpointIsPreferSavepoint(true); - } - - @Test - public void testRestoreLatestCheckpointWhenPreferSavepoint() throws Exception { - testRestoreLatestCheckpointIsPreferSavepoint(false); - } - - private void testRestoreLatestCheckpointIsPreferSavepoint(boolean isPreferCheckpoint) { - try { - StandaloneCheckpointIDCounter checkpointIDCounter = new StandaloneCheckpointIDCounter(); - - final JobVertexID statefulId = new JobVertexID(); - final JobVertexID statelessId = new JobVertexID(); - - final ExecutionGraph graph = - new CheckpointCoordinatorTestingUtils.CheckpointExecutionGraphBuilder() - .addJobVertex(statefulId) - .addJobVertex(statelessId) - .build(); - - ExecutionJobVertex stateful = graph.getJobVertex(statefulId); - ExecutionJobVertex stateless = graph.getJobVertex(statelessId); - - ExecutionVertex stateful1 = stateful.getTaskVertices()[0]; - ExecutionVertex stateless1 = stateless.getTaskVertices()[0]; - - Execution statefulExec1 = stateful1.getCurrentExecutionAttempt(); - Execution statelessExec1 = stateless1.getCurrentExecutionAttempt(); - - Set tasks = new HashSet<>(); - tasks.add(stateful); - tasks.add(stateless); - - CompletedCheckpointStore store = new EmbeddedCompletedCheckpointStore(2); - - CheckpointCoordinatorConfiguration chkConfig = - new CheckpointCoordinatorConfigurationBuilder() - .setPreferCheckpointForRecovery(isPreferCheckpoint) - .build(); - CheckpointCoordinator coord = - new CheckpointCoordinatorBuilder() - .setExecutionGraph(graph) - .setCheckpointCoordinatorConfiguration(chkConfig) - .setCheckpointIDCounter(checkpointIDCounter) - .setCompletedCheckpointStore(store) - .setTimer(manuallyTriggeredScheduledExecutor) - .build(); - - // trigger a checkpoint and wait to become a completed checkpoint - final CompletableFuture checkpointFuture = - coord.triggerCheckpoint(false); - manuallyTriggeredScheduledExecutor.triggerAll(); - assertFalse(checkpointFuture.isCompletedExceptionally()); - - long checkpointId = checkpointIDCounter.getLast(); - - KeyGroupRange keyGroupRange = KeyGroupRange.of(0, 0); - List testStates = singletonList(new SerializableObject()); - KeyedStateHandle serializedKeyGroupStates = - generateKeyGroupState(keyGroupRange, testStates); - - TaskStateSnapshot subtaskStatesForCheckpoint = new TaskStateSnapshot(); - - subtaskStatesForCheckpoint.putSubtaskStateByOperatorID( - OperatorID.fromJobVertexID(statefulId), - OperatorSubtaskState.builder() - .setManagedKeyedState(serializedKeyGroupStates) - .build()); - - coord.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - graph.getJobID(), - statefulExec1.getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskStatesForCheckpoint), - TASK_MANAGER_LOCATION_INFO); - coord.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - graph.getJobID(), statelessExec1.getAttemptId(), checkpointId), - TASK_MANAGER_LOCATION_INFO); - - CompletedCheckpoint success = coord.getSuccessfulCheckpoints().get(0); - assertEquals(graph.getJobID(), success.getJobId()); - - // trigger a savepoint and wait it to be finished - String savepointDir = tmpFolder.newFolder().getAbsolutePath(); - CompletableFuture savepointFuture = - coord.triggerSavepoint(savepointDir); - - KeyGroupRange keyGroupRangeForSavepoint = KeyGroupRange.of(1, 1); - List testStatesForSavepoint = - singletonList(new SerializableObject()); - KeyedStateHandle serializedKeyGroupStatesForSavepoint = - generateKeyGroupState(keyGroupRangeForSavepoint, testStatesForSavepoint); - - TaskStateSnapshot subtaskStatesForSavepoint = new TaskStateSnapshot(); - - subtaskStatesForSavepoint.putSubtaskStateByOperatorID( - OperatorID.fromJobVertexID(statefulId), - OperatorSubtaskState.builder() - .setManagedOperatorState(StateObjectCollection.empty()) - .setRawOperatorState(StateObjectCollection.empty()) - .setManagedKeyedState( - StateObjectCollection.singleton( - serializedKeyGroupStatesForSavepoint)) - .setRawKeyedState(StateObjectCollection.empty()) - .build()); - - manuallyTriggeredScheduledExecutor.triggerAll(); - checkpointId = checkpointIDCounter.getLast(); - coord.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - graph.getJobID(), - statefulExec1.getAttemptId(), - checkpointId, - new CheckpointMetrics(), - subtaskStatesForSavepoint), - TASK_MANAGER_LOCATION_INFO); - coord.receiveAcknowledgeMessage( - new AcknowledgeCheckpoint( - graph.getJobID(), statelessExec1.getAttemptId(), checkpointId), - TASK_MANAGER_LOCATION_INFO); - - assertNotNull(savepointFuture.get()); - - // restore and jump the latest savepoint - assertTrue(coord.restoreLatestCheckpointedStateToAll(tasks, false)); - - // compare and see if it used the checkpoint's subtaskStates - assertNotNull( - "Stateful vertex should get state to restore", statefulExec1.getTaskRestore()); - if (isPreferCheckpoint) { - assertEquals( - subtaskStatesForCheckpoint, - statefulExec1.getTaskRestore().getTaskStateSnapshot()); - } else { - assertEquals( - subtaskStatesForSavepoint, - statefulExec1.getTaskRestore().getTaskStateSnapshot()); - } - assertNull( - "Stateless vertex should not get state to restore", - statelessExec1.getTaskRestore()); - - coord.shutdown(); - } catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - /** * Tests the checkpoint restoration with changing parallelism of job vertex with partitioned * state. diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index fb21d8e81b8eb..fb15d31cf2945 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -86,7 +86,6 @@ public void testDeserializationOfUserCodeWithUserClassLoader() throws Exception CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0), new SerializedValue(new CustomStateBackend(outOfClassPath)), diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java index cd73b1bca1a05..0e97958c032aa 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStatsTrackerTest.java @@ -59,7 +59,6 @@ public void testGetSnapshottingSettings() throws Exception { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, - false, 0, 0), null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java index 5a59fa1a81347..8eaa0618b04d3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CompletedCheckpointStoreTest.java @@ -81,11 +81,11 @@ public void testAddAndGetLatestCheckpoint() throws Exception { // Add and get latest checkpoints.addCheckpoint(expected[0], new CheckpointsCleaner(), () -> {}); assertEquals(1, checkpoints.getNumberOfRetainedCheckpoints()); - verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint(false)); + verifyCheckpoint(expected[0], checkpoints.getLatestCheckpoint()); checkpoints.addCheckpoint(expected[1], new CheckpointsCleaner(), () -> {}); assertEquals(2, checkpoints.getNumberOfRetainedCheckpoints()); - verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint(false)); + verifyCheckpoint(expected[1], checkpoints.getLatestCheckpoint()); } /** @@ -123,8 +123,7 @@ public void testAddCheckpointMoreThanMaxRetained() throws Exception { * Tests that * *
    - *
  • {@link CompletedCheckpointStore#getLatestCheckpoint(boolean)} returns null - * , + *
  • {@link CompletedCheckpointStore#getLatestCheckpoint()} returns null , *
  • {@link CompletedCheckpointStore#getAllCheckpoints()} returns an empty list, *
  • {@link CompletedCheckpointStore#getNumberOfRetainedCheckpoints()} returns 0. *
@@ -133,7 +132,7 @@ public void testAddCheckpointMoreThanMaxRetained() throws Exception { public void testEmptyState() throws Exception { CompletedCheckpointStore checkpoints = createRecoveredCompletedCheckpointStore(1); - assertNull(checkpoints.getLatestCheckpoint(false)); + assertNull(checkpoints.getLatestCheckpoint()); assertEquals(0, checkpoints.getAllCheckpoints().size()); assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints()); } @@ -186,7 +185,7 @@ public void testDiscardAllCheckpoints() throws Exception { checkpoints.shutdown(JobStatus.FINISHED, new CheckpointsCleaner()); // Empty state - assertNull(checkpoints.getLatestCheckpoint(false)); + assertNull(checkpoints.getLatestCheckpoint()); assertEquals(0, checkpoints.getAllCheckpoints().size()); assertEquals(0, checkpoints.getNumberOfRetainedCheckpoints()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java index 00a353a68a804..a42387ffdef59 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/FailoverStrategyCheckpointCoordinatorTest.java @@ -69,7 +69,6 @@ public void testAbortPendingCheckpointsWithTriggerValidation() throws Exception CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0); CheckpointCoordinator checkpointCoordinator = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java index 218b023742c5d..1f8f3ad2ad8f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/StandaloneCompletedCheckpointStoreTest.java @@ -32,8 +32,6 @@ import java.util.concurrent.Executor; import static org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION; -import static org.hamcrest.CoreMatchers.equalTo; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -120,60 +118,4 @@ public boolean discardOnSubsume() { } discardAttempted.await(); } - - @Test - public void testPreferCheckpointWithoutSavepoint() throws Exception { - StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); - JobID jobId = new JobID(); - store.addCheckpoint(checkpoint(jobId, 1L), new CheckpointsCleaner(), () -> {}); - store.addCheckpoint(checkpoint(jobId, 2L), new CheckpointsCleaner(), () -> {}); - store.addCheckpoint(checkpoint(jobId, 3L), new CheckpointsCleaner(), () -> {}); - - CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); - - assertThat(latestCheckpoint.getCheckpointID(), equalTo(3L)); - } - - @Test - public void testPreferCheckpointWithSavepoint() throws Exception { - StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); - JobID jobId = new JobID(); - store.addCheckpoint(checkpoint(jobId, 1L), new CheckpointsCleaner(), () -> {}); - store.addCheckpoint(savepoint(jobId, 2L), new CheckpointsCleaner(), () -> {}); - store.addCheckpoint(savepoint(jobId, 3L), new CheckpointsCleaner(), () -> {}); - - CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); - - assertThat(latestCheckpoint.getCheckpointID(), equalTo(1L)); - } - - @Test - public void testPreferCheckpointWithOnlySavepoint() throws Exception { - StandaloneCompletedCheckpointStore store = new StandaloneCompletedCheckpointStore(5); - JobID jobId = new JobID(); - store.addCheckpoint(savepoint(jobId, 1L), new CheckpointsCleaner(), () -> {}); - store.addCheckpoint(savepoint(jobId, 2L), new CheckpointsCleaner(), () -> {}); - - CompletedCheckpoint latestCheckpoint = store.getLatestCheckpoint(true); - - assertThat(latestCheckpoint.getCheckpointID(), equalTo(2L)); - } - - private static CompletedCheckpoint checkpoint(JobID jobId, long checkpointId) { - return new TestCompletedCheckpoint( - jobId, - checkpointId, - checkpointId, - Collections.emptyMap(), - CheckpointProperties.forCheckpoint(CheckpointRetentionPolicy.RETAIN_ON_FAILURE)); - } - - private static CompletedCheckpoint savepoint(JobID jobId, long checkpointId) { - return new TestCompletedCheckpoint( - jobId, - checkpointId, - checkpointId, - Collections.emptyMap(), - CheckpointProperties.forSavepoint(true)); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java index 629dad17ebcc0..5398e57b56da8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/TestingCompletedCheckpointStore.java @@ -41,7 +41,7 @@ public void addCheckpoint( } @Override - public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) { + public CompletedCheckpoint getLatestCheckpoint() { return null; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index d909174712472..8541dfcbec59a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -130,7 +130,7 @@ public void testRecover() throws Exception { assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); - assertEquals(expected[2], checkpoints.getLatestCheckpoint(false)); + assertEquals(expected[2], checkpoints.getLatestCheckpoint()); List expectedCheckpoints = new ArrayList<>(3); expectedCheckpoints.add(expected[1]); @@ -219,7 +219,7 @@ public void testSuspendKeepsCheckpoints() throws Exception { sharedStateRegistry.close(); store = createRecoveredCompletedCheckpointStore(1); - CompletedCheckpoint recovered = store.getLatestCheckpoint(false); + CompletedCheckpoint recovered = store.getLatestCheckpoint(); assertEquals(checkpoint, recovered); } @@ -247,7 +247,7 @@ public void testLatestCheckpointRecovery() throws Exception { sharedStateRegistry.close(); final CompletedCheckpoint latestCheckpoint = - createRecoveredCompletedCheckpointStore(numCheckpoints).getLatestCheckpoint(false); + createRecoveredCompletedCheckpointStore(numCheckpoints).getLatestCheckpoint(); assertEquals(checkpoints.get(checkpoints.size() - 1), latestCheckpoint); } @@ -278,7 +278,7 @@ public void testConcurrentCheckpointOperations() throws Exception { final CompletedCheckpointStore zkCheckpointStore2 = createRecoveredCompletedCheckpointStore(numberOfCheckpoints); - CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(false); + CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); TestCompletedCheckpoint recoveredTestCheckpoint = (TestCompletedCheckpoint) recoveredCheckpoint; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java index 222f82b10ee25..323c4a9961442 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreMockitoTest.java @@ -175,7 +175,7 @@ public Void answer(InvocationOnMock invocation) Executors.directExecutor()); CompletedCheckpoint latestCompletedCheckpoint = - zooKeeperCompletedCheckpointStore.getLatestCheckpoint(false); + zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); // check that we return the latest retrievable checkpoint // this should remove the latest checkpoint because it is broken @@ -198,140 +198,4 @@ public Void answer(InvocationOnMock invocation) verify(retrievableStateHandle1, never()).discardState(); verify(retrievableStateHandle2, never()).discardState(); } - - /** - * Tests that the completed checkpoint store can retrieve all checkpoints stored in ZooKeeper - * and ignores those which cannot be retrieved via their state handles. - * - *

We have a timeout in case the ZooKeeper store get's into a deadlock/livelock situation. - */ - @Test(timeout = 50000) - public void testCheckpointRecoveryPreferCheckpoint() throws Exception { - final JobID jobID = new JobID(); - final long checkpoint1Id = 1L; - final long checkpoint2Id = 2; - final List, String>> - checkpointsInZooKeeper = new ArrayList<>(4); - - final Collection expectedCheckpointIds = new HashSet<>(2); - expectedCheckpointIds.add(1L); - expectedCheckpointIds.add(2L); - - final RetrievableStateHandle retrievableStateHandle1 = - mock(RetrievableStateHandle.class); - when(retrievableStateHandle1.retrieveState()) - .then( - (invocation) -> - new CompletedCheckpoint( - jobID, - checkpoint1Id, - 1L, - 1L, - new HashMap<>(), - null, - CheckpointProperties.forCheckpoint( - CheckpointRetentionPolicy - .NEVER_RETAIN_AFTER_TERMINATION), - new TestCompletedCheckpointStorageLocation())); - - final RetrievableStateHandle retrievableStateHandle2 = - mock(RetrievableStateHandle.class); - when(retrievableStateHandle2.retrieveState()) - .then( - (invocation -> - new CompletedCheckpoint( - jobID, - checkpoint2Id, - 2L, - 2L, - new HashMap<>(), - null, - CheckpointProperties.forSavepoint(true), - new TestCompletedCheckpointStorageLocation()))); - - checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle1, "/foobar1")); - checkpointsInZooKeeper.add(Tuple2.of(retrievableStateHandle2, "/foobar2")); - - final CuratorFramework client = mock(CuratorFramework.class, Mockito.RETURNS_DEEP_STUBS); - final RetrievableStateStorageHelper storageHelperMock = - mock(RetrievableStateStorageHelper.class); - - ZooKeeperStateHandleStore zooKeeperStateHandleStoreMock = - spy(new ZooKeeperStateHandleStore<>(client, storageHelperMock)); - doReturn(checkpointsInZooKeeper).when(zooKeeperStateHandleStoreMock).getAllAndLock(); - - final int numCheckpointsToRetain = 1; - - // Mocking for the delete operation on the CuratorFramework client - // It assures that the callback is executed synchronously - - final EnsurePath ensurePathMock = mock(EnsurePath.class); - final CuratorEvent curatorEventMock = mock(CuratorEvent.class); - when(curatorEventMock.getType()).thenReturn(CuratorEventType.DELETE); - when(curatorEventMock.getResultCode()).thenReturn(0); - when(client.newNamespaceAwareEnsurePath(anyString())).thenReturn(ensurePathMock); - - when(client.delete().inBackground(any(BackgroundCallback.class), any(Executor.class))) - .thenAnswer( - new Answer>() { - @Override - public ErrorListenerPathable answer(InvocationOnMock invocation) - throws Throwable { - final BackgroundCallback callback = - (BackgroundCallback) invocation.getArguments()[0]; - - ErrorListenerPathable result = - mock(ErrorListenerPathable.class); - - when(result.forPath(anyString())) - .thenAnswer( - new Answer() { - @Override - public Void answer(InvocationOnMock invocation) - throws Throwable { - - callback.processResult( - client, curatorEventMock); - - return null; - } - }); - - return result; - } - }); - - CompletedCheckpointStore zooKeeperCompletedCheckpointStore = - new DefaultCompletedCheckpointStore<>( - numCheckpointsToRetain, - zooKeeperStateHandleStoreMock, - zooKeeperCheckpointStoreUtil, - DefaultCompletedCheckpointStoreUtils.retrieveCompletedCheckpoints( - zooKeeperStateHandleStoreMock, zooKeeperCheckpointStoreUtil), - Executors.directExecutor()); - - CompletedCheckpoint latestCompletedCheckpoint = - zooKeeperCompletedCheckpointStore.getLatestCheckpoint(true); - - // check that we return the latest retrievable checkpoint - // this should remove the latest checkpoint because it is broken - assertEquals(checkpoint1Id, latestCompletedCheckpoint.getCheckpointID()); - - // this should remove the second broken checkpoint because we're iterating over all - // checkpoints - List completedCheckpoints = - zooKeeperCompletedCheckpointStore.getAllCheckpoints(); - - Collection actualCheckpointIds = new HashSet<>(completedCheckpoints.size()); - - for (CompletedCheckpoint completedCheckpoint : completedCheckpoints) { - actualCheckpointIds.add(completedCheckpoint.getCheckpointID()); - } - - assertEquals(expectedCheckpointIds, actualCheckpointIds); - - // check that we did not discard any of the state handles - verify(retrievableStateHandle1, never()).discardState(); - verify(retrievableStateHandle2, never()).discardState(); - } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java index 4636812c172f7..e751f64ad9d6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ArchivedExecutionGraphTest.java @@ -104,7 +104,6 @@ public static void setupExecutionGraph() throws Exception { CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0); JobCheckpointingSettings checkpointingSettings = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index 1eb39d776150f..a1db269bf6ea2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -672,7 +672,6 @@ private ExecutionGraph createExecutionGraph(Configuration configuration) throws CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, - false, 0, 0), null)); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index dd4424bf15d16..3caedb0ee7999 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -397,7 +397,6 @@ private static JobCheckpointingSettings createCheckpointSettingsWithInterval( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java index 8a7a73804e34d..3a9c2dfce0c80 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/tasks/JobCheckpointingSettingsTest.java @@ -44,7 +44,6 @@ public void testIsJavaSerializable() throws Exception { CheckpointRetentionPolicy.RETAIN_ON_FAILURE, false, false, - false, 0, 0), new SerializedValue<>(new MemoryStateBackend())); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 455e338078264..db070c09e957d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -830,7 +830,7 @@ public void testRestoringFromSavepoint() throws Exception { // been created taskSubmitLatch.await(); final CompletedCheckpoint savepointCheckpoint = - completedCheckpointStore.getLatestCheckpoint(false); + completedCheckpointStore.getLatestCheckpoint(); assertThat(savepointCheckpoint, Matchers.notNullValue()); @@ -881,7 +881,7 @@ public void testCheckpointPrecedesSavepointRecovery() throws Exception { try { // starting the JobMaster should have read the savepoint final CompletedCheckpoint savepointCheckpoint = - completedCheckpointStore.getLatestCheckpoint(false); + completedCheckpointStore.getLatestCheckpoint(); assertThat(savepointCheckpoint, Matchers.notNullValue()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java index ebd0ae3d20d33..b5054b0aeb762 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/TestUtils.java @@ -103,7 +103,6 @@ public static JobGraph createJobGraphFromJobVerticesWithCheckpointing( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION) .setExactlyOnce(true) .setUnalignedCheckpointsEnabled(false) - .setPreferCheckpointForRecovery(false) .setTolerableCheckpointFailureNumber(0) .build(); final JobCheckpointingSettings checkpointingSettings = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java index b755ad94e6363..ae967ceb1cc65 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactoryTest.java @@ -107,7 +107,7 @@ public void testRestoringModifiedJobFromSavepointWithAllowNonRestoredStateSuccee SchedulerBase.computeVertexParallelismStore(jobGraphWithNewOperator), log); - final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(false); + final CompletedCheckpoint savepoint = completedCheckpointStore.getLatestCheckpoint(); MatcherAssert.assertThat(savepoint, notNullValue()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index bf3a9f8db8660..6967c7243adf8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -162,7 +162,6 @@ public static void enableCheckpointing( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, false, false, - false, 0, 0); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java index 7efd3b45ba0b5..8204c6759a790 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/CheckpointConfig.java @@ -118,9 +118,6 @@ public class CheckpointConfig implements java.io.Serializable { */ @Deprecated private boolean failOnCheckpointingErrors = true; - /** Determines if a job will fallback to checkpoint when there is a more recent savepoint. * */ - private boolean preferCheckpointForRecovery = false; - /** * Determines the threshold that we tolerance declined checkpoint failure number. The default * value is -1 meaning undetermined and not set via {@link @@ -147,7 +144,6 @@ public CheckpointConfig(final CheckpointConfig checkpointConfig) { this.checkpointTimeout = checkpointConfig.checkpointTimeout; this.maxConcurrentCheckpoints = checkpointConfig.maxConcurrentCheckpoints; this.minPauseBetweenCheckpoints = checkpointConfig.minPauseBetweenCheckpoints; - this.preferCheckpointForRecovery = checkpointConfig.preferCheckpointForRecovery; this.tolerableCheckpointFailureNumber = checkpointConfig.tolerableCheckpointFailureNumber; this.unalignedCheckpointsEnabled = checkpointConfig.isUnalignedCheckpointsEnabled(); this.alignedCheckpointTimeout = checkpointConfig.alignedCheckpointTimeout; @@ -463,37 +459,6 @@ public boolean isExternalizedCheckpointsEnabled() { return externalizedCheckpointCleanup != null; } - /** - * Returns whether a job recovery should fallback to checkpoint when there is a more recent - * savepoint. - * - * @return true if a job recovery should fallback to checkpoint. - * @deprecated Don't activate prefer checkpoints for recovery because it can lead to data loss - * and duplicate output. This option will soon be removed. See FLINK-20427 for more - * information. - */ - @PublicEvolving - @Deprecated - public boolean isPreferCheckpointForRecovery() { - return preferCheckpointForRecovery; - } - - /** - * Sets whether a job recovery should fallback to checkpoint when there is a more recent - * savepoint. - * - * @deprecated Don't activate prefer checkpoints for recovery because it can lead to data loss - * and duplicate output. This option will soon be removed. See FLINK-20427 for more - * information. - */ - @PublicEvolving - @Deprecated - public void setPreferCheckpointForRecovery(boolean preferCheckpointForRecovery) { - this.preferCheckpointForRecovery = preferCheckpointForRecovery; - } - /** * Enables unaligned checkpoints, which greatly reduce checkpointing times under backpressure. * @@ -800,9 +765,6 @@ public void configure(ReadableConfig configuration) { configuration .getOptional(ExecutionCheckpointingOptions.MIN_PAUSE_BETWEEN_CHECKPOINTS) .ifPresent(m -> this.setMinPauseBetweenCheckpoints(m.toMillis())); - configuration - .getOptional(ExecutionCheckpointingOptions.PREFER_CHECKPOINT_FOR_RECOVERY) - .ifPresent(this::setPreferCheckpointForRecovery); configuration .getOptional(ExecutionCheckpointingOptions.TOLERABLE_FAILURE_NUMBER) .ifPresent(this::setTolerableCheckpointFailureNumber); diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java index 27a757476b5c4..f919363826964 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/environment/ExecutionCheckpointingOptions.java @@ -78,14 +78,6 @@ public class ExecutionCheckpointingOptions { + "sure that a minimum amount of time passes where no checkpoint is in progress at all.") .build()); - public static final ConfigOption PREFER_CHECKPOINT_FOR_RECOVERY = - ConfigOptions.key("execution.checkpointing.prefer-checkpoint-for-recovery") - .booleanType() - .defaultValue(false) - .withDescription( - "If enabled, a job recovery should fallback to checkpoint when there is a more recent " - + "savepoint."); - public static final ConfigOption TOLERABLE_FAILURE_NUMBER = ConfigOptions.key("execution.checkpointing.tolerable-failed-checkpoints") .intType() diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index f0123e582df9d..536577bc761d1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1315,7 +1315,6 @@ private void configureCheckpointing() { .setCheckpointRetentionPolicy(retentionAfterTermination) .setExactlyOnce( getCheckpointingMode(cfg) == CheckpointingMode.EXACTLY_ONCE) - .setPreferCheckpointForRecovery(cfg.isPreferCheckpointForRecovery()) .setTolerableCheckpointFailureNumber( cfg.getTolerableCheckpointFailureNumber()) .setUnalignedCheckpointsEnabled(cfg.isUnalignedCheckpointsEnabled()) diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java index 0bbab9909adcb..6c01193e7247e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/CheckpointConfigFromConfigurationTest.java @@ -69,12 +69,6 @@ public static Collection specs() { .viaSetter(CheckpointConfig::setMinPauseBetweenCheckpoints) .getterVia(CheckpointConfig::getMinPauseBetweenCheckpoints) .nonDefaultValue(100L), - TestSpec.testValue(true) - .whenSetFromFile( - "execution.checkpointing.prefer-checkpoint-for-recovery", "true") - .viaSetter(CheckpointConfig::setPreferCheckpointForRecovery) - .getterVia(CheckpointConfig::isPreferCheckpointForRecovery) - .nonDefaultValue(true), TestSpec.testValue( CheckpointConfig.ExternalizedCheckpointCleanup .RETAIN_ON_CANCELLATION) diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java index 5a98d140c927d..2164968dca274 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterStopWithSavepointITCase.java @@ -289,7 +289,6 @@ private void setUpJobGraph( CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0), null); diff --git a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java index 1361c4ef7ec3e..0e3e7543b6836 100644 --- a/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTriggerSavepointITCase.java @@ -107,7 +107,6 @@ private void setUpWithCheckpointInterval(long checkpointInterval) throws Excepti CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, true, false, - false, 0, 0), null);