diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
index 072f3a7dc65d3..5c389485119c5 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java
@@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -197,11 +196,11 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck
* here where the snapshots from RocksDB would be stored.
*
*
The snapshots of the RocksDB state will be stored using the given backend's
- * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
+ * {@link StateBackend#createCheckpointStorage(JobID)}.
*
* @param checkpointStreamBackend The backend write the checkpoint streams to.
*/
- public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
+ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
}
@@ -211,11 +210,28 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
* here where the snapshots from RocksDB would be stored.
*
*
The snapshots of the RocksDB state will be stored using the given backend's
- * {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
+ * {@link StateBackend#createCheckpointStorage(JobID)}.
*
* @param checkpointStreamBackend The backend write the checkpoint streams to.
* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
*/
+ public RocksDBStateBackend(StateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
+ this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+ this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
+ }
+
+ /**
+ * @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead.
+ */
+ @Deprecated
+ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
+ this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
+ }
+
+ /**
+ * @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} instead.
+ */
+ @Deprecated
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
@@ -367,20 +383,6 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
return checkpointStreamBackend.createCheckpointStorage(jobId);
}
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- String targetLocation) throws IOException {
-
- return checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
- }
-
// ------------------------------------------------------------------------
// State holding data structures
// ------------------------------------------------------------------------
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
index c28854dee3db5..353dd93cc4c5b 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java
@@ -25,7 +25,6 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
@@ -39,15 +38,20 @@
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
+import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.testutils.BackendForTestStream;
+import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory;
+import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -82,13 +86,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyInt;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
/**
@@ -103,7 +101,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
* Temporary fold for test.
*/
@Rule
- public TemporaryFolder temporaryFolder = new TemporaryFolder();
+ public final TemporaryFolder temporaryFolder = new TemporaryFolder();
/**
* This ensures that asynchronous state handles are actually materialized asynchronously.
@@ -244,19 +242,13 @@ public void testCancelFullyAsyncCheckpoints() throws Exception {
testHarness.setupOutputForSingletonOperatorChain();
- testHarness.configureForKeyedStream(new KeySelector() {
- @Override
- public String getKey(String value) throws Exception {
- return value;
- }
- }, BasicTypeInfo.STRING_TYPE_INFO);
+ testHarness.configureForKeyedStream(value -> value, BasicTypeInfo.STRING_TYPE_INFO);
StreamConfig streamConfig = testHarness.getStreamConfig();
File dbDir = temporaryFolder.newFolder();
- BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();
-
+ // this is the proper instance that we need to call.
BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {
@@ -278,9 +270,12 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS
}
};
- BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory;
+ // to avoid serialization of the above factory instance, we need to pass it in
+ // through a static variable
+
+ StateBackend stateBackend = new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory));
- RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
+ RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
backend.setDbStoragePath(dbDir.getAbsolutePath());
streamConfig.setStateBackend(backend);
@@ -354,17 +349,10 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
Environment env = new DummyEnvironment("test task", 1, 0);
- CheckpointStreamFactory.CheckpointStateOutputStream outputStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
- CheckpointStreamFactory checkpointStreamFactory = mock(CheckpointStreamFactory.class);
- AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);
-
final IOException testException = new IOException("Test exception");
+ CheckpointStateOutputStream outputStream = spy(new FailingStream(testException));
- doReturn(checkpointStreamFactory).when(stateBackend).createStreamFactory(any(JobID.class), anyString());
- doThrow(testException).when(outputStream).write(anyInt());
- doReturn(outputStream).when(checkpointStreamFactory).createCheckpointStateOutputStream(eq(checkpointId), eq(timestamp));
-
- RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
+ RocksDBStateBackend backend = new RocksDBStateBackend((StateBackend) new MemoryStateBackend());
backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString());
@@ -388,7 +376,9 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
new ValueStateDescriptor<>("foobar", String.class));
RunnableFuture snapshotFuture = keyedStateBackend.snapshot(
- checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
+ checkpointId, timestamp,
+ new TestCheckpointStreamFactory(() -> outputStream),
+ CheckpointOptions.forCheckpointWithDefaultLocation());
try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
@@ -428,25 +418,6 @@ public void testConsistentSnapshotSerializationFlagsAndMasks() {
// ------------------------------------------------------------------------
- /**
- * Creates us a CheckpointStateOutputStream that blocks write ops on a latch to delay writing of snapshots.
- */
- static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
-
- public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = null;
-
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return blockerCheckpointStreamFactory;
- }
-
- @Override
- public BlockingStreamMemoryStateBackend configure(Configuration config) {
- // retain this instance, no re-configuration!
- return this;
- }
- }
-
private static class AsyncCheckpointOperator
extends AbstractStreamOperator
implements OneInputStreamOperator {
@@ -476,4 +447,61 @@ public void processElement(StreamRecord element) throws Exception {
state.update(element.getValue());
}
}
+
+ // ------------------------------------------------------------------------
+ // failing stream
+ // ------------------------------------------------------------------------
+
+ private static class StaticForwardFactory implements StreamFactory {
+
+ static CheckpointStreamFactory factory;
+
+ StaticForwardFactory(CheckpointStreamFactory factory) {
+ StaticForwardFactory.factory = factory;
+ }
+
+ @Override
+ public CheckpointStateOutputStream get() throws Exception {
+ return factory.createCheckpointStateOutputStream(1L, 1L);
+ }
+ }
+
+ private static class FailingStream extends CheckpointStateOutputStream {
+
+ private final IOException testException;
+
+ FailingStream(IOException testException) {
+ this.testException = testException;
+ }
+
+ @Override
+ public StreamStateHandle closeAndGetHandle() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public long getPos() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ throw testException;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ throw testException;
+ }
+
+ @Override
+ public void sync() throws IOException {
+ throw testException;
+ }
+
+ @Override
+ public void close() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
}
diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
index eace5ff0c3c49..3a39ba0f8b863 100644
--- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
+++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
@@ -32,6 +32,7 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.IOUtils;
@@ -55,11 +56,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
-import static org.mockito.Mockito.any;
-import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
@@ -379,13 +376,9 @@ public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {
@Test
public void testCallsForwardedToNonPartitionedBackend() throws Exception {
- AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
- RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend);
-
- Environment env = getMockEnvironment();
- rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");
-
- verify(nonPartBackend, times(1)).createStreamFactory(any(JobID.class), anyString());
+ AbstractStateBackend storageBackend = new MemoryStateBackend();
+ RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
+ assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
}
// ------------------------------------------------------------------------
diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
index 5c7db979f4d18..255709ad028cd 100644
--- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
+++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/FileStateBackendTest.java
@@ -133,8 +133,11 @@ public void testStateOutputStream() {
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(basePath, 15));
JobID jobId = new JobID();
+ long checkpointId = 97231523452L;
- CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_op");
+ CheckpointStreamFactory streamFactory = backend
+ .createCheckpointStorage(jobId)
+ .initializeLocationForCheckpoint(checkpointId);
// we know how FsCheckpointStreamFactory is implemented so we know where it
// will store checkpoints
@@ -151,8 +154,6 @@ public void testStateOutputStream() {
rnd.nextBytes(state3);
rnd.nextBytes(state4);
- long checkpointId = 97231523452L;
-
CheckpointStreamFactory.CheckpointStateOutputStream stream1 =
streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
CheckpointStreamFactory.CheckpointStateOutputStream stream2 =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
index c72f012b236d4..7e9c35786bf29 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/AbstractStateBackend.java
@@ -24,7 +24,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import javax.annotation.Nullable;
import java.io.IOException;
/**
@@ -37,21 +36,6 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri
private static final long serialVersionUID = 4620415814639230247L;
- // ------------------------------------------------------------------------
- // State Backend - Persisting Byte Storage
- // ------------------------------------------------------------------------
-
- @Override
- public abstract CheckpointStreamFactory createStreamFactory(
- JobID jobId,
- String operatorIdentifier) throws IOException;
-
- @Override
- public abstract CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- @Nullable String targetLocation) throws IOException;
-
// ------------------------------------------------------------------------
// State Backend - State-Holding Backends
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
index ea1015a873c7a..4a1e906d9512e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorage.java
@@ -18,6 +18,8 @@
package org.apache.flink.runtime.state;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+
import javax.annotation.Nullable;
import java.io.IOException;
@@ -32,8 +34,8 @@ public interface CheckpointStorage {
/**
* Checks whether this backend supports highly available storage of data.
*
- * Some state backends may offer support for that with default settings, which makes them
- * suitable for zero-config prototyping, but not for actual production setups.
+ *
Some state backends may not support highly-available durable storage, with default settings,
+ * which makes them suitable for zero-config prototyping, but not for actual production setups.
*/
boolean supportsHighlyAvailableStorage();
@@ -90,4 +92,44 @@ public interface CheckpointStorage {
CheckpointStorageLocation initializeLocationForSavepoint(
long checkpointId,
@Nullable String externalLocationPointer) throws IOException;
+
+ /**
+ * Resolves a storage location reference into a CheckpointStreamFactory.
+ *
+ *
The reference may be the {@link CheckpointStorageLocationReference#isDefaultReference() default reference},
+ * in which case the method should return the default location, taking existing configuration
+ * and checkpoint ID into account.
+ *
+ * @param checkpointId The ID of the checkpoint that the location is initialized for.
+ * @param reference The checkpoint location reference.
+ *
+ * @return A checkpoint storage location reflecting the reference and checkpoint ID.
+ *
+ * @throws IOException Thrown, if the storage location cannot be initialized from the reference.
+ */
+ CheckpointStreamFactory resolveCheckpointStorageLocation(
+ long checkpointId,
+ CheckpointStorageLocationReference reference) throws IOException;
+
+ /**
+ * Opens a stream to persist checkpoint state data that is owned strictly by tasks and
+ * not attached to the life cycle of a specific checkpoint.
+ *
+ *
This method should be used when the persisted data cannot be immediately dropped once
+ * the checkpoint that created it is dropped. Examples are write-ahead-logs.
+ * For those, the state can only be dropped once the data has been moved to the target system,
+ * which may sometimes take longer than one checkpoint (if the target system is temporarily unable
+ * to keep up).
+ *
+ *
The fact that the job manager does not own the life cycle of this type of state means also
+ * that it is strictly the responsibility of the tasks to handle the cleanup of this data.
+ *
+ *
Developer note: In the future, we may be able to make this a special case of "shared state",
+ * where the task re-emits the shared state reference as long as it needs to hold onto the
+ * persisted state data.
+ *
+ * @return A checkpoint state stream to the location for state owned by tasks.
+ * @throws IOException Thrown, if the stream cannot be opened.
+ */
+ CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException;
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
index aeb4b14257a28..246717a2937e1 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStorageLocation.java
@@ -18,16 +18,17 @@
package org.apache.flink.runtime.state;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-
import java.io.IOException;
/**
- * A storage location for one particular checkpoint. This location is typically
- * created and initialized via {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
+ * A storage location for one particular checkpoint, offering data persistent, metadata persistence,
+ * and lifecycle/cleanup methods.
+ *
+ *
CheckpointStorageLocations are typically created and initialized via
+ * {@link CheckpointStorage#initializeLocationForCheckpoint(long)} or
* {@link CheckpointStorage#initializeLocationForSavepoint(long, String)}.
*/
-public interface CheckpointStorageLocation {
+public interface CheckpointStorageLocation extends CheckpointStreamFactory {
/**
* Creates the output stream to persist the checkpoint metadata to.
@@ -49,6 +50,8 @@ public interface CheckpointStorageLocation {
/**
* Disposes the checkpoint location in case the checkpoint has failed.
+ * This method disposes all the data at that location, not just the data written
+ * by the particular node or process that calls this method.
*/
void disposeOnFailure() throws IOException;
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
index c94b0e4be2f61..770414081a061 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/CheckpointStreamFactory.java
@@ -23,6 +23,12 @@
import java.io.IOException;
import java.io.OutputStream;
+/**
+ * A factory for checkpoint output streams, which are used to persist data for checkpoints.
+ *
+ *
Stream factories can be created from the {@link CheckpointStorage} through
+ * {@link CheckpointStorage#resolveCheckpointStorageLocation(long, CheckpointStorageLocationReference)}.
+ */
public interface CheckpointStreamFactory {
/**
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
index 3d3fda25f3e0c..8d17b6420f35f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/StateBackend.java
@@ -24,8 +24,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
-import javax.annotation.Nullable;
-
import java.io.IOException;
/**
@@ -112,44 +110,6 @@ public interface StateBackend extends java.io.Serializable {
*/
CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException;
- // ------------------------------------------------------------------------
- // Persistent bytes storage for checkpoint data
- // ------------------------------------------------------------------------
-
- /**
- * Creates a {@link CheckpointStreamFactory} that can be used to create streams
- * that should end up in a checkpoint.
- *
- *
NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
- *
- * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
- * @param operatorIdentifier An identifier of the operator for which we create streams.
- */
- CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException;
-
- /**
- * Creates a {@link CheckpointStreamFactory} that can be used to create streams
- * that should end up in a savepoint.
- *
- *
This is only called if the triggered checkpoint is a savepoint. Commonly
- * this will return the same factory as for regular checkpoints, but maybe
- * slightly adjusted.
- *
- *
NOTE: This method will probably go into the {@link CheckpointStorage} in the future.
- *
- * @param jobId The {@link JobID} of the job for which we are creating checkpoint streams.
- * @param operatorIdentifier An identifier of the operator for which we create streams.
- * @param targetLocation An optional custom location for the savepoint stream.
- *
- * @return The stream factory for savepoints.
- *
- * @throws IOException Failures during stream creation are forwarded.
- */
- CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- @Nullable String targetLocation) throws IOException;
-
// ------------------------------------------------------------------------
// Structure Backends
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
index b344cfd724f8a..054e636250472 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/AbstractFsCheckpointStorage.java
@@ -23,6 +23,7 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.util.FileUtils;
@@ -124,7 +125,7 @@ public StreamStateHandle resolveCheckpoint(String checkpointPointer) throws IOEx
* @throws IOException Thrown if the target directory could not be created.
*/
@Override
- public FsCheckpointStorageLocation initializeLocationForSavepoint(
+ public CheckpointStorageLocation initializeLocationForSavepoint(
@SuppressWarnings("unused") long checkpointId,
@Nullable String externalLocationPointer) throws IOException {
@@ -155,9 +156,7 @@ else if (defaultSavepointDirectory != null) {
// we make the path qualified, to make it independent of default schemes and authorities
final Path qp = path.makeQualified(fs);
- final CheckpointStorageLocationReference reference = encodePathAsReference(qp);
-
- return new FsCheckpointStorageLocation(fs, qp, qp, qp, reference);
+ return createSavepointLocation(fs, qp);
}
} catch (Exception e) {
latestException = e;
@@ -167,6 +166,8 @@ else if (defaultSavepointDirectory != null) {
throw new IOException("Failed to create savepoint directory at " + savepointBasePath, latestException);
}
+ protected abstract CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException;
+
// ------------------------------------------------------------------------
// Creating and resolving paths
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
index 08159d59db87b..aa0c6b719c6d7 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorage.java
@@ -23,6 +23,9 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
import javax.annotation.Nullable;
@@ -43,17 +46,23 @@ public class FsCheckpointStorage extends AbstractFsCheckpointStorage {
private final Path taskOwnedStateDirectory;
+ private final int fileSizeThreshold;
+
public FsCheckpointStorage(
Path checkpointBaseDirectory,
@Nullable Path defaultSavepointDirectory,
- JobID jobId) throws IOException {
+ JobID jobId,
+ int fileSizeThreshold) throws IOException {
super(jobId, defaultSavepointDirectory);
+ checkArgument(fileSizeThreshold >= 0);
+
this.fileSystem = checkpointBaseDirectory.getFileSystem();
this.checkpointsDirectory = getCheckpointDirectoryForJob(checkpointBaseDirectory, jobId);
this.sharedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_SHARED_STATE_DIR);
this.taskOwnedStateDirectory = new Path(checkpointsDirectory, CHECKPOINT_TASK_OWNED_STATE_DIR);
+ this.fileSizeThreshold = fileSizeThreshold;
// initialize the dedicated directories
fileSystem.mkdirs(checkpointsDirectory);
@@ -81,7 +90,57 @@ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpoint
fileSystem.mkdirs(checkpointDir);
return new FsCheckpointStorageLocation(
- fileSystem, checkpointDir, sharedStateDirectory, taskOwnedStateDirectory,
- CheckpointStorageLocationReference.getDefault());
+ fileSystem,
+ checkpointDir,
+ sharedStateDirectory,
+ taskOwnedStateDirectory,
+ CheckpointStorageLocationReference.getDefault(),
+ fileSizeThreshold);
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(
+ long checkpointId,
+ CheckpointStorageLocationReference reference) throws IOException {
+
+ if (reference.isDefaultReference()) {
+ // default reference, construct the default location for that particular checkpoint
+ final Path checkpointDir = createCheckpointDirectory(checkpointsDirectory, checkpointId);
+
+ return new FsCheckpointStorageLocation(
+ fileSystem,
+ checkpointDir,
+ sharedStateDirectory,
+ taskOwnedStateDirectory,
+ reference,
+ fileSizeThreshold);
+ }
+ else {
+ // location encoded in the reference
+ final Path path = decodePathFromReference(reference);
+
+ return new FsCheckpointStorageLocation(
+ fileSystem,
+ path,
+ path,
+ path,
+ reference,
+ fileSizeThreshold);
+ }
+ }
+
+ @Override
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
+ return new FsCheckpointStateOutputStream(
+ taskOwnedStateDirectory,
+ fileSystem,
+ FsCheckpointStreamFactory.DEFAULT_WRITE_BUFFER_SIZE,
+ fileSizeThreshold);
+ }
+
+ @Override
+ protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
+ final CheckpointStorageLocationReference reference = encodePathAsReference(location);
+ return new FsCheckpointStorageLocation(fs, location, location, location, reference, fileSizeThreshold);
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
index 94102870343ff..92d807dc27d81 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStorageLocation.java
@@ -22,16 +22,16 @@
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import java.io.IOException;
+import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A storage location for checkpoints on a file system.
*/
-public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
+public class FsCheckpointStorageLocation extends FsCheckpointStreamFactory implements CheckpointStorageLocation {
private final FileSystem fileSystem;
@@ -45,12 +45,19 @@ public class FsCheckpointStorageLocation implements CheckpointStorageLocation {
private final CheckpointStorageLocationReference reference;
+ private final int fileStateSizeThreshold;
+
public FsCheckpointStorageLocation(
FileSystem fileSystem,
Path checkpointDir,
Path sharedStateDir,
Path taskOwnedStateDir,
- CheckpointStorageLocationReference reference) {
+ CheckpointStorageLocationReference reference,
+ int fileStateSizeThreshold) {
+
+ super(fileSystem, checkpointDir, fileStateSizeThreshold);
+
+ checkArgument(fileStateSizeThreshold >= 0);
this.fileSystem = checkNotNull(fileSystem);
this.checkpointDirectory = checkNotNull(checkpointDir);
@@ -59,6 +66,7 @@ public FsCheckpointStorageLocation(
this.reference = checkNotNull(reference);
this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
+ this.fileStateSizeThreshold = fileStateSizeThreshold;
}
// ------------------------------------------------------------------------
@@ -114,10 +122,13 @@ public CheckpointStorageLocationReference getLocationReference() {
@Override
public String toString() {
return "FsCheckpointStorageLocation {" +
- "metadataFilePath=" + metadataFilePath +
- ", taskOwnedStateDirectory=" + taskOwnedStateDirectory +
- ", sharedStateDirectory=" + sharedStateDirectory +
+ "fileSystem=" + fileSystem +
", checkpointDirectory=" + checkpointDirectory +
+ ", sharedStateDirectory=" + sharedStateDirectory +
+ ", taskOwnedStateDirectory=" + taskOwnedStateDirectory +
+ ", metadataFilePath=" + metadataFilePath +
+ ", reference=" + reference +
+ ", fileStateSizeThreshold=" + fileStateSizeThreshold +
'}';
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
index 8f84a38141f9e..8a9c69ff4269a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStreamFactory.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.state.filesystem;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -26,33 +25,47 @@
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.memory.ByteStreamStateHandle;
import org.apache.flink.util.FileUtils;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.net.URI;
import java.util.Arrays;
import java.util.UUID;
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
/**
- * {@link org.apache.flink.runtime.state.CheckpointStreamFactory} that produces streams that
- * write to a {@link FileSystem}.
+ * A {@link CheckpointStreamFactory} that produces streams that write to a {@link FileSystem}.
+ * The streams from the factory put their data into files with a random name, within the
+ * given directory.
+ *
+ *
If the state written to the stream is fewer bytes than a configurable threshold, then no
+ * files are written, but the state is returned inline in the state handle instead. This reduces
+ * the problem of many small files that have only few bytes.
+ *
+ *
Note on directory creation
*
- * The factory has one core directory into which it puts all checkpoint data. Inside that
- * directory, it creates a directory per job, inside which each checkpoint gets a directory, with
- * files for each state, for example:
+ *
The given target directory must already exist, this factory does not ensure that the
+ * directory gets created. That is important, because if this factory checked for directory
+ * existence, there would be many checks per checkpoint (from each TaskManager and operator)
+ * and such floods of directory existence checks can be prohibitive on larger scale setups
+ * for some file systems.
*
- * {@code hdfs://namenode:port/flink-checkpoints//chk-17/6ba7b810-9dad-11d1-80b4-00c04fd430c8 }
+ * For example many S3 file systems (like Hadoop's s3a) use HTTP HEAD requests to check
+ * for the existence of a directory. S3 sometimes limits the number of HTTP HEAD requests to
+ * a few hundred per second only. Those numbers are easily reached by moderately large setups.
+ * Surprisingly (and fortunately), the actual state writing (POST) have much higher quotas.
*/
public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
private static final Logger LOG = LoggerFactory.getLogger(FsCheckpointStreamFactory.class);
/** Maximum size of state that is stored with the metadata, rather than in files */
- private static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
+ public static final int MAX_FILE_STATE_THRESHOLD = 1024 * 1024;
/** Default size for the write buffer */
- private static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
+ public static final int DEFAULT_WRITE_BUFFER_SIZE = 4096;
/** State below this size will be stored as part of the metadata, rather than in files */
private final int fileStateThreshold;
@@ -60,31 +73,26 @@ public class FsCheckpointStreamFactory implements CheckpointStreamFactory {
/** The directory (job specific) into this initialized instance of the backend stores its data */
private final Path checkpointDirectory;
- /** Cached handle to the file system for file operations */
+ /** Cached handle to the file system for file operations. */
private final FileSystem filesystem;
/**
- * Creates a new state backend that stores its checkpoint data in the file system and location
- * defined by the given URI.
+ * Creates a new stream factory that stores its checkpoint data in the file system and location
+ * defined by the given Path.
*
- *
A file system for the file system scheme in the URI (e.g., 'file://', 'hdfs://', or 'S3://')
- * must be accessible via {@link FileSystem#get(URI)}.
+ *
Important: The given checkpoint directory must already exist. Refer to the class-level
+ * JavaDocs for an explanation why this factory must not try and create the checkpoints.
*
- *
For a state backend targeting HDFS, this means that the URI must either specify the authority
- * (host and port), or that the Hadoop configuration that describes that information must be in the
- * classpath.
- *
- * @param checkpointDataUri The URI describing the filesystem (scheme and optionally authority),
- * and the path to the checkpoint data directory.
+ * @param fileSystem The filesystem to write to.
+ * @param checkpointDirectory The URI describing the filesystem (scheme and optionally authority),
+ * and the path to the checkpoint data directory.
* @param fileStateSizeThreshold State up to this size will be stored as part of the metadata,
* rather than in files
- *
- * @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public FsCheckpointStreamFactory(
- Path checkpointDataUri,
- JobID jobId,
- int fileStateSizeThreshold) throws IOException {
+ FileSystem fileSystem,
+ Path checkpointDirectory,
+ int fileStateSizeThreshold) {
if (fileStateSizeThreshold < 0) {
throw new IllegalArgumentException("The threshold for file state size must be zero or larger.");
@@ -93,52 +101,33 @@ public FsCheckpointStreamFactory(
throw new IllegalArgumentException("The threshold for file state size cannot be larger than " +
MAX_FILE_STATE_THRESHOLD);
}
- this.fileStateThreshold = fileStateSizeThreshold;
- Path basePath = checkpointDataUri;
- filesystem = basePath.getFileSystem();
-
- checkpointDirectory = createBasePath(filesystem, basePath, jobId);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Initialed file stream factory to URI {}.", checkpointDirectory);
- }
+ this.filesystem = checkNotNull(fileSystem);
+ this.checkpointDirectory = checkNotNull(checkpointDirectory);
+ this.fileStateThreshold = fileStateSizeThreshold;
}
+ // ------------------------------------------------------------------------
+
@Override
public FsCheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
- checkFileSystemInitialized();
-
- Path checkpointDir = createCheckpointDirPath(checkpointDirectory, checkpointID);
int bufferSize = Math.max(DEFAULT_WRITE_BUFFER_SIZE, fileStateThreshold);
- return new FsCheckpointStateOutputStream(checkpointDir, filesystem, bufferSize, fileStateThreshold);
+ return new FsCheckpointStateOutputStream(checkpointDirectory, filesystem, bufferSize, fileStateThreshold);
}
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
- private void checkFileSystemInitialized() throws IllegalStateException {
- if (filesystem == null || checkpointDirectory == null) {
- throw new IllegalStateException("filesystem has not been re-initialized after deserialization");
- }
- }
-
- protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
- Path dir = new Path(checkpointDirectory, jobID.toString());
- fs.mkdirs(dir);
- return dir;
- }
-
- protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
- return new Path(checkpointDirectory, "chk-" + checkpointID);
- }
-
@Override
public String toString() {
return "File Stream Factory @ " + checkpointDirectory;
}
+ // ------------------------------------------------------------------------
+ // Checkpoint stream implementation
+ // ------------------------------------------------------------------------
+
/**
* A {@link CheckpointStreamFactory.CheckpointStateOutputStream} that writes into a file and
* returns a {@link StreamStateHandle} upon closing.
@@ -349,9 +338,6 @@ private Path createStatePath() {
}
private void createStream() throws IOException {
- // make sure the directory for that specific checkpoint exists
- fs.mkdirs(basePath);
-
Exception latestException = null;
for (int attempt = 0; attempt < 10; attempt++) {
try {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
deleted file mode 100644
index 7410d2dd971ff..0000000000000
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactory.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import java.io.IOException;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FileSystem;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
-
-/**
- * A {@link CheckpointStreamFactory} that produces streams that write to a
- * {@link FileSystem}.
- *
- *
The difference to the parent {@link FsCheckpointStreamFactory} is only
- * in the created directory layout. All checkpoint files go to the checkpoint
- * directory.
- */
-public class FsSavepointStreamFactory extends FsCheckpointStreamFactory {
-
- public FsSavepointStreamFactory(
- Path checkpointDataUri,
- JobID jobId,
- int fileStateSizeThreshold) throws IOException {
-
- super(checkpointDataUri, jobId, fileStateSizeThreshold);
- }
-
- @Override
- protected Path createBasePath(FileSystem fs, Path checkpointDirectory, JobID jobID) throws IOException {
- // No checkpoint specific directory required as the savepoint directory
- // is already unique.
- return checkpointDirectory;
- }
-
- @Override
- protected Path createCheckpointDirPath(Path checkpointDirectory, long checkpointID) {
- // No checkpoint specific directory required as the savepoint directory
- // is already unique.
- return checkpointDirectory;
- }
-}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
index de4955232a732..58791e27dce61 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/filesystem/FsStateBackend.java
@@ -29,7 +29,6 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -437,21 +436,7 @@ public FsStateBackend configure(Configuration config) {
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
checkNotNull(jobId, "jobId");
- return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId);
- }
-
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return new FsCheckpointStreamFactory(getCheckpointPath(), jobId, getMinFileSizeThreshold());
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- String targetLocation) throws IOException {
-
- return new FsSavepointStreamFactory(new Path(targetLocation), jobId, getMinFileSizeThreshold());
+ return new FsCheckpointStorage(getCheckpointPath(), getSavepointPath(), jobId, getMinFileSizeThreshold());
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
index 3fb26276ab6fe..24db5350bcefd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryBackendCheckpointStorage.java
@@ -22,7 +22,11 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
import javax.annotation.Nullable;
@@ -46,19 +50,8 @@ public class MemoryBackendCheckpointStorage extends AbstractFsCheckpointStorage
@Nullable
private final FileSystem fileSystem;
- /**
- * Creates a new MemoryBackendCheckpointStorage. The storage neither persists checkpoints
- * in the filesystem, nor does it have a default savepoint location. The storage does support
- * savepoints, though, when an explicit savepoint location is passed to
- * {@link #initializeLocationForSavepoint(long, String)}.
- *
- * @param jobId The ID of the job writing the checkpoints.
- */
- public MemoryBackendCheckpointStorage(JobID jobId) {
- super(jobId, null);
- checkpointsDirectory = null;
- fileSystem = null;
- }
+ /** The maximum size of state stored in a state handle. */
+ private final int maxStateSize;
/**
* Creates a new MemoryBackendCheckpointStorage.
@@ -67,6 +60,7 @@ public MemoryBackendCheckpointStorage(JobID jobId) {
* @param checkpointsBaseDirectory The directory to write checkpoints to. May be null,
* in which case this storage does not support durable persistence.
* @param defaultSavepointLocation The default savepoint directory, or null, if none is set.
+ * @param maxStateSize The maximum size of each individual piece of state.
*
* @throws IOException Thrown if a checkpoint base directory is given configured and the
* checkpoint directory cannot be created within that directory.
@@ -74,10 +68,14 @@ public MemoryBackendCheckpointStorage(JobID jobId) {
public MemoryBackendCheckpointStorage(
JobID jobId,
@Nullable Path checkpointsBaseDirectory,
- @Nullable Path defaultSavepointLocation) throws IOException {
+ @Nullable Path defaultSavepointLocation,
+ int maxStateSize) throws IOException {
super(jobId, defaultSavepointLocation);
+ checkArgument(maxStateSize > 0);
+ this.maxStateSize = maxStateSize;
+
if (checkpointsBaseDirectory == null) {
checkpointsDirectory = null;
fileSystem = null;
@@ -113,23 +111,44 @@ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpoint
// create the checkpoint exclusive directory
fileSystem.mkdirs(checkpointDir);
- return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir);
+ return new PersistentMetadataCheckpointStorageLocation(fileSystem, checkpointDir, maxStateSize);
}
else {
// no durable metadata - typical in IDE or test setup case
- return new NonPersistentMetadataCheckpointStorageLocation();
+ return new NonPersistentMetadataCheckpointStorageLocation(maxStateSize);
}
}
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(
+ long checkpointId,
+ CheckpointStorageLocationReference reference) throws IOException {
+
+ // no matter where the checkpoint goes, we always return the storage location that stores
+ // state inline with the state handles.
+ return new MemCheckpointStreamFactory(maxStateSize);
+ }
+
+ @Override
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
+ return new MemoryCheckpointOutputStream(maxStateSize);
+ }
+
+ @Override
+ protected CheckpointStorageLocation createSavepointLocation(FileSystem fs, Path location) throws IOException {
+ return new PersistentMetadataCheckpointStorageLocation(fs, location, maxStateSize);
+ }
+
// ------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------
@Override
public String toString() {
- return getClass().getName() + " - " +
- (checkpointsDirectory == null ? "(not persistent)" : checkpointsDirectory) +
- ", default savepoint dir: " +
- (getDefaultSavepointDirectory() == null ? "(none)" : getDefaultSavepointDirectory());
+ return "MemoryBackendCheckpointStorage {" +
+ "checkpointsDirectory=" + checkpointsDirectory +
+ ", fileSystem=" + fileSystem +
+ ", maxStateSize=" + maxStateSize +
+ '}';
}
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
index afcf9a80f67bd..88d7b01e236ea 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/MemoryStateBackend.java
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -280,22 +279,7 @@ public MemoryStateBackend configure(Configuration config) {
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
- return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath());
- }
-
- @Override
- public CheckpointStreamFactory createStreamFactory(
- JobID jobId, String operatorIdentifier) throws IOException {
- return new MemCheckpointStreamFactory(maxStateSize);
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- String targetLocation) throws IOException {
-
- return new MemCheckpointStreamFactory(maxStateSize);
+ return new MemoryBackendCheckpointStorage(jobId, getCheckpointPath(), getSavepointPath(), maxStateSize);
}
// ------------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
index fb8bd7eeb0b75..0582aefa26138 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/NonPersistentMetadataCheckpointStorageLocation.java
@@ -20,8 +20,6 @@
import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
-import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory.MemoryCheckpointOutputStream;
import java.io.IOException;
@@ -29,7 +27,9 @@
* A checkpoint storage location for the {@link MemoryStateBackend} in case no durable persistence
* for metadata has been configured.
*/
-public class NonPersistentMetadataCheckpointStorageLocation implements CheckpointStorageLocation {
+public class NonPersistentMetadataCheckpointStorageLocation
+ extends MemCheckpointStreamFactory
+ implements CheckpointStorageLocation {
/** The external pointer returned for checkpoints that are not externally addressable. */
public static final String EXTERNAL_POINTER = "";
@@ -37,6 +37,10 @@ public class NonPersistentMetadataCheckpointStorageLocation implements Checkpoin
/** The maximum serialized state size for the checkpoint metadata. */
private static final int MAX_METADATA_STATE_SIZE = Integer.MAX_VALUE;
+ public NonPersistentMetadataCheckpointStorageLocation(int maxStateSize) {
+ super(maxStateSize);
+ }
+
@Override
public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
return new MemoryCheckpointOutputStream(MAX_METADATA_STATE_SIZE);
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
index b6a863575a705..6da8191aa9878 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/state/memory/PersistentMetadataCheckpointStorageLocation.java
@@ -20,19 +20,28 @@
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
-import org.apache.flink.runtime.state.filesystem.FsCheckpointStorageLocation;
+import org.apache.flink.runtime.state.filesystem.AbstractFsCheckpointStorage;
+import org.apache.flink.runtime.state.filesystem.FixFileFsStateOutputStream;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* A checkpoint storage location for the {@link MemoryStateBackend} when it durably
* persists the metadata in a file system.
- *
- * This class inherits its behavior for metadata from the {@link FsCheckpointStorageLocation},
- * which makes checkpoint metadata cross compatible between the two classes and hence between
- * the {@link org.apache.flink.runtime.state.filesystem.FsStateBackend FsStateBackend} and the
- * {@link MemoryStateBackend}.
*/
-public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointStorageLocation {
+public class PersistentMetadataCheckpointStorageLocation
+ extends MemCheckpointStreamFactory
+ implements CheckpointStorageLocation {
+
+ private final FileSystem fileSystem;
+
+ private final Path checkpointDirectory;
+
+ private final Path metadataFilePath;
/**
* Creates a checkpoint storage persists metadata to a file system and stores state
@@ -41,14 +50,39 @@ public class PersistentMetadataCheckpointStorageLocation extends FsCheckpointSto
* @param fileSystem The file system to which the metadata will be written.
* @param checkpointDir The directory where the checkpoint metadata will be written.
*/
- public PersistentMetadataCheckpointStorageLocation(FileSystem fileSystem, Path checkpointDir) {
- super(fileSystem, checkpointDir, checkpointDir, checkpointDir, CheckpointStorageLocationReference.getDefault());
+ public PersistentMetadataCheckpointStorageLocation(
+ FileSystem fileSystem,
+ Path checkpointDir,
+ int maxStateSize) {
+
+ super(maxStateSize);
+
+ this.fileSystem = checkNotNull(fileSystem);
+ this.checkpointDirectory = checkNotNull(checkpointDir);
+ this.metadataFilePath = new Path(checkpointDir, AbstractFsCheckpointStorage.METADATA_FILE_NAME);
}
// ------------------------------------------------------------------------
@Override
- public String toString() {
- return getClass().getName() + " @ " + getCheckpointDirectory();
+ public CheckpointStateOutputStream createMetadataOutputStream() throws IOException {
+ return new FixFileFsStateOutputStream(fileSystem, metadataFilePath);
+ }
+
+ @Override
+ public String markCheckpointAsFinished() throws IOException {
+ return checkpointDirectory.toString();
+ }
+
+ @Override
+ public void disposeOnFailure() throws IOException {
+ // on a failure, no chunk in the checkpoint directory needs to be saved, so
+ // we can drop it as a whole
+ fileSystem.delete(checkpointDirectory, true);
+ }
+
+ @Override
+ public CheckpointStorageLocationReference getLocationReference() {
+ return CheckpointStorageLocationReference.getDefault();
}
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index b8a2a54e4aa4c..479bba2c87eb5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -37,7 +37,6 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
@@ -48,7 +47,6 @@
import org.junit.Test;
-import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.net.URL;
@@ -160,20 +158,6 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
return mock(CheckpointStorage.class);
}
- @Override
- public CheckpointStreamFactory createStreamFactory(
- JobID jobId, String operatorIdentifier) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId,
- String operatorIdentifier,
- @Nullable String targetLocation) throws IOException {
- throw new UnsupportedOperationException();
- }
-
@Override
public AbstractKeyedStateBackend createKeyedStateBackend(
Environment env,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
index 904b533a8dc1b..f45ac14091c9d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/PendingCheckpointTest.java
@@ -335,7 +335,8 @@ private PendingCheckpoint createPendingCheckpoint(CheckpointProperties props, Ex
final FsCheckpointStorageLocation location = new FsCheckpointStorageLocation(
LocalFileSystem.getSharedInstance(),
checkpointDir, checkpointDir, checkpointDir,
- CheckpointStorageLocationReference.getDefault());
+ CheckpointStorageLocationReference.getDefault(),
+ 1024);
final Map ackTasks = new HashMap<>(ACK_TASKS);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
index 6be23437d88ca..b5774bd8d8059 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FileStateBackendTest.java
@@ -82,13 +82,17 @@ public void testStateOutputStream() throws IOException {
try {
// the state backend has a very low in-mem state threshold (15 bytes)
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(basePath.toURI(), 15));
- JobID jobId = new JobID();
+
+ final JobID jobId = new JobID();
+ final long checkpointId = 97231523452L;
// we know how FsCheckpointStreamFactory is implemented so we know where it
// will store checkpoints
File checkpointPath = new File(basePath.getAbsolutePath(), jobId.toString());
- CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_op");
+ CheckpointStreamFactory streamFactory = backend
+ .createCheckpointStorage(jobId)
+ .initializeLocationForCheckpoint(checkpointId);
byte[] state1 = new byte[1274673];
byte[] state2 = new byte[1];
@@ -101,8 +105,6 @@ public void testStateOutputStream() throws IOException {
rnd.nextBytes(state3);
rnd.nextBytes(state4);
- long checkpointId = 97231523452L;
-
CheckpointStreamFactory.CheckpointStateOutputStream stream1 =
streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
index 7cf3abd6600f2..092b304da7895 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java
@@ -19,7 +19,6 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
@@ -30,9 +29,10 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.heap.HeapKeyedStateBackend;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.util.FutureUtil;
-import org.apache.flink.util.TernaryBoolean;
+
import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
@@ -94,8 +94,7 @@ public void testMapStateRestoreWithWrongSerializers() {}
@Test
public void testOversizedState() {
try {
- MemoryStateBackend backend = new MemoryStateBackend(null, null, 10, TernaryBoolean.TRUE);
- CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(10);
HashMap state = new HashMap<>();
state.put("hey there", 2);
@@ -127,8 +126,8 @@ public void testOversizedState() {
@Test
public void testStateStream() {
try {
- MemoryStateBackend backend = new MemoryStateBackend();
- CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
+// MemoryStateBackend backend = new MemoryStateBackend();
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
HashMap state = new HashMap<>();
state.put("hey there", 2);
@@ -156,8 +155,7 @@ public void testStateStream() {
@Test
public void testOversizedStateStream() {
try {
- MemoryStateBackend backend = new MemoryStateBackend(10);
- CheckpointStreamFactory streamFactory = backend.createStreamFactory(new JobID(), "test_op");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(10);
HashMap state = new HashMap<>();
state.put("hey there", 2);
@@ -216,7 +214,8 @@ public void testOperatorStateRestoreFailsIfSerializerDeserializationFails() thro
listState3.add(19);
listState3.add(20);
- CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(MemoryStateBackend.DEFAULT_MAX_STATE_SIZE);
+
RunnableFuture runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
index e891ab35b44c2..ef390db479029 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/OperatorStateBackendTest.java
@@ -18,7 +18,6 @@
package org.apache.flink.runtime.state;
import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
@@ -34,6 +33,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend.PartitionableListState;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.util.FutureUtil;
@@ -233,7 +233,7 @@ public void testCorrectClassLoaderUsedOnSnapshot() throws Exception {
listState.add(42);
- CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
FutureUtil.runIfNotDoneAndGet(runnableFuture);
@@ -351,8 +351,7 @@ public void testSnapshotEmpty() throws Exception {
final OperatorStateBackend operatorStateBackend =
abstractStateBackend.createOperatorStateBackend(createMockEnvironment(), "testOperator");
- CheckpointStreamFactory streamFactory =
- abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture snapshot =
operatorStateBackend.snapshot(0L, 0L, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
@@ -385,7 +384,7 @@ public void testSnapshotRestoreSync() throws Exception {
listState3.add(19);
listState3.add(20);
- CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
@@ -665,7 +664,7 @@ public void testRestoreFailsIfSerializerDeserializationFails() throws Exception
listState3.add(19);
listState3.add(20);
- CheckpointStreamFactory streamFactory = abstractStateBackend.createStreamFactory(new JobID(), "testOperator");
+ CheckpointStreamFactory streamFactory = new MemCheckpointStreamFactory(4096);
RunnableFuture runnableFuture =
operatorStateBackend.snapshot(1, 1, streamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
OperatorStateHandle stateHandle = FutureUtil.runIfNotDoneAndGet(runnableFuture);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
index 7533c9f460723..90a68dc203faa 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java
@@ -122,12 +122,21 @@
public abstract class StateBackendTestBase extends TestLogger {
@Rule
- public ExpectedException expectedException = ExpectedException.none();
+ public final ExpectedException expectedException = ExpectedException.none();
+
+ // lazily initialized stream storage
+ private CheckpointStorageLocation checkpointStorageLocation;
protected abstract B getStateBackend() throws Exception;
protected CheckpointStreamFactory createStreamFactory() throws Exception {
- return getStateBackend().createStreamFactory(new JobID(), "test_op");
+ if (checkpointStorageLocation == null) {
+ checkpointStorageLocation = getStateBackend()
+ .createCheckpointStorage(new JobID())
+ .initializeLocationForCheckpoint(1L);
+ }
+
+ return checkpointStorageLocation;
}
protected AbstractKeyedStateBackend createKeyedBackend(TypeSerializer keySerializer) throws Exception {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackendTest.java
index 4aac253863a9e..1bdebf63e0d12 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackendTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/AbstractFileStateBackendTest.java
@@ -257,7 +257,7 @@ private void testSavepoint(
final String customLocation = customDir == null ? null : customDir.toString();
- final FsCheckpointStorageLocation savepointLocation =
+ final FsCheckpointStorageLocation savepointLocation = (FsCheckpointStorageLocation)
storage.initializeLocationForSavepoint(52452L, customLocation);
// all state types should be in the expected location
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
deleted file mode 100644
index 3095a09590f04..0000000000000
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsSavepointStreamFactoryTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.state.filesystem;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-
-import java.io.File;
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.core.fs.FileStatus;
-import org.apache.flink.core.fs.Path;
-import org.apache.flink.runtime.state.filesystem.FsCheckpointStreamFactory.FsCheckpointStateOutputStream;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
-
-public class FsSavepointStreamFactoryTest {
-
- @Rule
- public TemporaryFolder folder = new TemporaryFolder();
-
- /**
- * Tests that the factory creates all files in the given directory without
- * creating any sub directories.
- */
- @Test
- public void testSavepointStreamDirectoryLayout() throws Exception {
- File testRoot = folder.newFolder();
- JobID jobId = new JobID();
-
- FsSavepointStreamFactory savepointStreamFactory = new FsSavepointStreamFactory(
- new Path(testRoot.getAbsolutePath()),
- jobId,
- 0);
-
- Path root = new Path(testRoot.getAbsolutePath());
- FileStatus[] listed = root.getFileSystem().listStatus(root);
- assertNotNull(listed);
- assertEquals(0, listed.length);
-
- FsCheckpointStateOutputStream stream = savepointStreamFactory
- .createCheckpointStateOutputStream(1273, 19231);
-
- stream.write(1);
-
- FileStateHandle handle = (FileStateHandle) stream.closeAndGetHandle();
-
- listed = root.getFileSystem().listStatus(root);
- assertNotNull(listed);
- assertEquals(1, listed.length);
- assertEquals(handle.getFilePath().getPath(), listed[0].getPath().getPath());
- }
-}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
new file mode 100644
index 0000000000000..fdb04ccce9716
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/BackendForTestStream.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocation;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
+import org.apache.flink.runtime.state.StreamStateHandle;
+import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.util.function.SupplierWithException;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A test backends that allows you to supply a specific test stream.
+ */
+@SuppressWarnings({"serial"})
+public class BackendForTestStream extends MemoryStateBackend {
+
+ private static final long serialVersionUID = 1L;
+
+ private final TestFactory streamFactory;
+
+ public BackendForTestStream(TestFactory streamFactory) {
+ this.streamFactory = checkNotNull(streamFactory);
+ }
+
+ public BackendForTestStream(StreamFactory streamSupplier) {
+ this(new TestFactory(streamSupplier));
+ }
+
+ // make no reconfiguration!
+ @Override
+ public MemoryStateBackend configure(Configuration config) {
+ return this;
+ }
+
+ @Override
+ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
+ return new TestCheckpointStorage();
+ }
+
+ // ------------------------------------------------------------------------
+
+ public interface StreamFactory
+ extends SupplierWithException, java.io.Serializable {}
+
+ // ------------------------------------------------------------------------
+
+ private final class TestCheckpointStorage implements CheckpointStorage {
+
+ @Override
+ public boolean supportsHighlyAvailableStorage() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean hasDefaultSavepointLocation() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForCheckpoint(long checkpointId) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStorageLocation initializeLocationForSavepoint(long checkpointId, @Nullable String externalLocationPointer) throws IOException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CheckpointStreamFactory resolveCheckpointStorageLocation(long checkpointId, CheckpointStorageLocationReference reference) throws IOException {
+ return streamFactory;
+ }
+
+ @Override
+ public CheckpointStateOutputStream createTaskOwnedStateStream() throws IOException {
+ throw new UnsupportedOperationException();
+ }
+ }
+
+ private static final class TestFactory implements CheckpointStreamFactory, java.io.Serializable {
+
+ private final StreamFactory streamFactory;
+
+ TestFactory(StreamFactory streamFactory) {
+ this.streamFactory = streamFactory;
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
+ return streamFactory.get();
+ }
+ }
+}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
new file mode 100644
index 0000000000000..57a1d3bbbbac1
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/testutils/TestCheckpointStreamFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state.testutils;
+
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
+
+import java.util.function.Supplier;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A bridge between factory lambdas (like {@link Supplier} and the
+ * CheckpointStreamFactory class.
+ */
+public class TestCheckpointStreamFactory implements CheckpointStreamFactory {
+
+ private final Supplier supplier;
+
+ public TestCheckpointStreamFactory(Supplier supplier) {
+ this.supplier = checkNotNull(supplier);
+ }
+
+ @Override
+ public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointId, long timestamp) {
+ return supplier.get();
+ }
+}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
index c6bc1443d8b0c..7db157caec2c1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java
@@ -34,7 +34,6 @@
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
-import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
@@ -72,9 +71,7 @@
import org.slf4j.LoggerFactory;
import java.io.Closeable;
-import java.io.IOException;
import java.io.Serializable;
-import java.nio.charset.StandardCharsets;
import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.Map;
@@ -119,11 +116,6 @@ public abstract class AbstractStreamOperator
/** The runtime context for UDFs. */
private transient StreamingRuntimeContext runtimeContext;
- // ----------------- general state -------------------
-
- /** The factory that give this operator access to checkpoint storage. */
- private transient CheckpointStreamFactory checkpointStreamFactory;
-
// ---------------- key/value state ------------------
/**
@@ -243,7 +235,6 @@ public final void initializeState() throws Exception {
}
timeServiceManager = context.internalTimerServiceManager();
- checkpointStreamFactory = context.checkpointStreamFactory();
CloseableIterable keyedStateInputs = context.rawKeyedStateInputs();
CloseableIterable operatorStateInputs = context.rawOperatorStateInputs();
@@ -352,15 +343,17 @@ public void dispose() throws Exception {
}
@Override
- public final OperatorSnapshotResult snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception {
+ public final OperatorSnapshotResult snapshotState(
+ long checkpointId,
+ long timestamp,
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory factory) throws Exception {
KeyGroupRange keyGroupRange = null != keyedStateBackend ?
keyedStateBackend.getKeyGroupRange() : KeyGroupRange.EMPTY_KEY_GROUP_RANGE;
OperatorSnapshotResult snapshotInProgress = new OperatorSnapshotResult();
- CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions);
-
try (StateSnapshotContextSynchronousImpl snapshotContext = new StateSnapshotContextSynchronousImpl(
checkpointId,
timestamp,
@@ -450,37 +443,6 @@ public void notifyCheckpointComplete(long checkpointId) throws Exception {
}
}
- /**
- * Returns a checkpoint stream factory for the provided options.
- *
- * For {@link CheckpointType#CHECKPOINT} this returns the shared
- * factory of this operator.
- *
- *
For {@link CheckpointType#SAVEPOINT} it creates a custom factory per
- * savepoint.
- *
- * @param checkpointOptions Options for the checkpoint
- * @return Checkpoint stream factory for the checkpoints
- * @throws IOException Failures while creating a new stream factory are forwarded
- */
- @VisibleForTesting
- CheckpointStreamFactory getCheckpointStreamFactory(CheckpointOptions checkpointOptions) throws IOException {
- CheckpointType checkpointType = checkpointOptions.getCheckpointType();
- if (checkpointType == CheckpointType.CHECKPOINT) {
- return checkpointStreamFactory;
- } else if (checkpointType == CheckpointType.SAVEPOINT) {
-
- // temporary fix: hard-code back conversion of the location reference to a string
- String targetAsString = new String(
- checkpointOptions.getTargetLocation().getReferenceBytes(),
- StandardCharsets.UTF_8);
-
- return container.createSavepointStreamFactory(this, targetAsString);
- } else {
- throw new IllegalStateException("Unknown checkpoint type " + checkpointType);
- }
- }
-
// ------------------------------------------------------------------------
// Properties and Services
// ------------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
index 07adfe409345b..8450396881dec 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java
@@ -22,6 +22,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.CheckpointListener;
+import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -101,7 +102,8 @@ public interface StreamOperator extends CheckpointListener, KeyContext, Ser
OperatorSnapshotResult snapshotState(
long checkpointId,
long timestamp,
- CheckpointOptions checkpointOptions) throws Exception;
+ CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory storageLocation) throws Exception;
/**
* Provides a context to initialize all state in the operator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
index 52afd2a9de321..75ead44630ea3 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorStateContext.java
@@ -19,7 +19,6 @@
package org.apache.flink.streaming.api.operators;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
@@ -53,11 +52,6 @@ public interface StreamOperatorStateContext {
*/
InternalTimeServiceManager, ?> internalTimerServiceManager();
- /**
- * Returns the checkpoint stream factory for the stream operator.
- */
- CheckpointStreamFactory checkpointStreamFactory();
-
/**
* Returns an iterable to obtain input streams for previously stored operator state partitions that are assigned to
* this stream operator.
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
index ff00a8c0531f9..9afd2b70dd988 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImpl.java
@@ -28,7 +28,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
@@ -127,8 +126,7 @@ public StreamOperatorStateContext streamOperatorStateContext(
OperatorStateBackend operatorStateBackend = null;
CloseableIterable rawKeyedStateInputs = null;
CloseableIterable rawOperatorStateInputs = null;
- CheckpointStreamFactory checkpointStreamFactory = null;
- InternalTimeServiceManager, ?> timeServiceManager = null;
+ InternalTimeServiceManager, ?> timeServiceManager;
try {
@@ -152,9 +150,6 @@ public StreamOperatorStateContext streamOperatorStateContext(
rawOperatorStateInputs = rawOperatorStateInputs(operatorSubtaskStateFromJobManager);
streamTaskCloseableRegistry.registerCloseable(rawOperatorStateInputs);
- // -------------- Checkpoint Stream Factory --------------
- checkpointStreamFactory = streamFactory(operatorIdentifierText);
-
// -------------- Internal Timer Service Manager --------------
timeServiceManager = internalTimeServiceManager(keyedStatedBackend, keyContext, rawKeyedStateInputs);
@@ -166,8 +161,7 @@ public StreamOperatorStateContext streamOperatorStateContext(
keyedStatedBackend,
timeServiceManager,
rawOperatorStateInputs,
- rawKeyedStateInputs,
- checkpointStreamFactory);
+ rawKeyedStateInputs);
} catch (Exception ex) {
// cleanup if something went wrong before results got published.
@@ -259,10 +253,6 @@ protected AbstractKeyedStateBackend keyedStatedBackend(
backendCloseableRegistry);
}
- protected CheckpointStreamFactory streamFactory(String operatorIdentifierText) throws IOException {
- return stateBackend.createStreamFactory(environment.getJobID(), operatorIdentifierText);
- }
-
protected CloseableIterable rawOperatorStateInputs(
OperatorSubtaskState operatorSubtaskStateFromJobManager) {
@@ -589,16 +579,13 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta
private final CloseableIterable rawOperatorStateInputs;
private final CloseableIterable rawKeyedStateInputs;
- private final CheckpointStreamFactory checkpointStreamFactory;
-
StreamOperatorStateContextImpl(
boolean restored,
OperatorStateBackend operatorStateBackend,
AbstractKeyedStateBackend> keyedStateBackend,
InternalTimeServiceManager, ?> internalTimeServiceManager,
CloseableIterable rawOperatorStateInputs,
- CloseableIterable rawKeyedStateInputs,
- CheckpointStreamFactory checkpointStreamFactory) {
+ CloseableIterable rawKeyedStateInputs) {
this.restored = restored;
this.operatorStateBackend = operatorStateBackend;
@@ -606,7 +593,6 @@ private static class StreamOperatorStateContextImpl implements StreamOperatorSta
this.internalTimeServiceManager = internalTimeServiceManager;
this.rawOperatorStateInputs = rawOperatorStateInputs;
this.rawKeyedStateInputs = rawKeyedStateInputs;
- this.checkpointStreamFactory = checkpointStreamFactory;
}
@Override
@@ -629,11 +615,6 @@ public OperatorStateBackend operatorStateBackend() {
return internalTimeServiceManager;
}
- @Override
- public CheckpointStreamFactory checkpointStreamFactory() {
- return checkpointStreamFactory;
- }
-
@Override
public CloseableIterable rawOperatorStateInputs() {
return rawOperatorStateInputs;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
index 89ef327ea522e..27db1263f1a6b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java
@@ -24,6 +24,7 @@
import org.apache.flink.core.memory.DataInputViewStreamWrapper;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.io.disk.InputViewIterator;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StateSnapshotContext;
@@ -65,7 +66,7 @@ public abstract class GenericWriteAheadSink extends AbstractStreamOperator serializer;
private transient CheckpointStreamFactory.CheckpointStateOutputStream out;
- private transient CheckpointStreamFactory checkpointStreamFactory;
+ private transient CheckpointStorage checkpointStorage;
private transient ListState checkpointedState;
@@ -116,8 +117,7 @@ public void open() throws Exception {
committer.setOperatorId(id);
committer.open();
- checkpointStreamFactory = getContainingTask()
- .createCheckpointStreamFactory(this);
+ checkpointStorage = getContainingTask().getCheckpointStorage();
cleanRestoredHandles();
}
@@ -274,7 +274,7 @@ public void processElement(StreamRecord element) throws Exception {
IN value = element.getValue();
// generate initial operator state
if (out == null) {
- out = checkpointStreamFactory.createCheckpointStateOutputStream(0, 0);
+ out = checkpointStorage.createTaskOwnedStateStream();
}
serializer.serialize(value, new DataOutputViewStreamWrapper(out));
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 801e9907af3db..4a85fb9acf41c 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -34,6 +34,7 @@
import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateBackendLoader;
@@ -61,7 +62,6 @@
import javax.annotation.Nullable;
import java.io.Closeable;
-import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
@@ -141,6 +141,9 @@ public abstract class StreamTask>
/** Our state backend. We use this to create checkpoint streams and a keyed state backend. */
protected StateBackend stateBackend;
+ /** The external storage where checkpoint data is persisted. */
+ private CheckpointStorage checkpointStorage;
+
/**
* The internal {@link ProcessingTimeService} used to define the current
* processing time (default = {@code System.currentTimeMillis()}) and
@@ -246,6 +249,7 @@ public final void invoke() throws Exception {
asynchronousCheckpointExceptionHandler = new AsyncCheckpointExceptionHandler(this);
stateBackend = createStateBackend();
+ checkpointStorage = stateBackend.createCheckpointStorage(getEnvironment().getJobID());
accumulatorMap = getEnvironment().getAccumulatorRegistry().getUserMap();
@@ -527,6 +531,10 @@ public Object getCheckpointLock() {
return lock;
}
+ public CheckpointStorage getCheckpointStorage() {
+ return checkpointStorage;
+ }
+
public StreamConfig getConfiguration() {
return configuration;
}
@@ -686,10 +694,15 @@ private void checkpointState(
CheckpointOptions checkpointOptions,
CheckpointMetrics checkpointMetrics) throws Exception {
+ CheckpointStreamFactory storage = checkpointStorage.resolveCheckpointStorageLocation(
+ checkpointMetaData.getCheckpointId(),
+ checkpointOptions.getTargetLocation());
+
CheckpointingOperation checkpointingOperation = new CheckpointingOperation(
this,
checkpointMetaData,
checkpointOptions,
+ storage,
checkpointMetrics);
checkpointingOperation.executeCheckpointing();
@@ -720,27 +733,6 @@ private StateBackend createStateBackend() throws Exception {
LOG);
}
- /**
- * This is only visible because
- * {@link org.apache.flink.streaming.runtime.operators.GenericWriteAheadSink} uses the
- * checkpoint stream factory to write write-ahead logs. This should not be used for
- * anything else.
- */
- public CheckpointStreamFactory createCheckpointStreamFactory(
- StreamOperator> operator) throws IOException {
- return stateBackend.createStreamFactory(
- getEnvironment().getJobID(),
- createOperatorIdentifier(operator));
- }
-
- public CheckpointStreamFactory createSavepointStreamFactory(
- StreamOperator> operator, String targetLocation) throws IOException {
- return stateBackend.createSavepointStreamFactory(
- getEnvironment().getJobID(),
- createOperatorIdentifier(operator),
- targetLocation);
- }
-
protected CheckpointExceptionHandlerFactory createCheckpointExceptionHandlerFactory() {
return new CheckpointExceptionHandlerFactory();
}
@@ -883,7 +875,6 @@ public void run() {
checkpointMetaData.getCheckpointId());
}
} catch (Exception e) {
- e.printStackTrace();
// the state is completed if an exception occurred in the acknowledgeCheckpoint call
// in order to clean up, we have to set it to RUNNING again.
asyncCheckpointState.compareAndSet(
@@ -960,6 +951,7 @@ private static final class CheckpointingOperation {
private final CheckpointMetaData checkpointMetaData;
private final CheckpointOptions checkpointOptions;
private final CheckpointMetrics checkpointMetrics;
+ private final CheckpointStreamFactory storageLocation;
private final StreamOperator>[] allOperators;
@@ -974,12 +966,14 @@ public CheckpointingOperation(
StreamTask, ?> owner,
CheckpointMetaData checkpointMetaData,
CheckpointOptions checkpointOptions,
+ CheckpointStreamFactory checkpointStorageLocation,
CheckpointMetrics checkpointMetrics) {
this.owner = Preconditions.checkNotNull(owner);
this.checkpointMetaData = Preconditions.checkNotNull(checkpointMetaData);
this.checkpointOptions = Preconditions.checkNotNull(checkpointOptions);
this.checkpointMetrics = Preconditions.checkNotNull(checkpointMetrics);
+ this.storageLocation = Preconditions.checkNotNull(checkpointStorageLocation);
this.allOperators = owner.operatorChain.getAllOperators();
this.operatorSnapshotsInProgress = new HashMap<>(allOperators.length);
}
@@ -1050,7 +1044,8 @@ private void checkpointStreamOperator(StreamOperator> op) throws Exception {
OperatorSnapshotResult snapshotInProgress = op.snapshotState(
checkpointMetaData.getCheckpointId(),
checkpointMetaData.getTimestamp(),
- checkpointOptions);
+ checkpointOptions,
+ storageLocation);
operatorSnapshotsInProgress.put(op.getOperatorID(), snapshotInProgress);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
index ee0f8dee15a31..97a0182b193f4 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorTest.java
@@ -35,6 +35,7 @@
import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
+import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
@@ -501,10 +502,14 @@ public void testSnapshotMethod() throws Exception {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
- operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+ operator.snapshotState(
+ checkpointId,
+ timestamp,
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new MemCheckpointStreamFactory(Integer.MAX_VALUE));
verify(context).close();
}
@@ -530,14 +535,18 @@ public void testFailingSnapshotMethod() throws Exception {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doReturn(containingTask).when(operator).getContainingTask();
// lets fail when calling the actual snapshotState method
doThrow(failingException).when(operator).snapshotState(eq(context));
try {
- operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+ operator.snapshotState(
+ checkpointId,
+ timestamp,
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
@@ -577,16 +586,11 @@ public void testFailingBackendSnapshotMethod() throws Exception {
when(containingTask.getCancelables()).thenReturn(closeableRegistry);
AbstractStreamOperator operator = mock(AbstractStreamOperator.class);
- when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenCallRealMethod();
+ when(operator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenCallRealMethod();
doCallRealMethod().when(operator).close();
doCallRealMethod().when(operator).dispose();
- // The amount of mocking in this test makes it necessary to make the
- // getCheckpointStreamFactory method visible for the test and to
- // overwrite its behaviour.
- when(operator.getCheckpointStreamFactory(any(CheckpointOptions.class))).thenReturn(streamFactory);
-
doReturn(containingTask).when(operator).getContainingTask();
RunnableFuture futureManagedOperatorStateHandle = mock(RunnableFuture.class);
@@ -602,7 +606,7 @@ public void testFailingBackendSnapshotMethod() throws Exception {
when(keyedStateBackend.snapshot(
eq(checkpointId),
eq(timestamp),
- eq(streamFactory),
+ any(CheckpointStreamFactory.class),
eq(CheckpointOptions.forCheckpointWithDefaultLocation()))).thenThrow(failingException);
closeableRegistry.registerCloseable(operatorStateBackend);
@@ -610,10 +614,13 @@ public void testFailingBackendSnapshotMethod() throws Exception {
Whitebox.setInternalState(operator, "operatorStateBackend", operatorStateBackend);
Whitebox.setInternalState(operator, "keyedStateBackend", keyedStateBackend);
- Whitebox.setInternalState(operator, "checkpointStreamFactory", streamFactory);
try {
- operator.snapshotState(checkpointId, timestamp, CheckpointOptions.forCheckpointWithDefaultLocation());
+ operator.snapshotState(
+ checkpointId,
+ timestamp,
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new MemCheckpointStreamFactory(Integer.MAX_VALUE));
fail("Exception expected.");
} catch (Exception e) {
assertEquals(failingException, e.getCause());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 84c74d6825e10..f82e403874076 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -91,7 +91,7 @@ public class AbstractUdfStreamOperatorLifecycleTest {
"setup[class org.apache.flink.streaming.runtime.tasks.StreamTask, class " +
"org.apache.flink.streaming.api.graph.StreamConfig, interface " +
"org.apache.flink.streaming.api.operators.Output], " +
- "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions]]";
+ "snapshotState[long, long, class org.apache.flink.runtime.checkpoint.CheckpointOptions, interface org.apache.flink.runtime.state.CheckpointStreamFactory]]";
private static final String ALL_METHODS_RICH_FUNCTION = "[close[], getIterationRuntimeContext[], getRuntimeContext[]" +
", open[class org.apache.flink.configuration.Configuration], setRuntimeContext[interface " +
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
index 3495bc04d7d2a..6b69707bc6de2 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/StreamTaskStateInitializerImplTest.java
@@ -33,7 +33,6 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupStatePartitionStreamProvider;
import org.apache.flink.runtime.state.OperatorStateBackend;
@@ -52,8 +51,6 @@
import org.junit.Assert;
import org.junit.Test;
-import javax.annotation.Nullable;
-
import java.io.Closeable;
import java.io.IOException;
import java.util.Collections;
@@ -107,14 +104,9 @@ public void testNoRestore() throws Exception {
any(Environment.class),
any(String.class));
- verify(stateBackend).createStreamFactory(
- any(JobID.class),
- any(String.class));
-
OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
AbstractKeyedStateBackend> keyedStateBackend = stateContext.keyedStateBackend();
InternalTimeServiceManager, ?> timeServiceManager = stateContext.internalTimerServiceManager();
- CheckpointStreamFactory streamFactory = stateContext.checkpointStreamFactory();
CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs();
CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs();
@@ -122,7 +114,6 @@ public void testNoRestore() throws Exception {
Assert.assertNotNull(operatorStateBackend);
Assert.assertNotNull(keyedStateBackend);
Assert.assertNotNull(timeServiceManager);
- Assert.assertNotNull(streamFactory);
Assert.assertNotNull(keyedStateInputs);
Assert.assertNotNull(operatorStateInputs);
@@ -157,18 +148,6 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
return null;
}
- @Override
- public CheckpointStreamFactory createStreamFactory(
- JobID jobId, String operatorIdentifier) throws IOException {
- return mock(CheckpointStreamFactory.class);
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(
- JobID jobId, String operatorIdentifier, @Nullable String targetLocation) throws IOException {
- return mock(CheckpointStreamFactory.class);
- }
-
@Override
public AbstractKeyedStateBackend createKeyedStateBackend(
Environment env,
@@ -243,14 +222,9 @@ public OperatorStateBackend createOperatorStateBackend(
any(Environment.class),
any(String.class));
- verify(mockingBackend).createStreamFactory(
- any(JobID.class),
- any(String.class));
-
OperatorStateBackend operatorStateBackend = stateContext.operatorStateBackend();
AbstractKeyedStateBackend> keyedStateBackend = stateContext.keyedStateBackend();
InternalTimeServiceManager, ?> timeServiceManager = stateContext.internalTimerServiceManager();
- CheckpointStreamFactory streamFactory = stateContext.checkpointStreamFactory();
CloseableIterable keyedStateInputs = stateContext.rawKeyedStateInputs();
CloseableIterable operatorStateInputs = stateContext.rawOperatorStateInputs();
@@ -260,7 +234,6 @@ public OperatorStateBackend createOperatorStateBackend(
Assert.assertNotNull(keyedStateBackend);
// this is deactivated on purpose so that it does not attempt to consume the raw keyed state.
Assert.assertNull(timeServiceManager);
- Assert.assertNotNull(streamFactory);
Assert.assertNotNull(keyedStateInputs);
Assert.assertNotNull(operatorStateInputs);
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
index ab97a18d880b4..f88857b60d746 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTerminationTest.java
@@ -60,6 +60,7 @@
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -75,8 +76,6 @@
import org.junit.Test;
-import javax.annotation.Nullable;
-
import java.io.IOException;
import java.util.Collections;
import java.util.concurrent.Callable;
@@ -258,17 +257,7 @@ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
- throw new UnsupportedOperationException();
- }
-
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return mock(CheckpointStreamFactory.class);
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId, String operatorIdentifier, @Nullable String targetLocation) throws IOException {
- return null;
+ return new MemoryBackendCheckpointStorage(jobId, null, null, Integer.MAX_VALUE);
}
@Override
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 5fb683f78c0e7..9ee35ee56214f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -64,6 +64,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
+import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.KeyGroupRange;
@@ -74,11 +75,11 @@
import org.apache.flink.runtime.state.StateBackendFactory;
import org.apache.flink.runtime.state.StateInitializationContext;
import org.apache.flink.runtime.state.StatePartitionStreamProvider;
-import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TaskLocalStateStore;
import org.apache.flink.runtime.state.TaskStateManager;
import org.apache.flink.runtime.state.TaskStateManagerImpl;
import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.runtime.state.memory.MemoryBackendCheckpointStorage;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
@@ -346,9 +347,9 @@ public void testFailingCheckpointStreamOperator() throws Exception {
final Exception testException = new Exception("Test exception");
- when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenThrow(testException);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenThrow(testException);
OperatorID operatorID1 = new OperatorID();
OperatorID operatorID2 = new OperatorID();
@@ -369,6 +370,7 @@ public void testFailingCheckpointStreamOperator() throws Exception {
Whitebox.setInternalState(streamTask, "operatorChain", operatorChain);
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
CheckpointExceptionHandler checkpointExceptionHandler =
@@ -423,9 +425,9 @@ public void testFailingAsyncCheckpointRunnable() throws Exception {
when(operatorSnapshotResult3.getOperatorStateRawFuture()).thenReturn(failingFuture);
- when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult1);
- when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult2);
- when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult3);
+ when(streamOperator1.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult1);
+ when(streamOperator2.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult2);
+ when(streamOperator3.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult3);
OperatorID operatorID1 = new OperatorID();
OperatorID operatorID2 = new OperatorID();
@@ -445,6 +447,7 @@ public void testFailingAsyncCheckpointRunnable() throws Exception {
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", new DirectExecutorService());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
+ Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
CheckpointExceptionHandlerFactory checkpointExceptionHandlerFactory = new CheckpointExceptionHandlerFactory();
CheckpointExceptionHandler checkpointExceptionHandler =
@@ -529,24 +532,14 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
StreamOperator>[] streamOperators = {streamOperator};
OperatorChain> operatorChain = mock(OperatorChain.class);
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
- StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
-
- CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-
- when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
-
- CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
- when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
-
- AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
- when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+ CheckpointStorage checkpointStorage = new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE);
Whitebox.setInternalState(streamTask, "isRunning", true);
Whitebox.setInternalState(streamTask, "lock", new Object());
@@ -554,7 +547,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newFixedThreadPool(1));
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
- Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+ Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
@@ -658,24 +651,14 @@ public OperatorSubtaskState answer(InvocationOnMock invocation) throws Throwable
new DoneFuture<>(managedOperatorStateHandle),
new DoneFuture<>(rawOperatorStateHandle));
- when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class))).thenReturn(operatorSnapshotResult);
+ when(streamOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class))).thenReturn(operatorSnapshotResult);
StreamOperator>[] streamOperators = {streamOperator};
OperatorChain> operatorChain = mock(OperatorChain.class);
when(operatorChain.getAllOperators()).thenReturn(streamOperators);
- StreamStateHandle streamStateHandle = mock(StreamStateHandle.class);
-
- CheckpointStreamFactory.CheckpointStateOutputStream outStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
-
- when(outStream.closeAndGetHandle()).thenReturn(streamStateHandle);
-
- CheckpointStreamFactory mockStreamFactory = mock(CheckpointStreamFactory.class);
- when(mockStreamFactory.createCheckpointStateOutputStream(anyLong(), anyLong())).thenReturn(outStream);
-
- AbstractStateBackend mockStateBackend = mock(AbstractStateBackend.class);
- when(mockStateBackend.createStreamFactory(any(JobID.class), anyString())).thenReturn(mockStreamFactory);
+ CheckpointStorage checkpointStorage = new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE);
ExecutorService executor = Executors.newFixedThreadPool(1);
@@ -685,7 +668,7 @@ public OperatorSubtaskState answer(InvocationOnMock invocation) throws Throwable
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", executor);
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
- Whitebox.setInternalState(streamTask, "stateBackend", mockStateBackend);
+ Whitebox.setInternalState(streamTask, "checkpointStorage", checkpointStorage);
streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
@@ -773,7 +756,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
// mock the returned empty snapshot result (all state handles are null)
OperatorSnapshotResult statelessOperatorSnapshotResult = new OperatorSnapshotResult();
- when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class)))
+ when(statelessOperator.snapshotState(anyLong(), anyLong(), any(CheckpointOptions.class), any(CheckpointStreamFactory.class)))
.thenReturn(statelessOperatorSnapshotResult);
// set up the task
@@ -787,6 +770,7 @@ public Object answer(InvocationOnMock invocation) throws Throwable {
Whitebox.setInternalState(streamTask, "cancelables", new CloseableRegistry());
Whitebox.setInternalState(streamTask, "configuration", new StreamConfig(new Configuration()));
Whitebox.setInternalState(streamTask, "asyncOperationsThreadPool", Executors.newCachedThreadPool());
+ Whitebox.setInternalState(streamTask, "checkpointStorage", new MemoryBackendCheckpointStorage(new JobID(), null, null, Integer.MAX_VALUE));
streamTask.triggerCheckpoint(checkpointMetaData, CheckpointOptions.forCheckpointWithDefaultLocation());
checkpointCompletedLatch.await(30, TimeUnit.SECONDS);
@@ -904,7 +888,7 @@ public int compare(Tuple2> o1, Tuple2 notifyWhenExecutionState(ExecutionState executionState) {
synchronized (priorityQueue) {
if (this.executionState != null && this.executionState.ordinal() >= executionState.ordinal()) {
- return Futures.successful(executionState);
+ return Futures.successful(executionState);
} else {
Promise promise = new Promise.DefaultPromise();
@@ -1159,11 +1143,6 @@ public AbstractKeyedStateBackend> keyedStateBackend() {
return timeServiceManager != null ? spy(timeServiceManager) : null;
}
- @Override
- public CheckpointStreamFactory checkpointStreamFactory() {
- return context.checkpointStreamFactory();
- }
-
@Override
public CloseableIterable rawOperatorStateInputs() {
return replaceWithSpy(context.rawOperatorStateInputs());
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
index 1525848d95758..6df33b7edd5d6 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/TaskCheckpointingBehaviourTest.java
@@ -61,10 +61,12 @@
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.OperatorStateCheckpointOutputStream;
import org.apache.flink.runtime.state.OperatorStateHandle;
+import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateSnapshotContext;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
+import org.apache.flink.runtime.state.testutils.BackendForTestStream;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.taskmanager.Task;
import org.apache.flink.runtime.taskmanager.TaskManagerActions;
@@ -128,8 +130,10 @@ public void testTaskFailingOnCheckpointErrorInAsyncPart() throws Exception {
@Test
public void testBlockingNonInterruptibleCheckpoint() throws Exception {
+ StateBackend lockingStateBackend = new BackendForTestStream(LockingOutputStream::new);
+
Task task =
- createTask(new TestOperator(), new LockingStreamStateBackend(), mock(CheckpointResponder.class), true);
+ createTask(new TestOperator(), lockingStateBackend, mock(CheckpointResponder.class), true);
// start the task and wait until it is in "restore"
task.startTaskThread();
@@ -182,7 +186,7 @@ private Throwable runTestTaskFailingOnCheckpointError(AbstractStateBackend backe
private static Task createTask(
StreamOperator> op,
- AbstractStateBackend backend,
+ StateBackend backend,
CheckpointResponder checkpointResponder,
boolean failOnCheckpointErrors) throws IOException {
@@ -352,41 +356,9 @@ public AsyncFailureInducingStateBackend configure(Configuration config) {
}
// ------------------------------------------------------------------------
- // state backend with locking output stream.
+ // locking output stream.
// ------------------------------------------------------------------------
- private static class LockingStreamStateBackend extends MemoryStateBackend {
-
- private static final long serialVersionUID = 1L;
-
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
- return new LockingOutputStreamFactory();
- }
-
- @Override
- public OperatorStateBackend createOperatorStateBackend(Environment env, String operatorIdentifier) throws Exception {
- return new DefaultOperatorStateBackend(
- getClass().getClassLoader(),
- new ExecutionConfig(),
- true);
- }
-
- @Override
- public LockingStreamStateBackend configure(Configuration config) {
- // retain this instance, no re-configuration!
- return this;
- }
- }
-
- private static final class LockingOutputStreamFactory implements CheckpointStreamFactory {
-
- @Override
- public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) {
- return new LockingOutputStream();
- }
- }
-
private static final class LockingOutputStream extends CheckpointStateOutputStream {
private final Object lock = new Object();
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 26026036eaba8..966d205812632 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -35,7 +35,8 @@
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
+import org.apache.flink.runtime.state.CheckpointStorage;
+import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.OperatorStateHandle;
@@ -67,6 +68,7 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -113,6 +115,7 @@ public class AbstractStreamOperatorTestHarness implements AutoCloseable {
// use this as default for tests
protected StateBackend stateBackend = new MemoryStateBackend();
+ private CheckpointStorage checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
private final Object checkpointLock;
@@ -219,18 +222,7 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
}
}).when(mockTask).handleAsyncException(any(String.class), any(Throwable.class));
- try {
- doAnswer(new Answer() {
- @Override
- public CheckpointStreamFactory answer(InvocationOnMock invocationOnMock) throws Throwable {
-
- final StreamOperator> operator = (StreamOperator>) invocationOnMock.getArguments()[0];
- return stateBackend.createStreamFactory(new JobID(), operator.getClass().getSimpleName());
- }
- }).when(mockTask).createCheckpointStreamFactory(any(StreamOperator.class));
- } catch (Exception e) {
- throw new RuntimeException(e.getMessage(), e);
- }
+ when(mockTask.getCheckpointStorage()).thenAnswer((invocationOnMock) -> this.checkpointStorage);
doAnswer(new Answer() {
@Override
@@ -253,6 +245,12 @@ protected StreamTaskStateInitializer createStreamTaskStateManager(
public void setStateBackend(StateBackend stateBackend) {
this.stateBackend = stateBackend;
+
+ try {
+ this.checkpointStorage = stateBackend.createCheckpointStorage(new JobID());
+ } catch (IOException e) {
+ throw new RuntimeException(e.getMessage(), e);
+ }
}
public Object getCheckpointLock() {
@@ -470,14 +468,15 @@ public void open() throws Exception {
}
/**
- * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions)}.
+ * Calls {@link StreamOperator#snapshotState(long, long, CheckpointOptions, org.apache.flink.runtime.state.CheckpointStreamFactory)}.
*/
public OperatorStateHandles snapshot(long checkpointId, long timestamp) throws Exception {
OperatorSnapshotResult operatorStateResult = operator.snapshotState(
checkpointId,
timestamp,
- CheckpointOptions.forCheckpointWithDefaultLocation());
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ checkpointStorage.resolveCheckpointStorageLocation(checkpointId, CheckpointStorageLocationReference.getDefault()));
KeyedStateHandle keyedManaged = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateManagedFuture());
KeyedStateHandle keyedRaw = FutureUtil.runIfNotDoneAndGet(operatorStateResult.getKeyedStateRawFuture());
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
index 9c023d44caf87..df8d40e200587 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StateBackendITCase.java
@@ -30,7 +30,6 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
-import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
@@ -101,19 +100,7 @@ public StreamStateHandle resolveCheckpoint(String pointer) throws IOException {
@Override
public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException {
- return new MemoryBackendCheckpointStorage(jobId);
- }
-
- @Override
- public CheckpointStreamFactory createStreamFactory(JobID jobId,
- String operatorIdentifier) throws IOException {
- throw new SuccessException();
- }
-
- @Override
- public CheckpointStreamFactory createSavepointStreamFactory(JobID jobId,
- String operatorIdentifier, String targetLocation) throws IOException {
- throw new SuccessException();
+ return new MemoryBackendCheckpointStorage(jobId, null, null, 1_000_000);
}
@Override