Skip to content

Commit

Permalink
[FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyn…
Browse files Browse the repository at this point in the history
…cCheckpoints()
  • Loading branch information
StefanRRichter committed Oct 20, 2017
1 parent 479be9d commit dbf4c86
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 107 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.execution.CancelTaskException;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
Expand All @@ -45,19 +46,20 @@
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory;
import org.apache.flink.runtime.state.memory.MemoryStateBackend;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.AsynchronousException;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask;
import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness;
import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment;
import org.apache.flink.streaming.runtime.tasks.StreamTask;
import org.apache.flink.util.FutureUtil;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PowerMockIgnore;
Expand All @@ -69,7 +71,6 @@
import java.util.Arrays;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RunnableFuture;
Expand All @@ -92,7 +93,7 @@
@RunWith(PowerMockRunner.class)
@PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"})
@SuppressWarnings("serial")
public class RocksDBAsyncSnapshotTest {
public class RocksDBAsyncSnapshotTest extends TestLogger {

/**
* This ensures that asynchronous state handles are actually materialized asynchronously.
Expand Down Expand Up @@ -209,7 +210,6 @@ public void acknowledgeCheckpoint(
* @throws Exception
*/
@Test
@Ignore
public void testCancelFullyAsyncCheckpoints() throws Exception {
final OneInputStreamTask<String, String> task = new OneInputStreamTask<>();

Expand All @@ -229,6 +229,29 @@ public String getKey(String value) throws Exception {

BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend();

BlockerCheckpointStreamFactory blockerCheckpointStreamFactory =
new BlockerCheckpointStreamFactory(4 * 1024 * 1024) {

int count = 1;

@Override
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(
long checkpointID,
long timestamp) throws Exception {

// we skip the first created stream, because it is used to checkpoint the timer service, which is
// currently not asynchronous.
if (count > 0) {
--count;
return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize);
} else {
return super.createCheckpointStateOutputStream(checkpointID, timestamp);
}
}
};

BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory;

RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend);
backend.setDbStoragePath(dbDir.getAbsolutePath());

Expand All @@ -244,8 +267,8 @@ public String getKey(String value) throws Exception {
new MockInputSplitProvider(),
testHarness.bufferSize);

BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch();
BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch();
blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch());
blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch());

testHarness.invoke(mockEnv);

Expand All @@ -259,39 +282,31 @@ public String getKey(String value) throws Exception {
}
}

task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint());
task.triggerCheckpoint(
new CheckpointMetaData(42, 17),
CheckpointOptions.forFullCheckpoint());

testHarness.processElement(new StreamRecord<>("Wohoo", 0));
BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await();
blockerCheckpointStreamFactory.getWaiterLatch().await();
task.cancel();
BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger();
blockerCheckpointStreamFactory.getBlockerLatch().trigger();
testHarness.endInput();
try {
Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed());

try {
ExecutorService threadPool = task.getAsyncOperationsThreadPool();
threadPool.shutdown();
Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS));
testHarness.waitForTaskCompletion();

if (mockEnv.wasFailedExternally()) {
throw new AsynchronousException(new InterruptedException("Exception was thrown as expected."));
}
fail("Operation completed. Cancel failed.");
} catch (Exception expected) {
AsynchronousException asynchronousException = null;

if (expected instanceof AsynchronousException) {
asynchronousException = (AsynchronousException) expected;
} else if (expected.getCause() instanceof AsynchronousException) {
asynchronousException = (AsynchronousException) expected.getCause();
} else {
Throwable cause = expected.getCause();

if (!(cause instanceof CancelTaskException)) {
fail("Unexpected exception: " + expected);
}

// we expect the exception from canceling snapshots
Throwable innerCause = asynchronousException.getCause();
Assert.assertTrue("Unexpected inner cause: " + innerCause,
innerCause instanceof CancellationException //future canceled
|| innerCause instanceof InterruptedException); //thread interrupted
}
}

Expand Down Expand Up @@ -329,25 +344,31 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception {
new KeyGroupRange(0, 0),
null);

keyedStateBackend.restore(null);
try {

// register a state so that the state backend has to checkpoint something
keyedStateBackend.getPartitionedState(
"namespace",
StringSerializer.INSTANCE,
new ValueStateDescriptor<>("foobar", String.class));
keyedStateBackend.restore(null);

RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());
// register a state so that the state backend has to checkpoint something
keyedStateBackend.getPartitionedState(
"namespace",
StringSerializer.INSTANCE,
new ValueStateDescriptor<>("foobar", String.class));

try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
fail("Expected an exception to be thrown here.");
} catch (ExecutionException e) {
Assert.assertEquals(testException, e.getCause());
}
RunnableFuture<KeyedStateHandle> snapshotFuture = keyedStateBackend.snapshot(
checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint());

verify(outputStream).close();
try {
FutureUtil.runIfNotDoneAndGet(snapshotFuture);
fail("Expected an exception to be thrown here.");
} catch (ExecutionException e) {
Assert.assertEquals(testException, e.getCause());
}

verify(outputStream).close();
} finally {
IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
}
}

@Test
Expand Down Expand Up @@ -379,55 +400,11 @@ public void testConsistentSnapshotSerializationFlagsAndMasks() {
*/
static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {

public static volatile OneShotLatch waitFirstWriteLatch = null;

public static volatile OneShotLatch unblockCancelLatch = null;

private volatile boolean closed = false;
public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = null;

@Override
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
return new MemCheckpointStreamFactory(4 * 1024 * 1024) {
@Override
public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {

return new MemoryCheckpointOutputStream(4 * 1024 * 1024) {
@Override
public void write(int b) throws IOException {
waitFirstWriteLatch.trigger();
try {
unblockCancelLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (closed) {
throw new IOException("Stream closed.");
}
super.write(b);
}

@Override
public void write(byte[] b, int off, int len) throws IOException {
waitFirstWriteLatch.trigger();
try {
unblockCancelLatch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
if (closed) {
throw new IOException("Stream closed.");
}
super.write(b, off, len);
}

@Override
public void close() {
closed = true;
super.close();
}
};
}
};
return blockerCheckpointStreamFactory;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,14 @@
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;

import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
Expand Down Expand Up @@ -103,12 +105,17 @@ public void testSetDbPath() throws Exception {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());

File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath())));

//noinspection NullArgumentToVariableArgMethod
rocksDbBackend.setDbStoragePaths(null);
assertNull(rocksDbBackend.getDbStoragePaths());
//noinspection NullArgumentToVariableArgMethod
rocksDbBackend.setDbStoragePaths(null);
assertNull(rocksDbBackend.getDbStoragePaths());
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
}
}

@Test(expected = IllegalArgumentException.class)
Expand Down Expand Up @@ -158,8 +165,13 @@ public void testUseTempDirectories() throws Exception {
new KeyGroupRange(0, 0),
env.getTaskKvStateRegistry());

File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
try {
File instanceBasePath = keyedBackend.getInstanceBasePath();
assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath())));
} finally {
IOUtils.closeQuietly(keyedBackend);
keyedBackend.dispose();
}
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -225,14 +237,17 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception {

try {
Environment env = getMockEnvironment();
rocksDbBackend.createKeyedStateBackend(
env,
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));
AbstractKeyedStateBackend<Integer> keyedStateBackend = rocksDbBackend.createKeyedStateBackend(
env,
env.getJobID(),
"foobar",
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID()));

IOUtils.closeQuietly(keyedStateBackend);
keyedStateBackend.dispose();
}
catch (Exception e) {
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;

public class ResourceGuardTest {
public class ResourceGuardTest extends TestLogger {

@Test
public void testClose() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,10 @@
@Internal
public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory {

private final int maxSize;
private volatile int afterNumberInvocations;
private volatile OneShotLatch blocker;
private volatile OneShotLatch waiter;
protected final int maxSize;
protected volatile int afterNumberInvocations;
protected volatile OneShotLatch blocker;
protected volatile OneShotLatch waiter;

MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream;

Expand All @@ -60,6 +60,14 @@ public void setWaiterLatch(OneShotLatch latch) {
this.waiter = latch;
}

public OneShotLatch getBlockerLatch() {
return blocker;
}

public OneShotLatch getWaiterLatch() {
return waiter;
}

@Override
public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception {
this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {
Expand Down

0 comments on commit dbf4c86

Please sign in to comment.