[FLINK-19465][runtime / statebackends] Wire checkpoint storage through the runtime
sjwiesman committed Jan 25, 2021
1 parent 45cba06 commit 947a02c
Showing 22 changed files with 304 additions and 46 deletions.
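
This commit resolves a job's checkpoint storage separately from its state backend and hands it to the CheckpointCoordinator as an object of its own. Below is a minimal sketch of that resolution step, assuming the five-argument CheckpointStorageLoader.load signature used in the buildGraph(...) hunk at the end of this diff; the wrapper class and method name are illustrative and not part of the commit.

import javax.annotation.Nullable;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.util.DynamicCodeLoadingException;

import org.slf4j.Logger;

final class CheckpointStorageResolution {

    /**
     * Resolves the effective checkpoint storage the way the runtime wiring below suggests: an
     * application-supplied storage is considered first, and the loader otherwise falls back to
     * the configured state backend (when it also implements CheckpointStorage) or to the
     * cluster configuration.
     */
    static CheckpointStorage resolve(
            @Nullable CheckpointStorage applicationConfiguredStorage,
            StateBackend configuredStateBackend,
            Configuration jobManagerConfig,
            ClassLoader classLoader,
            @Nullable Logger log)
            throws DynamicCodeLoadingException {
        return CheckpointStorageLoader.load(
                applicationConfiguredStorage,
                configuredStateBackend,
                jobManagerConfig,
                classLoader,
                log);
    }

    private CheckpointStorageResolution() {}
}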
3 changes: 3 additions & 0 deletions flink-runtime-web/src/test/resources/rest_api_v1.snapshot
@@ -1240,6 +1240,9 @@
"state_backend" : {
"type" : "string"
},
"checkpoint_storage" : {
"type" : "string"
},
"unaligned_checkpoints" : {
"type" : "boolean"
},
CheckpointCoordinator.java
@@ -36,6 +36,7 @@
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
import org.apache.flink.runtime.operators.coordination.OperatorInfo;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
@@ -139,7 +140,7 @@ public class CheckpointCoordinator {
* The checkpoint storage view through which the coordinator initializes checkpoints, stores
* the metadata, and cleans up checkpoints.
*/
private final CheckpointStorageCoordinatorView checkpointStorage;
private final CheckpointStorageCoordinatorView checkpointStorageView;

/** A list of recent checkpoint IDs, to identify late messages (vs invalid ones). */
private final ArrayDeque<Long> recentPendingCheckpoints;
@@ -225,6 +226,46 @@ public class CheckpointCoordinator {

// --------------------------------------------------------------------------------------------

public CheckpointCoordinator(
JobID job,
CheckpointCoordinatorConfiguration chkConfig,
ExecutionVertex[] tasksToTrigger,
ExecutionVertex[] tasksToWaitFor,
ExecutionVertex[] tasksToCommitTo,
Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
CheckpointStorage checkpointStorage,
Executor executor,
CheckpointsCleaner checkpointsCleaner,
ScheduledExecutor timer,
SharedStateRegistryFactory sharedStateRegistryFactory,
CheckpointFailureManager failureManager) {

this(
job,
chkConfig,
tasksToTrigger,
tasksToWaitFor,
tasksToCommitTo,
coordinatorsToCheckpoint,
checkpointIDCounter,
completedCheckpointStore,
checkpointStorage,
executor,
checkpointsCleaner,
timer,
sharedStateRegistryFactory,
failureManager,
SystemClock.getInstance());
}

/**
* @deprecated Please pass a {@link CheckpointStorage} object directly. This constructor only
* exists to keep coordinator tests passing and may be dropped once concrete checkpoint
* storage classes are implemented.
*/
@Deprecated
public CheckpointCoordinator(
JobID job,
CheckpointCoordinatorConfiguration chkConfig,
@@ -250,7 +291,7 @@ public CheckpointCoordinator(
coordinatorsToCheckpoint,
checkpointIDCounter,
completedCheckpointStore,
checkpointStateBackend,
asCheckpointStorage(checkpointStateBackend),
executor,
checkpointsCleaner,
timer,
@@ -269,7 +310,7 @@ public CheckpointCoordinator(
Collection<OperatorCoordinatorCheckpointContext> coordinatorsToCheckpoint,
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore completedCheckpointStore,
StateBackend checkpointStateBackend,
CheckpointStorage checkpointStorage,
Executor executor,
CheckpointsCleaner checkpointsCleaner,
ScheduledExecutor timer,
@@ -278,7 +319,7 @@ public CheckpointCoordinator(
Clock clock) {

// sanity checks
checkNotNull(checkpointStateBackend);
checkNotNull(checkpointStorage);

// max "in between duration" can be one year - this is to prevent numeric overflows
long minPauseBetweenCheckpoints = chkConfig.getMinPauseBetweenCheckpoints();
@@ -325,8 +366,8 @@ public CheckpointCoordinator(
CheckpointProperties.forCheckpoint(chkConfig.getCheckpointRetentionPolicy());

try {
this.checkpointStorage = checkpointStateBackend.createCheckpointStorage(job);
checkpointStorage.initializeBaseLocations();
this.checkpointStorageView = checkpointStorage.createCheckpointStorage(job);
checkpointStorageView.initializeBaseLocations();
} catch (IOException e) {
throw new FlinkRuntimeException(
"Failed to create checkpoint storage at checkpoint coordinator side.", e);
@@ -683,9 +724,9 @@ private CompletableFuture<CheckpointIdAndStorageLocation> initializeCheckpoint(

CheckpointStorageLocation checkpointStorageLocation =
props.isSavepoint()
? checkpointStorage.initializeLocationForSavepoint(
? checkpointStorageView.initializeLocationForSavepoint(
checkpointID, externalSavepointLocation)
: checkpointStorage.initializeLocationForCheckpoint(
: checkpointStorageView.initializeLocationForCheckpoint(
checkpointID);

return new CheckpointIdAndStorageLocation(
@@ -1627,7 +1668,7 @@ public boolean restoreSavepoint(
(allowNonRestored ? "allowing non restored state" : ""));

final CompletedCheckpointStorageLocation checkpointLocation =
checkpointStorage.resolveCheckpoint(savepointPointer);
checkpointStorageView.resolveCheckpoint(savepointPointer);

// Load the savepoint as a checkpoint into the system
CompletedCheckpoint savepoint =
@@ -1682,7 +1723,7 @@ public List<CompletedCheckpoint> getSuccessfulCheckpoints() throws Exception {
}

public CheckpointStorageCoordinatorView getCheckpointStorage() {
return checkpointStorage;
return checkpointStorageView;
}

public CompletedCheckpointStore getCheckpointStore() {
@@ -2108,4 +2149,13 @@ private enum OperatorCoordinatorRestoreBehavior {
/** Coordinators are not restored during this checkpoint restore. */
SKIP;
}

private static CheckpointStorage asCheckpointStorage(StateBackend backend) {
if (backend instanceof CheckpointStorage) {
return (CheckpointStorage) backend;
}

throw new IllegalStateException(
"Provided state backend does not implement checkpoint storage");
}
}
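
The deprecated constructor above keeps legacy call sites compiling by casting a StateBackend that still implements CheckpointStorage. The following is a hedged migration sketch, not part of the diff, of the same bridge for callers that may or may not hold an explicit storage; the helper class, its name, and the precedence rule are illustrative assumptions.

import javax.annotation.Nullable;

import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.StateBackend;

final class CheckpointStorageBridge {

    /**
     * Prefers an explicitly configured CheckpointStorage and otherwise falls back to a legacy
     * StateBackend that still implements the storage interface, mirroring asCheckpointStorage
     * in the coordinator above.
     */
    static CheckpointStorage resolve(
            @Nullable CheckpointStorage explicitStorage, StateBackend backend) {
        if (explicitStorage != null) {
            return explicitStorage;
        }
        if (backend instanceof CheckpointStorage) {
            return (CheckpointStorage) backend;
        }
        throw new IllegalStateException(
                "Neither a CheckpointStorage nor a storage-capable StateBackend was provided");
    }

    private CheckpointStorageBridge() {}
}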
Checkpoints.java
@@ -28,6 +28,8 @@
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
@@ -231,15 +233,15 @@ private static void throwNonRestoredStateException(
// ------------------------------------------------------------------------

public static void disposeSavepoint(
String pointer, StateBackend stateBackend, ClassLoader classLoader)
String pointer, CheckpointStorage checkpointStorage, ClassLoader classLoader)
throws IOException, FlinkException {

checkNotNull(pointer, "location");
checkNotNull(stateBackend, "stateBackend");
checkNotNull(checkpointStorage, "checkpointStorage");
checkNotNull(classLoader, "classLoader");

final CompletedCheckpointStorageLocation checkpointLocation =
stateBackend.resolveCheckpoint(pointer);
checkpointStorage.resolveCheckpoint(pointer);

final StreamStateHandle metadataHandle = checkpointLocation.getMetadataHandle();

@@ -293,9 +295,9 @@ public static void disposeSavepoint(
checkNotNull(configuration, "configuration");
checkNotNull(classLoader, "classLoader");

StateBackend backend = loadStateBackend(configuration, classLoader, logger);
CheckpointStorage storage = loadCheckpointStorage(configuration, classLoader, logger);

disposeSavepoint(pointer, backend, classLoader);
disposeSavepoint(pointer, storage, classLoader);
}

@Nonnull
@@ -331,6 +333,30 @@ public static StateBackend loadStateBackend(
return backend;
}

@Nonnull
public static CheckpointStorage loadCheckpointStorage(
Configuration configuration, ClassLoader classLoader, @Nullable Logger logger) {
StateBackend backend = loadStateBackend(configuration, classLoader, logger);

if (logger != null) {
logger.info("Attempting to load configured checkpoint storage for savepoint disposal");
}

CheckpointStorage checkpointStorage = null;
try {
checkpointStorage =
CheckpointStorageLoader.load(null, backend, configuration, classLoader, null);
} catch (Throwable t) {
// catches exceptions and errors (like linking errors)
if (logger != null) {
logger.info("Could not load configured state backend.");
logger.debug("Detailed exception:", t);
}
}

return checkpointStorage;
}

// ------------------------------------------------------------------------

/** This class contains only static utility methods and is not meant to be instantiated. */
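
A hedged end-to-end sketch, not part of the commit: disposing a savepoint now goes through a CheckpointStorage rather than a StateBackend. It assumes the statics shown above live in the org.apache.flink.runtime.checkpoint.Checkpoints utility class (the enclosing class name is not visible in this hunk); the surrounding tool class and the null guard are illustrative.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.state.CheckpointStorage;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class SavepointDisposal {

    private static final Logger LOG = LoggerFactory.getLogger(SavepointDisposal.class);

    static void dispose(String savepointPointer) throws Exception {
        Configuration configuration = GlobalConfiguration.loadConfiguration();
        ClassLoader classLoader = SavepointDisposal.class.getClassLoader();

        // Resolve the storage from the configured state backend / checkpoint storage settings.
        CheckpointStorage storage =
                Checkpoints.loadCheckpointStorage(configuration, classLoader, LOG);
        if (storage == null) {
            // The loader shown above logs and returns null when loading fails.
            throw new IllegalStateException("No checkpoint storage could be loaded.");
        }

        // Delete the savepoint's data and metadata through the resolved storage.
        Checkpoints.disposeSavepoint(savepointPointer, storage, classLoader);
    }

    private SavepointDisposal() {}
}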
AccessExecutionGraph.java
@@ -172,4 +172,11 @@ public interface AccessExecutionGraph extends JobStatusProvider {
* @return The state backend name, or an empty Optional in the case of batch jobs
*/
Optional<String> getStateBackendName();

/**
* Returns the checkpoint storage name for this ExecutionGraph.
*
* @return The checkpoint storage name, or an empty Optional in the case of batch jobs
*/
Optional<String> getCheckpointStorageName();
}
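
A hedged usage sketch of the new accessor, not part of the commit: a monitoring component reading the checkpoint storage name next to the existing state backend name. The class, logger, and "n/a" fallback are illustrative; both getters return an empty Optional for batch jobs, as the Javadoc above states.

import org.apache.flink.runtime.executiongraph.AccessExecutionGraph;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

final class CheckpointSetupLogger {

    private static final Logger LOG = LoggerFactory.getLogger(CheckpointSetupLogger.class);

    static void logCheckpointSetup(AccessExecutionGraph graph) {
        String backendName = graph.getStateBackendName().orElse("n/a");
        String storageName = graph.getCheckpointStorageName().orElse("n/a");
        LOG.info(
                "Job {} uses state backend {} and checkpoint storage {}",
                graph.getJobID(),
                backendName,
                storageName);
    }

    private CheckpointSetupLogger() {}
}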
ArchivedExecutionGraph.java
@@ -95,6 +95,8 @@ public class ArchivedExecutionGraph implements AccessExecutionGraph, Serializable

@Nullable private final String stateBackendName;

@Nullable private final String checkpointStorageName;

public ArchivedExecutionGraph(
JobID jobID,
String jobName,
@@ -110,7 +112,8 @@ public ArchivedExecutionGraph(
boolean isStoppable,
@Nullable CheckpointCoordinatorConfiguration jobCheckpointingConfiguration,
@Nullable CheckpointStatsSnapshot checkpointStatsSnapshot,
@Nullable String stateBackendName) {
@Nullable String stateBackendName,
@Nullable String checkpointStorageName) {

this.jobID = Preconditions.checkNotNull(jobID);
this.jobName = Preconditions.checkNotNull(jobName);
@@ -127,6 +130,7 @@ public ArchivedExecutionGraph(
this.jobCheckpointingConfiguration = jobCheckpointingConfiguration;
this.checkpointStatsSnapshot = checkpointStatsSnapshot;
this.stateBackendName = stateBackendName;
this.checkpointStorageName = checkpointStorageName;
}

// --------------------------------------------------------------------------------------------
@@ -257,6 +261,11 @@ public Optional<String> getStateBackendName() {
return Optional.ofNullable(stateBackendName);
}

@Override
public Optional<String> getCheckpointStorageName() {
return Optional.ofNullable(checkpointStorageName);
}

class AllVerticesIterator implements Iterator<ArchivedExecutionVertex> {

private final Iterator<ArchivedExecutionJobVertex> jobVertices;
@@ -346,7 +355,8 @@ public static ArchivedExecutionGraph createFrom(ExecutionGraph executionGraph) {
executionGraph.isStoppable(),
executionGraph.getCheckpointCoordinatorConfiguration(),
executionGraph.getCheckpointStatsSnapshot(),
executionGraph.getStateBackendName().orElse(null));
executionGraph.getStateBackendName().orElse(null),
executionGraph.getCheckpointStorageName().orElse(null));
}

/**
@@ -395,6 +405,7 @@ public static ArchivedExecutionGraph createFromInitializingJob(
false,
null,
null,
null,
null);
}
}
ExecutionGraph.java
@@ -67,6 +67,7 @@
import org.apache.flink.runtime.scheduler.strategy.SchedulingResultPartition;
import org.apache.flink.runtime.scheduler.strategy.SchedulingTopology;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
@@ -266,6 +267,8 @@ public class ExecutionGraph implements AccessExecutionGraph {
// ------ Fields that are only relevant for archived execution graphs ------------
@Nullable private String stateBackendName;

@Nullable private String checkpointStorageName;

private String jsonPlan;

/** Shuffle master to register partitions for task deployment. */
@@ -388,6 +391,11 @@ public Optional<String> getStateBackendName() {
return Optional.ofNullable(stateBackendName);
}

@Override
public Optional<String> getCheckpointStorageName() {
return Optional.ofNullable(checkpointStorageName);
}

public void enableCheckpointing(
CheckpointCoordinatorConfiguration chkConfig,
List<ExecutionJobVertex> verticesToTrigger,
@@ -397,6 +405,7 @@ public void enableCheckpointing(
CheckpointIDCounter checkpointIDCounter,
CompletedCheckpointStore checkpointStore,
StateBackend checkpointStateBackend,
CheckpointStorage checkpointStorage,
CheckpointStatsTracker statsTracker,
CheckpointsCleaner checkpointsCleaner) {

@@ -478,6 +487,7 @@ public void failJobDueToTaskFailure(
}

this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
}

@Nullable
ExecutionGraphBuilder.java
@@ -48,6 +48,8 @@
import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
import org.apache.flink.runtime.jobgraph.tasks.JobCheckpointingSettings;
import org.apache.flink.runtime.shuffle.ShuffleMaster;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageLoader;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
import org.apache.flink.util.DynamicCodeLoadingException;
@@ -241,6 +243,39 @@ public static ExecutionGraph buildGraph(
jobId, "Could not instantiate configured state backend", e);
}

// load the checkpoint storage from the application settings
final CheckpointStorage applicationConfiguredStorage;
final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
snapshotSettings.getDefaultCheckpointStorage();

if (serializedAppConfiguredStorage == null) {
applicationConfiguredStorage = null;
} else {
try {
applicationConfiguredStorage =
serializedAppConfiguredStorage.deserializeValue(classLoader);
} catch (IOException | ClassNotFoundException e) {
throw new JobExecutionException(
jobId,
"Could not deserialize application-defined checkpoint storage.",
e);
}
}

final CheckpointStorage rootStorage;
try {
rootStorage =
CheckpointStorageLoader.load(
applicationConfiguredStorage,
rootBackend,
jobManagerConfig,
classLoader,
log);
} catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
throw new JobExecutionException(
jobId, "Could not instantiate configured checkpoint storage", e);
}

// instantiate the user-defined checkpoint hooks

final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
@@ -284,6 +319,7 @@ public static ExecutionGraph buildGraph(
checkpointIdCounter,
completedCheckpointStore,
rootBackend,
rootStorage,
checkpointStatsTracker,
checkpointsCleaner);
}
(The remaining changed files are not shown.)