Commit

[FLINK-20427] Remove configuration option to prefer checkpoints over newer savepoints
Nicolaus Weidner authored and tillrohrmann committed Sep 3, 2021
1 parent e03c84a commit eb8c21c
Showing 33 changed files with 19 additions and 525 deletions.
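In short: before this change, enabling the prefer-checkpoint-for-recovery setting let a job restore from an older checkpoint even when a newer savepoint existed; after it, recovery always uses the newest completed checkpoint or savepoint. A minimal sketch of the old and new selection logic, using toy types (`Snapshot` stands in for Flink's `CompletedCheckpoint`; this is not the actual Flink code):

```java
import java.util.List;

// Toy stand-in for Flink's CompletedCheckpoint.
record Snapshot(String name, boolean isSavepoint) {}

class RecoverySelection {

    // Old behavior (removed): with the option enabled, a trailing savepoint is
    // skipped in favor of the most recent checkpoint before it.
    static Snapshot before(List<Snapshot> completed, boolean preferCheckpoint) {
        Snapshot last = completed.get(completed.size() - 1);
        if (preferCheckpoint && completed.size() > 1 && last.isSavepoint()) {
            for (int i = completed.size() - 2; i >= 0; i--) {
                if (!completed.get(i).isSavepoint()) {
                    return completed.get(i); // older checkpoint wins over newer savepoint
                }
            }
        }
        return last;
    }

    // New behavior: the newest completed snapshot always wins, savepoint or not.
    static Snapshot after(List<Snapshot> completed) {
        return completed.get(completed.size() - 1);
    }
}
```

For `completed = [chk-1, chk-2, sp-3]`, `before(completed, true)` returns `chk-2`, while `after(completed)` returns `sp-3`.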
@@ -152,9 +152,6 @@ env.get_checkpoint_config().set_max_concurrent_checkpoints(1)

# enable externalized checkpoints which are retained after job cancellation
env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

# allow job recovery fallback to checkpoint when there is a more recent savepoint
env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)
```
{{< /tab >}}
{{< /tabs >}}
@@ -173,9 +173,6 @@ env.get_checkpoint_config().set_max_concurrent_checkpoints(1)
# enable externalized checkpoints which are retained after job cancellation
env.get_checkpoint_config().enable_externalized_checkpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION)

# allow job recovery fallback to checkpoint when there is a more recent savepoint
env.get_checkpoint_config().set_prefer_checkpoint_for_recovery(True)

# enables the experimental unaligned checkpoints
env.get_checkpoint_config().enable_unaligned_checkpoints()
```
@@ -100,7 +100,6 @@ public void testJobManagerJMXMetricAccess() throws Exception {
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
false,
0,
0),
null);
22 changes: 0 additions & 22 deletions flink-python/pyflink/datastream/checkpoint_config.py
@@ -255,28 +255,6 @@ def is_externalized_checkpoints_enabled(self) -> bool:
"""
return self._j_checkpoint_config.isExternalizedCheckpointsEnabled()

def is_prefer_checkpoint_for_recovery(self) -> bool:
"""
Returns whether a job recovery should fallback to checkpoint when there is a more recent
savepoint.
:return: ``True`` if a job recovery should fallback to checkpoint, false otherwise.
"""
return self._j_checkpoint_config.isPreferCheckpointForRecovery()

def set_prefer_checkpoint_for_recovery(
self,
prefer_checkpoint_for_recovery: bool) -> 'CheckpointConfig':
"""
Sets whether a job recovery should fallback to checkpoint when there is a more recent
savepoint.
:param prefer_checkpoint_for_recovery: ``True`` if a job recovery should fallback to
checkpoint, false otherwise.
"""
self._j_checkpoint_config.setPreferCheckpointForRecovery(prefer_checkpoint_for_recovery)
return self

def get_externalized_checkpoint_cleanup(self) -> Optional['ExternalizedCheckpointCleanup']:
"""
Returns the cleanup behaviour for externalized checkpoints.
@@ -129,14 +129,6 @@ def test_get_set_externalized_checkpoints_cleanup(self):
self.assertEqual(self.checkpoint_config.get_externalized_checkpoint_cleanup(),
ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION)

def test_get_set_prefer_checkpoint_for_recovery(self):

self.assertFalse(self.checkpoint_config.is_prefer_checkpoint_for_recovery())

self.checkpoint_config.set_prefer_checkpoint_for_recovery(True)

self.assertTrue(self.checkpoint_config.is_prefer_checkpoint_for_recovery())

def test_is_unaligned_checkpointing_enabled(self):

self.assertFalse(self.checkpoint_config.is_unaligned_checkpoints_enabled())
@@ -210,8 +210,6 @@ public class CheckpointCoordinator {
/** Registry that tracks state which is shared across (incremental) checkpoints. */
private SharedStateRegistry sharedStateRegistry;

private boolean isPreferCheckpointForRecovery;

/** Id of checkpoint for which in-flight data should be ignored on recovery. */
private final long checkpointIdOfIgnoredInFlightData;

@@ -310,7 +308,6 @@ public CheckpointCoordinator(
this.checkpointsCleaner = checkNotNull(checkpointsCleaner);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.isPreferCheckpointForRecovery = chkConfig.isPreferCheckpointForRecovery();
this.failureManager = checkNotNull(failureManager);
this.checkpointPlanCalculator = checkNotNull(checkpointPlanCalculator);
this.attemptMappingProvider = checkNotNull(attemptMappingProvider);
@@ -1513,8 +1510,7 @@ private OptionalLong restoreLatestCheckpointedStateInternal(
sharedStateRegistry);

// Restore from the latest checkpoint
CompletedCheckpoint latest =
completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();

if (latest == null) {
LOG.info("No checkpoint found during restore.");
@@ -25,7 +25,6 @@
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.ListIterator;

/** A bounded LIFO-queue of {@link CompletedCheckpoint} instances. */
public interface CompletedCheckpointStore {
@@ -48,33 +47,13 @@ void addCheckpoint(
* Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
* added.
*/
default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery)
throws Exception {
default CompletedCheckpoint getLatestCheckpoint() throws Exception {
List<CompletedCheckpoint> allCheckpoints = getAllCheckpoints();
if (allCheckpoints.isEmpty()) {
return null;
}

CompletedCheckpoint lastCompleted = allCheckpoints.get(allCheckpoints.size() - 1);

if (isPreferCheckpointForRecovery
&& allCheckpoints.size() > 1
&& lastCompleted.getProperties().isSavepoint()) {
ListIterator<CompletedCheckpoint> listIterator =
allCheckpoints.listIterator(allCheckpoints.size() - 1);
while (listIterator.hasPrevious()) {
CompletedCheckpoint prev = listIterator.previous();
if (!prev.getProperties().isSavepoint()) {
LOG.info(
"Found a completed checkpoint ({}) before the latest savepoint, will use it to recover!",
prev);
return prev;
}
}
LOG.info("Did not find earlier checkpoint, using latest savepoint to recover.");
}

return lastCompleted;
return allCheckpoints.get(allCheckpoints.size() - 1);
}
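To see the new default method's contract concretely: a tiny self-contained demo (again with a toy `Snapshot` type, not Flink's API) showing that a newer savepoint is now always chosen over an older checkpoint:

```java
import java.util.List;

public class LatestSnapshotDemo {

    record Snapshot(String name, boolean isSavepoint) {}

    // Mirrors the simplified default above: newest entry, or null when empty.
    static Snapshot getLatestCheckpoint(List<Snapshot> all) {
        return all.isEmpty() ? null : all.get(all.size() - 1);
    }

    public static void main(String[] args) {
        List<Snapshot> completed = List.of(
                new Snapshot("chk-42", false),       // older checkpoint
                new Snapshot("savepoint-7", true));  // newer savepoint
        // Prints "savepoint-7": there is no longer a boolean escape hatch.
        System.out.println(getLatestCheckpoint(completed).name());
    }
}
```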

/**
@@ -58,8 +58,6 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
*/
private final boolean isExactlyOnce;

private final boolean isPreferCheckpointForRecovery;

private final boolean isUnalignedCheckpointsEnabled;

private final long alignedCheckpointTimeout;
@@ -79,7 +77,6 @@ public CheckpointCoordinatorConfiguration(
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
boolean isUnalignedCheckpoint,
boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber,
long checkpointIdOfIgnoredInFlightData) {
this(
@@ -89,7 +86,6 @@
maxConcurrentCheckpoints,
checkpointRetentionPolicy,
isExactlyOnce,
isPreferCheckpointForRecovery,
tolerableCpFailureNumber,
isUnalignedCheckpoint,
0,
@@ -104,7 +100,6 @@ private CheckpointCoordinatorConfiguration(
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce,
boolean isPreferCheckpointForRecovery,
int tolerableCpFailureNumber,
boolean isUnalignedCheckpointsEnabled,
long alignedCheckpointTimeout,
@@ -129,7 +124,6 @@ private CheckpointCoordinatorConfiguration(
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
this.tolerableCheckpointFailureNumber = tolerableCpFailureNumber;
this.isUnalignedCheckpointsEnabled = isUnalignedCheckpointsEnabled;
this.alignedCheckpointTimeout = alignedCheckpointTimeout;
@@ -161,10 +155,6 @@ public boolean isExactlyOnce() {
return isExactlyOnce;
}

public boolean isPreferCheckpointForRecovery() {
return isPreferCheckpointForRecovery;
}

public int getTolerableCheckpointFailureNumber() {
return tolerableCheckpointFailureNumber;
}
@@ -202,7 +192,6 @@ public boolean equals(Object o) {
&& isUnalignedCheckpointsEnabled == that.isUnalignedCheckpointsEnabled
&& alignedCheckpointTimeout == that.alignedCheckpointTimeout
&& checkpointRetentionPolicy == that.checkpointRetentionPolicy
&& isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery
&& tolerableCheckpointFailureNumber == that.tolerableCheckpointFailureNumber
&& checkpointIdOfIgnoredInFlightData == that.checkpointIdOfIgnoredInFlightData
&& enableCheckpointsAfterTasksFinish == that.enableCheckpointsAfterTasksFinish;
@@ -219,7 +208,6 @@ public int hashCode() {
isExactlyOnce,
isUnalignedCheckpointsEnabled,
alignedCheckpointTimeout,
isPreferCheckpointForRecovery,
tolerableCheckpointFailureNumber,
checkpointIdOfIgnoredInFlightData,
enableCheckpointsAfterTasksFinish);
@@ -244,8 +232,6 @@ public String toString() {
+ isUnalignedCheckpointsEnabled
+ ", alignedCheckpointTimeout="
+ alignedCheckpointTimeout
+ ", isPreferCheckpointForRecovery="
+ isPreferCheckpointForRecovery
+ ", tolerableCheckpointFailureNumber="
+ tolerableCheckpointFailureNumber
+ ", checkpointIdOfIgnoredInFlightData="
@@ -268,7 +254,6 @@ public static class CheckpointCoordinatorConfigurationBuilder {
private CheckpointRetentionPolicy checkpointRetentionPolicy =
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION;
private boolean isExactlyOnce = true;
private boolean isPreferCheckpointForRecovery = true;
private int tolerableCheckpointFailureNumber;
private boolean isUnalignedCheckpointsEnabled;
private long alignedCheckpointTimeout = 0;
@@ -283,7 +268,6 @@ public CheckpointCoordinatorConfiguration build() {
maxConcurrentCheckpoints,
checkpointRetentionPolicy,
isExactlyOnce,
isPreferCheckpointForRecovery,
tolerableCheckpointFailureNumber,
isUnalignedCheckpointsEnabled,
alignedCheckpointTimeout,
@@ -326,12 +310,6 @@ public CheckpointCoordinatorConfigurationBuilder setExactlyOnce(boolean exactlyO
return this;
}

public CheckpointCoordinatorConfigurationBuilder setPreferCheckpointForRecovery(
boolean preferCheckpointForRecovery) {
isPreferCheckpointForRecovery = preferCheckpointForRecovery;
return this;
}

public CheckpointCoordinatorConfigurationBuilder setTolerableCheckpointFailureNumber(
int tolerableCheckpointFailureNumber) {
this.tolerableCheckpointFailureNumber = tolerableCheckpointFailureNumber;
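For code that constructs this configuration, migration is mechanical: drop the `setPreferCheckpointForRecovery(...)` call on the builder, or the corresponding boolean argument at positional constructor call sites (as in the test diffs above and below). A sketch of post-change builder usage, limited to setters visible in this diff; everything else keeps its default:

```java
CheckpointCoordinatorConfiguration chkConfig =
        new CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder()
                .setExactlyOnce(true)
                .setTolerableCheckpointFailureNumber(0)
                // .setPreferCheckpointForRecovery(true) was removed by this commit
                .build();
```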
@@ -264,8 +264,7 @@ public void addCheckpoint(
}

@Override
public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery)
throws Exception {
public CompletedCheckpoint getLatestCheckpoint() throws Exception {
throw new UnsupportedOperationException("Not implemented.");
}

@@ -231,7 +231,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
assertEquals(0, cc.getNumberOfPendingCheckpoints());

assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(false);
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();

final Collection<MasterState> masterStates = chk.getMasterHookStates();
assertEquals(2, masterStates.size());
@@ -473,7 +473,6 @@ private CheckpointCoordinator instantiateCheckpointCoordinator(
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true,
false,
false,
0,
0);
Executor executor = Executors.directExecutor();