Skip to content

Commit

Permalink
[FLINK-8531] [checkpoints] (part 6) Tasks resolve CheckpointStreamFac…
Browse files Browse the repository at this point in the history
…tory from CheckpointStorage and Checkpoint Location Reference to persist checkpoint data.
  • Loading branch information
StephanEwen committed Feb 1, 2018
1 parent bb19e7f commit 1887187
Show file tree
Hide file tree
Showing 43 changed files with 683 additions and 706 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStorage;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand Down Expand Up @@ -197,11 +196,11 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck
* here where the snapshots from RocksDB would be stored.
*
* <p>The snapshots of the RocksDB state will be stored using the given backend's
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
* {@link StateBackend#createCheckpointStorage(JobID)}.
*
* @param checkpointStreamBackend The backend to write the checkpoint streams to.
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
}

Expand All @@ -211,11 +210,28 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
* here where the snapshots from RocksDB would be stored.
*
* <p>The snapshots of the RocksDB state will be stored using the given backend's
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
* {@link StateBackend#createCheckpointStorage(JobID)}.
*
* @param checkpointStreamBackend The backend to write the checkpoint streams to.
* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
*/
public RocksDBStateBackend(StateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
}

/**
 * Creates a RocksDB state backend that wraps the given backend; the wrapped backend
 * must not be null.
 *
 * @param checkpointStreamBackend The backend to write the checkpoint streams to.
 *
 * @deprecated Use {@link #RocksDBStateBackend(StateBackend)} instead.
 */
@Deprecated
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
	// The cast selects the StateBackend overload, which performs the null check and the assignment.
	this((StateBackend) checkpointStreamBackend);
}

/**
* @deprecated Use {@link #RocksDBStateBackend(StateBackend, boolean)} instead.
*/
@Deprecated
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
Expand Down Expand Up @@ -367,20 +383,6 @@ public CheckpointStorage createCheckpointStorage(JobID jobId) throws IOException
return checkpointStreamBackend.createCheckpointStorage(jobId);
}

@Override
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
	// This backend does not build stream factories itself; the wrapped checkpoint backend supplies them.
	final CheckpointStreamFactory delegateFactory =
		checkpointStreamBackend.createStreamFactory(jobId, operatorIdentifier);
	return delegateFactory;
}

@Override
public CheckpointStreamFactory createSavepointStreamFactory(
		JobID jobId,
		String operatorIdentifier,
		String targetLocation) throws IOException {

	// Savepoint stream factories are obtained from the wrapped checkpoint backend, arguments passed through unchanged.
	final CheckpointStreamFactory savepointFactory =
		checkpointStreamBackend.createSavepointStreamFactory(jobId, operatorIdentifier, targetLocation);
	return savepointFactory;
}

// ------------------------------------------------------------------------
// State holding data structures
// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
Expand All @@ -39,15 +38,20 @@
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.CheckpointStreamFactory.CheckpointStateOutputStream;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StreamStateHandle;
import org.apache.flink.runtime.state.TestTaskStateManager;
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.state.testutils.BackendForTestStream;
import org.apache.flink.runtime.state.testutils.BackendForTestStream.StreamFactory;
import org.apache.flink.runtime.state.testutils.TestCheckpointStreamFactory;
import org.apache.flink.runtime.taskmanager.CheckpointResponder;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
Expand Down Expand Up @@ -82,13 +86,7 @@

import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;

/**
Expand All @@ -103,7 +101,7 @@ public class RocksDBAsyncSnapshotTest extends TestLogger {
* Temporary folder for test.
*/
@Rule
public TemporaryFolder temporaryFolder = new TemporaryFolder();
public final TemporaryFolder temporaryFolder = new TemporaryFolder();

/**
* This ensures that asynchronous state handles are actually materialized asynchronously.
Expand Down Expand Up @@ -244,19 +242,13 @@ public void testCancelFullyAsyncCheckpoints() throws Exception {

testHarness.setupOutputForSingletonOperatorChain();

testHarness.configureForKeyedStream(new KeySelector<String, String>() {
@Override
public String getKey(String value) throws Exception {
return value;
}
}, BasicTypeInfo.STRING_TYPE_INFO);
testHarness.configureForKeyedStream(value -> value, BasicTypeInfo.STRING_TYPE_INFO);

StreamConfig streamConfig = testHarness.getStreamConfig();

File dbDir = temporaryFolder.newFolder();

BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();

// this is the proper instance that we need to call.
BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {

Expand All @@ -278,9 +270,12 @@ public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointS
}
};

BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory;
// to avoid serialization of the above factory instance, we need to pass it in
// through a static variable

StateBackend stateBackend = new BackendForTestStream(new StaticForwardFactory(blockerCheckpointStreamFactory));

RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
backend.setDbStoragePath(dbDir.getAbsolutePath());

streamConfig.setStateBackend(backend);
Expand Down Expand Up @@ -354,17 +349,10 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {

Environment env = new DummyEnvironment("test task", 1, 0);

CheckpointStreamFactory.CheckpointStateOutputStream outputStream = mock(CheckpointStreamFactory.CheckpointStateOutputStream.class);
CheckpointStreamFactory checkpointStreamFactory = mock(CheckpointStreamFactory.class);
AbstractStateBackend stateBackend = mock(AbstractStateBackend.class);

final IOException testException = new IOException("Test exception");
CheckpointStateOutputStream outputStream = spy(new FailingStream(testException));

doReturn(checkpointStreamFactory).when(stateBackend).createStreamFactory(any(JobID.class), anyString());
doThrow(testException).when(outputStream).write(anyInt());
doReturn(outputStream).when(checkpointStreamFactory).createCheckpointStateOutputStream(eq(checkpointId), eq(timestamp));

RocksDBStateBackend backend = new RocksDBStateBackend(stateBackend);
RocksDBStateBackend backend = new RocksDBStateBackend((StateBackend) new MemoryStateBackend());

backend.setDbStoragePath(temporaryFolder.newFolder().toURI().toString());

Expand All @@ -388,7 +376,9 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
new ValueStateDescriptor<>("foobar", String.class));

RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forCheckpointWithDefaultLocation());
checkpointId, timestamp,
new TestCheckpointStreamFactory(() -> outputStream),
CheckpointOptions.forCheckpointWithDefaultLocation());

try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
Expand Down Expand Up @@ -428,25 +418,6 @@ public void testConsistentSnapshotSerializationFlagsAndMasks() {

// ------------------------------------------------------------------------

/**
 * A {@link MemoryStateBackend} whose stream factory is whatever
 * {@link BlockerCheckpointStreamFactory} the test has stored in the static field below.
 * The tests use it to get checkpoint output streams that block write operations on a latch,
 * which delays the writing of snapshots.
 */
static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {

	/** Factory handed out by {@code createStreamFactory}; {@code null} until a test assigns it. */
	public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory;

	@Override
	public BlockingStreamMemoryStateBackend configure(Configuration config) {
		// retain this instance, no re-configuration!
		return this;
	}

	@Override
	public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
		// job id and operator identifier are ignored; the statically registered factory is always returned
		return blockerCheckpointStreamFactory;
	}
}

private static class AsyncCheckpointOperator
extends AbstractStreamOperator<String>
implements OneInputStreamOperator<String, String> {
Expand Down Expand Up @@ -476,4 +447,61 @@ public void processElement(StreamRecord<String> element) throws Exception {
state.update(element.getValue());
}
}

// ------------------------------------------------------------------------
// test stream factory and failing stream
// ------------------------------------------------------------------------

/**
 * A {@link StreamFactory} that forwards every stream request to a
 * {@link CheckpointStreamFactory} held in a static field.
 *
 * <p>The forwarding target is shared by all instances: each constructor call overwrites it.
 */
private static class StaticForwardFactory implements StreamFactory {

	/** Shared forwarding target, replaced on every construction. */
	static CheckpointStreamFactory factory;

	StaticForwardFactory(CheckpointStreamFactory factory) {
		StaticForwardFactory.factory = factory;
	}

	@Override
	public CheckpointStateOutputStream get() throws Exception {
		// every requested stream is created with the same fixed checkpoint id and timestamp
		final long checkpointId = 1L;
		final long timestamp = 1L;
		return factory.createCheckpointStateOutputStream(checkpointId, timestamp);
	}
}

/**
 * A {@link CheckpointStateOutputStream} that can never be written successfully.
 *
 * <p>{@code write}, {@code flush} and {@code sync} throw the exception supplied at
 * construction; {@code closeAndGetHandle}, {@code getPos} and {@code close} throw
 * {@link UnsupportedOperationException}.
 */
private static class FailingStream extends CheckpointStateOutputStream {

	/** The exception raised by write, flush and sync. */
	private final IOException failure;

	FailingStream(IOException testException) {
		this.failure = testException;
	}

	// --- operations failing with the configured exception ---

	@Override
	public void write(int b) throws IOException {
		throw failure;
	}

	@Override
	public void flush() throws IOException {
		throw failure;
	}

	@Override
	public void sync() throws IOException {
		throw failure;
	}

	// --- unsupported operations ---

	@Override
	public long getPos() throws IOException {
		throw new UnsupportedOperationException();
	}

	@Override
	public StreamStateHandle closeAndGetHandle() throws IOException {
		throw new UnsupportedOperationException();
	}

	@Override
	public void close() throws IOException {
		throw new UnsupportedOperationException();
	}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
import org.apache.flink.util.IOUtils;
Expand All @@ -55,11 +56,7 @@
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
Expand Down Expand Up @@ -379,13 +376,9 @@ public void testRocksDbReconfigurationCopiesExistingValues() throws Exception {

@Test
public void testCallsForwardedToNonPartitionedBackend() throws Exception {
AbstractStateBackend nonPartBackend = mock(AbstractStateBackend.class);
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(nonPartBackend);

Environment env = getMockEnvironment();
rocksDbBackend.createStreamFactory(env.getJobID(), "foobar");

verify(nonPartBackend, times(1)).createStreamFactory(any(JobID.class), anyString());
AbstractStateBackend storageBackend = new MemoryStateBackend();
RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(storageBackend);
assertEquals(storageBackend, rocksDbBackend.getCheckpointBackend());
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,8 +133,11 @@ public void testStateOutputStream() {
try {
FsStateBackend backend = CommonTestUtils.createCopySerializable(new FsStateBackend(basePath, 15));
JobID jobId = new JobID();
long checkpointId = 97231523452L;

CheckpointStreamFactory streamFactory = backend.createStreamFactory(jobId, "test_op");
CheckpointStreamFactory streamFactory = backend
.createCheckpointStorage(jobId)
.initializeLocationForCheckpoint(checkpointId);

// we know how FsCheckpointStreamFactory is implemented so we know where it
// will store checkpoints
Expand All @@ -151,8 +154,6 @@ public void testStateOutputStream() {
rnd.nextBytes(state3);
rnd.nextBytes(state4);

long checkpointId = 97231523452L;

CheckpointStreamFactory.CheckpointStateOutputStream stream1 =
streamFactory.createCheckpointStateOutputStream(checkpointId, System.currentTimeMillis());
CheckpointStreamFactory.CheckpointStateOutputStream stream2 =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;

import javax.annotation.Nullable;
import java.io.IOException;

/**
Expand All @@ -37,21 +36,6 @@ public abstract class AbstractStateBackend implements StateBackend, java.io.Seri

private static final long serialVersionUID = 4620415814639230247L;

// ------------------------------------------------------------------------
// State Backend - Persisting Byte Storage
// ------------------------------------------------------------------------

/**
 * Declared abstract here, so concrete subclasses have to supply the implementation.
 *
 * @param jobId presumably the job the streams belong to (inferred from the name; confirm against StateBackend)
 * @param operatorIdentifier presumably identifies the operator the streams are created for (confirm against StateBackend)
 * @return a {@link CheckpointStreamFactory}
 * @throws IOException declared by the signature; the failure conditions depend on the implementation
 */
@Override
public abstract CheckpointStreamFactory createStreamFactory(
JobID jobId,
String operatorIdentifier) throws IOException;

/**
 * Declared abstract here, so concrete subclasses have to supply the implementation.
 *
 * @param jobId presumably the job the savepoint belongs to (inferred from the name; confirm against StateBackend)
 * @param operatorIdentifier presumably identifies the operator the streams are created for (confirm against StateBackend)
 * @param targetLocation may be {@code null}, as marked by {@code @Nullable}; how a null value is handled is up to the implementation
 * @return a {@link CheckpointStreamFactory}
 * @throws IOException declared by the signature; the failure conditions depend on the implementation
 */
@Override
public abstract CheckpointStreamFactory createSavepointStreamFactory(
JobID jobId,
String operatorIdentifier,
@Nullable String targetLocation) throws IOException;

// ------------------------------------------------------------------------
// State Backend - State-Holding Backends
// ------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 1887187

Please sign in to comment.