Skip to content

Commit

Permalink
[FLINK-19464][runtime] Rename CheckpointStorage interface to Checkpoi…
Browse files Browse the repository at this point in the history
…ntStorageAccess

This closes apache#13794
  • Loading branch information
sjwiesman committed Oct 27, 2020
1 parent f612388 commit d2b86a1
Show file tree
Hide file tree
Showing 40 changed files with 147 additions and 145 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointableKeyedStateBackend;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -64,7 +64,7 @@ public CompletedCheckpointStorageLocation resolveCheckpoint(String externalPoint
}

@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
return backend.createCheckpointStorage(jobId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
import org.apache.flink.util.LambdaUtil;
import org.apache.flink.util.Preconditions;
Expand Down Expand Up @@ -86,7 +86,7 @@ public void writeRecord(CheckpointMetadata metadata) throws IOException {
public void close() {}

private static CheckpointStorageLocation createSavepointLocation(Path location) throws IOException {
final CheckpointStorageLocationReference reference = AbstractFsCheckpointStorage.encodePathAsReference(location);
final CheckpointStorageLocationReference reference = AbstractFsCheckpointStorageAccess.encodePathAsReference(location);
return new FsCheckpointStorageLocation(
location.getFileSystem(),
location,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.state.CheckpointStorageWorkerView;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFinalizer;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
import org.apache.flink.streaming.api.operators.StreamOperator;
Expand All @@ -50,7 +50,7 @@ public static <OUT, OP extends StreamOperator<OUT>> TaggedOperatorSubtaskState s

CheckpointOptions options = new CheckpointOptions(
CheckpointType.SAVEPOINT,
AbstractFsCheckpointStorage.encodePathAsReference(savepointPath),
AbstractFsCheckpointStorageAccess.encodePathAsReference(savepointPath),
isExactlyOnceMode,
isUnalignedCheckpoint);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;

import java.io.DataInputStream;
import java.io.IOException;
Expand All @@ -46,7 +46,7 @@ private SavepointLoader() {}
* the path points to a location that does not seem to be a savepoint.
*/
public static CheckpointMetadata loadSavepointMetadata(String savepointPath) throws IOException {
CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorage
CompletedCheckpointStorageLocation location = AbstractFsCheckpointStorageAccess
.resolveCheckpointPointer(savepointPath);

try (DataInputStream stream = new DataInputStream(location.getMetadataHandle().openInputStream())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.filesystem.RelativeFileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
Expand Down Expand Up @@ -623,7 +623,7 @@ Path getExclusiveDirPath() throws IOException {

private static Path createExclusiveDirPath(String externalPointer) throws IOException {
try {
return AbstractFsCheckpointStorage.resolveCheckpointPointer(externalPointer).getExclusiveCheckpointDir();
return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(externalPointer).getExclusiveCheckpointDir();
} catch (IOException e) {
throw new IOException("Could not parse external pointer as state base path", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,5 +23,5 @@
* which defined in {@link CheckpointStorageCoordinatorView}. And also implement methods acting as a worker role,
* which defined in {@link CheckpointStorageWorkerView}.
*/
public interface CheckpointStorage extends CheckpointStorageCoordinatorView, CheckpointStorageWorkerView {
public interface CheckpointStorageAccess extends CheckpointStorageCoordinatorView, CheckpointStorageWorkerView {
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@
* and lifecycle/cleanup methods.
*
* <p>CheckpointStorageLocations are typically created and initialized via
* {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
* {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
* {@link CheckpointStorageAccess#initializeLocationForCheckpoint(long)} or
* {@link CheckpointStorageAccess#initializeLocationForSavepoint(long, String)}.
*/
public interface CheckpointStorageLocation extends CheckpointStreamFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@
/**
* A factory for checkpoint output streams, which are used to persist data for checkpoints.
*
* <p>Stream factories can be created from the {@link CheckpointStorage} through
* {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
* <p>Stream factories can be created from the {@link CheckpointStorageAccess} through
* {@link CheckpointStorageAccess#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
*/
public interface CheckpointStreamFactory {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public interface StateBackend extends java.io.Serializable {
*
* @throws IOException Thrown if the checkpoint storage cannot be initialized.
*/
CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;
CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException;

// ------------------------------------------------------------------------
// Structure Backends
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
* <h1>Metadata File</h1>
*
* <p>A completed checkpoint writes its metadata into a file
* '{@value AbstractFsCheckpointStorage#METADATA_FILE_NAME}'.
* '{@value AbstractFsCheckpointStorageAccess#METADATA_FILE_NAME}'.
*/
@PublicEvolving
public abstract class AbstractFileStateBackend extends AbstractStateBackend {
Expand Down Expand Up @@ -162,7 +162,7 @@ public Path getSavepointPath() {

@Override
public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) throws IOException {
return AbstractFsCheckpointStorage.resolveCheckpointPointer(pointer);
return AbstractFsCheckpointStorageAccess.resolveCheckpointPointer(pointer);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
Expand All @@ -41,7 +41,7 @@
/**
* An implementation of durable checkpoint storage to file systems.
*/
public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
public abstract class AbstractFsCheckpointStorageAccess implements CheckpointStorageAccess {

// ------------------------------------------------------------------------
// Constants
Expand Down Expand Up @@ -79,7 +79,7 @@ public abstract class AbstractFsCheckpointStorage implements CheckpointStorage {
* @param jobId The ID of the job that writes the checkpoints.
* @param defaultSavepointDirectory The default location for savepoints, or null, if none is set.
*/
protected AbstractFsCheckpointStorage(
protected AbstractFsCheckpointStorageAccess(
JobID jobId,
@Nullable Path defaultSavepointDirectory) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
/**
* An implementation of durable checkpoint storage to file systems.
*/
public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
public class FsCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess {

private final FileSystem fileSystem;

Expand All @@ -54,7 +54,7 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {

private boolean baseLocationsInitialized = false;

public FsCheckpointStorage(
public FsCheckpointStorageAccess(
Path checkpointBaseDirectory,
@Nullable Path defaultSavepointDirectory,
JobID jobId,
Expand All @@ -69,7 +69,7 @@ public FsCheckpointStorage(
writeBufferSize);
}

public FsCheckpointStorage(
public FsCheckpointStorageAccess(
FileSystem fs,
Path checkpointBaseDirectory,
@Nullable Path defaultSavepointDirectory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public FsCheckpointStorageLocation(
// the metadata file should not have entropy in its path
Path metadataDir = EntropyInjector.removeEntropyMarkerIfPresent(fileSystem, checkpointDir);

this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
this.metadataFilePath = new Path(metadataDir, AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME);
this.fileStateSizeThreshold = fileStateSizeThreshold;
this.writeBufferSize = writeBufferSize;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -491,9 +491,9 @@ public FsStateBackend configure(ReadableConfig config, ClassLoader classLoader)
// ------------------------------------------------------------------------

@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
checkNotNull(jobId, "jobId");
return new FsCheckpointStorage(
return new FsCheckpointStorageAccess(
getCheckpointPath(),
getSavepointPath(),
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;

import javax.annotation.Nullable;
Expand All @@ -41,7 +41,7 @@
* Depending on whether this is created with a checkpoint location, the setup supports
* durable checkpoints (durable metadata) or not.
*/
public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage {
public class MemoryBackendCheckpointStorageAccess extends AbstractFsCheckpointStorageAccess {

/** The target directory for checkpoints (here metadata files only). Null, if not configured. */
@Nullable
Expand All @@ -66,7 +66,7 @@ public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage
* @throws IOException Thrown if a checkpoint base directory is given configured and the
* checkpoint directory cannot be created within that directory.
*/
public MemoryBackendCheckpointStorage(
public MemoryBackendCheckpointStorageAccess(
JobID jobId,
@Nullable Path checkpointsBaseDirectory,
@Nullable Path defaultSavepointLocation,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.BackendBuildingException;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackendBuilder;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -291,8 +291,8 @@ public MemoryStateBackend configure(ReadableConfig config, ClassLoader classLoad
// ------------------------------------------------------------------------

@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
return new MemoryBackendCheckpointStorageAccess(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorageAccess;
import org.apache.flink.runtime.state.filesystem.FsCheckpointMetadataOutputStream;

import java.io.IOException;
Expand Down Expand Up @@ -60,7 +60,7 @@ public PersistentMetadataCheckpointStorageLocation(

this.fileSystem = checkNotNull(fileSystem);
this.checkpointDirectory = checkNotNull(checkpointDir);
this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorageAccess.METADATA_FILE_NAME);
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@
import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
import org.apache.flink.runtime.messages.checkpoint.DeclineCheckpoint;
import org.apache.flink.runtime.state.CheckpointMetadataOutputStream;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand All @@ -53,7 +53,7 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.filesystem.FileStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.memory.NonPersistentMetadataCheckpointStorageLocation;
import org.apache.flink.runtime.state.testutils.TestCompletedCheckpointStorageLocation;
Expand Down Expand Up @@ -2485,8 +2485,8 @@ public void testCompleteCheckpointFailureWithExternallyInducedSource() throws Ex

// Throw exception when finalizing the checkpoint.
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
return new MemoryBackendCheckpointStorage(jobId, null, null, 100) {
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
return new MemoryBackendCheckpointStorageAccess(jobId, null, null, 100) {
@Override
public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
return new NonPersistentMetadataCheckpointStorageLocation(1000) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStorageAccess;
import org.apache.flink.runtime.state.CompletedCheckpointStorageLocation;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -170,8 +170,8 @@ public CompletedCheckpointStorageLocation resolveCheckpoint(String pointer) thro
}

@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
return mock(CheckpointStorage.class);
public CheckpointStorageAccess createCheckpointStorage(JobID jobId) throws IOException {
return mock(CheckpointStorageAccess.class);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorageAccess;

import org.junit.Test;

Expand All @@ -49,7 +49,7 @@ public void testPartialResultSubpartitionStateWrite() throws Exception {
}

private void testBuffersRecycled(Function<NetworkBuffer[], ChannelStateWriteRequest> requestBuilder) throws Exception {
ChannelStateWriteRequestDispatcher dispatcher = new ChannelStateWriteRequestDispatcherImpl(new MemoryBackendCheckpointStorage(new JobID(), null, null, 1), new ChannelStateSerializerImpl());
ChannelStateWriteRequestDispatcher dispatcher = new ChannelStateWriteRequestDispatcherImpl(new MemoryBackendCheckpointStorageAccess(new JobID(), null, null, 1), new ChannelStateSerializerImpl());
ChannelStateWriteResult result = new ChannelStateWriteResult();
dispatcher.dispatch(ChannelStateWriteRequest.start(1L, result, CheckpointStorageLocationReference.getDefault()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import org.apache.flink.runtime.scheduler.SchedulerTestingUtils;
import org.apache.flink.runtime.scheduler.strategy.ExecutionVertexID;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestingCheckpointStorageCoordinatorView;
import org.apache.flink.runtime.state.TestingCheckpointStorageAccessCoordinatorView;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.runtime.taskexecutor.TaskExecutorOperatorEventGateway;
import org.apache.flink.runtime.testtasks.NoOpInvokable;
Expand Down Expand Up @@ -438,7 +438,7 @@ private DefaultScheduler createSchedulerWithRestoredSavepoint(byte[] coordinator
final byte[] savepointMetadata = serializeAsCheckpointMetadata(testOperatorId, coordinatorState);
final String savepointPointer = "testingSavepointPointer";

final TestingCheckpointStorageCoordinatorView storage = new TestingCheckpointStorageCoordinatorView();
final TestingCheckpointStorageAccessCoordinatorView storage = new TestingCheckpointStorageAccessCoordinatorView();
storage.registerSavepoint(savepointPointer, savepointMetadata);

final Consumer<JobGraph> savepointConfigurer = (jobGraph) -> {
Expand Down
Loading

0 comments on commit d2b86a1

Please sign in to comment.