[FLINK-11159] Allow configuration whether to fall back to savepoints for restore

Closes apache#8410
yanghua authored and Gyula Fora committed May 15, 2019
1 parent 81583e3 commit 1dfdaa4
Showing 32 changed files with 510 additions and 95 deletions.
5 changes: 5 additions & 0 deletions docs/dev/stream/state/checkpointing.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:

- *fail/continue task on checkpoint errors*: This determines whether a task will fail if an error occurs while executing its checkpoint procedure; failing the task is the default behaviour. When this is disabled, the task will instead decline the checkpoint to the checkpoint coordinator and continue running.

- *prefer checkpoint for recovery*: This determines whether a job will fall back to the latest checkpoint, even when more recent savepoints are available, in order to potentially reduce recovery time.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -100,6 +102,9 @@ env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);

// enable externalized checkpoints which are retained after job cancellation
env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);

// allow job recovery to fall back to a checkpoint when there is a more recent savepoint
env.getCheckpointConfig().setPreferCheckpointForRecovery(true);
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
5 changes: 5 additions & 0 deletions docs/dev/stream/state/checkpointing.zh.md
@@ -76,6 +76,8 @@ Other parameters for checkpointing include:

- *fail/continue task on checkpoint errors*: This determines whether a task will fail if an error occurs while executing its checkpoint procedure; failing the task is the default behaviour. When this is disabled, the task will instead decline the checkpoint to the checkpoint coordinator and continue running.

- *prefer checkpoint for recovery*: This determines whether a job will fall back to the latest checkpoint, even when more recent savepoints are available, in order to potentially reduce recovery time.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
@@ -125,6 +127,9 @@ env.getCheckpointConfig.setFailTasksOnCheckpointingErrors(false)

// allow only one checkpoint to be in progress at the same time
env.getCheckpointConfig.setMaxConcurrentCheckpoints(1)

// allow job recovery to fall back to a checkpoint when there is a more recent savepoint
env.getCheckpointConfig.setPreferCheckpointForRecovery(true)
{% endhighlight %}
</div>
</div>
@@ -100,7 +100,8 @@ public void testJobManagerJMXMetricAccess() throws Exception {
50,
5,
CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION,
true),
true,
false),
null));

ClusterClient<?> client = MINI_CLUSTER_RESOURCE.getClusterClient();
@@ -183,6 +183,8 @@ public class CheckpointCoordinator {
/** Registry that tracks state which is shared across (incremental) checkpoints. */
private SharedStateRegistry sharedStateRegistry;

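/** Flag marking whether the coordinator should prefer a completed checkpoint over a more recent savepoint when restoring. */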
private boolean isPreferCheckpointForRecovery;

// --------------------------------------------------------------------------------------------

public CheckpointCoordinator(
@@ -199,7 +201,8 @@ public CheckpointCoordinator(
CompletedCheckpointStore completedCheckpointStore,
StateBackend checkpointStateBackend,
Executor executor,
SharedStateRegistryFactory sharedStateRegistryFactory) {
SharedStateRegistryFactory sharedStateRegistryFactory,
boolean isPreferCheckpointForRecovery) {

// sanity checks
checkNotNull(checkpointStateBackend);
@@ -233,6 +236,7 @@ public CheckpointCoordinator(
this.executor = checkNotNull(executor);
this.sharedStateRegistryFactory = checkNotNull(sharedStateRegistryFactory);
this.sharedStateRegistry = sharedStateRegistryFactory.create(executor);
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;

this.recentPendingCheckpoints = new ArrayDeque<>(NUM_GHOST_CHECKPOINT_IDS);
this.masterHooks = new HashMap<>();
@@ -1080,7 +1084,7 @@ public boolean restoreLatestCheckpointedState(
LOG.debug("Status of the shared state registry of job {} after restore: {}.", job, sharedStateRegistry);

// Restore from the latest checkpoint
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint();
CompletedCheckpoint latest = completedCheckpointStore.getLatestCheckpoint(isPreferCheckpointForRecovery);

if (latest == null) {
if (errorIfNoCheckpoint) {
@@ -19,18 +19,25 @@
package org.apache.flink.runtime.checkpoint;

import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.util.FlinkRuntimeException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.ListIterator;

/**
* A bounded LIFO-queue of {@link CompletedCheckpoint} instances.
*/
public interface CompletedCheckpointStore {

Logger LOG = LoggerFactory.getLogger(CompletedCheckpointStore.class);

/**
* Recover available {@link CompletedCheckpoint} instances.
*
* <p>After a call to this method, {@link #getLatestCheckpoint()} returns the latest
* <p>After a call to this method, {@link #getLatestCheckpoint(boolean)} returns the latest
* available checkpoint.
*/
void recover() throws Exception;
@@ -47,7 +54,33 @@ public interface CompletedCheckpointStore {
* Returns the latest {@link CompletedCheckpoint} instance or <code>null</code> if none was
* added.
*/
CompletedCheckpoint getLatestCheckpoint() throws Exception;
	default CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
		List<CompletedCheckpoint> allCheckpoints;
		try {
			allCheckpoints = getAllCheckpoints();
		} catch (Exception e) {
			LOG.error("Method getAllCheckpoints caused exception.", e);
			throw new FlinkRuntimeException(e);
		}

		if (allCheckpoints.isEmpty()) {
			return null;
		}

		// by default, recover from the most recently added snapshot, savepoint or not
		CompletedCheckpoint candidate = allCheckpoints.get(allCheckpoints.size() - 1);
		if (isPreferCheckpointForRecovery && allCheckpoints.size() > 1) {
			// walk backwards from the second-to-last entry to find the most recent
			// proper checkpoint (i.e. one that is not a savepoint)
			ListIterator<CompletedCheckpoint> listIterator = allCheckpoints.listIterator(allCheckpoints.size() - 1);
			while (listIterator.hasPrevious()) {
				CompletedCheckpoint prev = listIterator.previous();
				if (!prev.getProperties().isSavepoint()) {
					candidate = prev;
					LOG.info("Found a completed checkpoint before the latest savepoint, will use it to recover!");
					break;
				}
			}
		}

		return candidate;
	}

/**
* Shuts down the store.
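The behavioural core of this commit is the default method above. Below is a minimal standalone sketch of that selection logic, using simplified stand-in types rather than the actual Flink classes; the Snapshot type, the PreferCheckpointSelectionSketch class name and the sample IDs are illustrative only.

{% highlight java %}
import java.util.Arrays;
import java.util.List;
import java.util.ListIterator;

public class PreferCheckpointSelectionSketch {

	/** Simplified stand-in for CompletedCheckpoint: only what the selection logic needs. */
	static final class Snapshot {
		final String id;
		final boolean isSavepoint;

		Snapshot(String id, boolean isSavepoint) {
			this.id = id;
			this.isSavepoint = isSavepoint;
		}
	}

	/** Mirrors the selection logic of the new getLatestCheckpoint(boolean) default method. */
	static Snapshot latest(List<Snapshot> all, boolean preferCheckpoint) {
		if (all.isEmpty()) {
			return null;
		}
		// by default, the most recently added snapshot wins, savepoint or not
		Snapshot candidate = all.get(all.size() - 1);
		if (preferCheckpoint && all.size() > 1) {
			// walk backwards from the second-to-last entry, looking for a proper checkpoint
			ListIterator<Snapshot> it = all.listIterator(all.size() - 1);
			while (it.hasPrevious()) {
				Snapshot prev = it.previous();
				if (!prev.isSavepoint) {
					candidate = prev;
					break;
				}
			}
		}
		return candidate;
	}

	public static void main(String[] args) {
		List<Snapshot> store = Arrays.asList(
				new Snapshot("chk-1", false),
				new Snapshot("chk-2", false),
				new Snapshot("sp-3", true)); // the savepoint is the most recent entry

		System.out.println(latest(store, false).id); // prints "sp-3"  (default behaviour)
		System.out.println(latest(store, true).id);  // prints "chk-2" (prefer checkpoint for recovery)
	}
}
{% endhighlight %}

With the flag disabled, the behaviour is unchanged and the most recent snapshot is used, whether it is a savepoint or not; only with the flag enabled does the store skip back past trailing savepoints to the most recent proper checkpoint.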
@@ -76,11 +76,6 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
}
}

@Override
public CompletedCheckpoint getLatestCheckpoint() {
return checkpoints.isEmpty() ? null : checkpoints.getLast();
}

@Override
public List<CompletedCheckpoint> getAllCheckpoints() {
return new ArrayList<>(checkpoints);
@@ -243,16 +243,6 @@ private void tryRemoveCompletedCheckpoint(CompletedCheckpoint completedCheckpoin
}
}

@Override
public CompletedCheckpoint getLatestCheckpoint() {
if (completedCheckpoints.isEmpty()) {
return null;
}
else {
return completedCheckpoints.peekLast();
}
}

@Override
public List<CompletedCheckpoint> getAllCheckpoints() throws Exception {
List<CompletedCheckpoint> checkpoints = new ArrayList<>(completedCheckpoints);
@@ -524,7 +524,8 @@ public void enableCheckpointing(
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStatsTracker statsTracker) {
CheckpointStatsTracker statsTracker,
boolean isPreferCheckpointForRecovery) {

// simple sanity checks
checkArgument(interval >= 10, "checkpoint interval must not be below 10ms");
@@ -554,7 +555,8 @@ checkpointStore,
checkpointStore,
checkpointStateBackend,
ioExecutor,
SharedStateRegistry.DEFAULT_FACTORY);
SharedStateRegistry.DEFAULT_FACTORY,
isPreferCheckpointForRecovery);

// register the master hooks on the checkpoint coordinator
for (MasterTriggerRestoreHook<?> hook : masterHooks) {
@@ -301,7 +301,8 @@ public static ExecutionGraph buildGraph(
checkpointIdCounter,
completedCheckpoints,
rootBackend,
checkpointStatsTracker);
checkpointStatsTracker,
chkConfig.isPreferCheckpointForRecovery());
}

// create all the metrics for the Execution Graph
@@ -54,13 +54,16 @@ public class CheckpointCoordinatorConfiguration implements Serializable {
*/
private final boolean isExactlyOnce;

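/** Flag indicating whether job recovery should fall back to the latest checkpoint even when more recent savepoints exist. */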
private final boolean isPreferCheckpointForRecovery;

public CheckpointCoordinatorConfiguration(
long checkpointInterval,
long checkpointTimeout,
long minPauseBetweenCheckpoints,
int maxConcurrentCheckpoints,
CheckpointRetentionPolicy checkpointRetentionPolicy,
boolean isExactlyOnce) {
boolean isExactlyOnce,
boolean isPreferCheckpointForRecovery) {

// sanity checks
if (checkpointInterval < 1 || checkpointTimeout < 1 ||
@@ -74,6 +77,7 @@ this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.maxConcurrentCheckpoints = maxConcurrentCheckpoints;
this.checkpointRetentionPolicy = Preconditions.checkNotNull(checkpointRetentionPolicy);
this.isExactlyOnce = isExactlyOnce;
this.isPreferCheckpointForRecovery = isPreferCheckpointForRecovery;
}

public long getCheckpointInterval() {
@@ -100,6 +104,10 @@ public boolean isExactlyOnce() {
return isExactlyOnce;
}

public boolean isPreferCheckpointForRecovery() {
return isPreferCheckpointForRecovery;
}

@Override
public boolean equals(Object o) {
if (this == o) {
@@ -114,7 +122,8 @@ public boolean equals(Object o) {
minPauseBetweenCheckpoints == that.minPauseBetweenCheckpoints &&
maxConcurrentCheckpoints == that.maxConcurrentCheckpoints &&
isExactlyOnce == that.isExactlyOnce &&
checkpointRetentionPolicy == that.checkpointRetentionPolicy;
checkpointRetentionPolicy == that.checkpointRetentionPolicy &&
isPreferCheckpointForRecovery == that.isPreferCheckpointForRecovery;
}

@Override
@@ -125,7 +134,8 @@ public int hashCode() {
minPauseBetweenCheckpoints,
maxConcurrentCheckpoints,
checkpointRetentionPolicy,
isExactlyOnce);
isExactlyOnce,
isPreferCheckpointForRecovery);
}

@Override
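For reference, a sketch of how the extended configuration would be constructed and queried; the numeric values are arbitrary examples and this assumes the Flink runtime classes from this commit are on the classpath.

{% highlight java %}
CheckpointCoordinatorConfiguration chkConfig = new CheckpointCoordinatorConfiguration(
		60_000,                                                   // checkpointInterval (ms)
		600_000,                                                  // checkpointTimeout (ms)
		0,                                                        // minPauseBetweenCheckpoints (ms)
		1,                                                        // maxConcurrentCheckpoints
		CheckpointRetentionPolicy.NEVER_RETAIN_AFTER_TERMINATION, // checkpointRetentionPolicy
		true,                                                     // isExactlyOnce
		true);                                                    // isPreferCheckpointForRecovery

boolean preferCheckpoint = chkConfig.isPreferCheckpointForRecovery(); // true
{% endhighlight %}

The flag is then read via isPreferCheckpointForRecovery() when the execution graph is built, as shown in the buildGraph diff above.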
@@ -80,7 +80,8 @@ public void testFailingCompletedCheckpointStoreAdd() throws Exception {
new FailingCompletedCheckpointStore(),
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
SharedStateRegistry.DEFAULT_FACTORY,
false);

coord.triggerCheckpoint(triggerTimestamp, false);

@@ -142,7 +143,7 @@ public void addCheckpoint(CompletedCheckpoint checkpoint) throws Exception {
}

@Override
public CompletedCheckpoint getLatestCheckpoint() throws Exception {
public CompletedCheckpoint getLatestCheckpoint(boolean isPreferCheckpointForRecovery) throws Exception {
throw new UnsupportedOperationException("Not implemented.");
}

@@ -206,7 +206,7 @@ public void testHooksAreCalledOnTrigger() throws Exception {
assertEquals(0, cc.getNumberOfPendingCheckpoints());

assertEquals(1, cc.getNumberOfRetainedSuccessfulCheckpoints());
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint();
final CompletedCheckpoint chk = cc.getCheckpointStore().getLatestCheckpoint(false);

final Collection<MasterState> masterStates = chk.getMasterHookStates();
assertEquals(2, masterStates.size());
@@ -435,7 +435,8 @@ private static CheckpointCoordinator instantiateCheckpointCoordinator(JobID jid,
new StandaloneCompletedCheckpointStore(10),
new MemoryStateBackend(),
Executors.directExecutor(),
SharedStateRegistry.DEFAULT_FACTORY);
SharedStateRegistry.DEFAULT_FACTORY,
false);
}

private static <T> T mockGeneric(Class<?> clazz) {