From dbf4c865f712bcac3bf039ed5b10b9ae2e5809ce Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Thu, 19 Oct 2017 13:10:21 +0800 Subject: [PATCH] [FLINK-5372] [tests] Fix RocksDBAsyncSnapshotTest.testCancelFullyAsyncCheckpoints() --- .../state/RocksDBAsyncSnapshotTest.java | 151 ++++++++---------- .../state/RocksDBStateBackendConfigTest.java | 45 ++++-- .../apache/flink/util/ResourceGuardTest.java | 2 +- .../util/BlockerCheckpointStreamFactory.java | 16 +- 4 files changed, 107 insertions(+), 107 deletions(-) diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java index 1f1b65a3f06b1..b519b1a19aa1c 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBAsyncSnapshotTest.java @@ -32,6 +32,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.execution.CancelTaskException; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.runtime.operators.testutils.DummyEnvironment; @@ -45,19 +46,20 @@ import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.runtime.state.memory.MemCheckpointStreamFactory; import org.apache.flink.runtime.state.memory.MemoryStateBackend; +import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.AsynchronousException; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; import org.apache.flink.streaming.runtime.tasks.StreamMockEnvironment; import org.apache.flink.streaming.runtime.tasks.StreamTask; import org.apache.flink.util.FutureUtil; +import org.apache.flink.util.IOUtils; +import org.apache.flink.util.TestLogger; import org.junit.Assert; -import org.junit.Ignore; import org.junit.Test; import org.junit.runner.RunWith; import org.powermock.core.classloader.annotations.PowerMockIgnore; @@ -69,7 +71,6 @@ import java.util.Arrays; import java.util.Map; import java.util.UUID; -import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.RunnableFuture; @@ -92,7 +93,7 @@ @RunWith(PowerMockRunner.class) @PowerMockIgnore({"javax.management.*", "com.sun.jndi.*", "org.apache.log4j.*"}) @SuppressWarnings("serial") -public class RocksDBAsyncSnapshotTest { +public class RocksDBAsyncSnapshotTest extends TestLogger { /** * This ensures that asynchronous state handles are actually materialized asynchronously. @@ -209,7 +210,6 @@ public void acknowledgeCheckpoint( * @throws Exception */ @Test - @Ignore public void testCancelFullyAsyncCheckpoints() throws Exception { final OneInputStreamTask task = new OneInputStreamTask<>(); @@ -229,6 +229,29 @@ public String getKey(String value) throws Exception { BlockingStreamMemoryStateBackend memoryStateBackend = new BlockingStreamMemoryStateBackend(); + BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = + new BlockerCheckpointStreamFactory(4 * 1024 * 1024) { + + int count = 1; + + @Override + public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream( + long checkpointID, + long timestamp) throws Exception { + + // we skip the first created stream, because it is used to checkpoint the timer service, which is + // currently not asynchronous. + if (count > 0) { + --count; + return new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize); + } else { + return super.createCheckpointStateOutputStream(checkpointID, timestamp); + } + } + }; + + BlockingStreamMemoryStateBackend.blockerCheckpointStreamFactory = blockerCheckpointStreamFactory; + RocksDBStateBackend backend = new RocksDBStateBackend(memoryStateBackend); backend.setDbStoragePath(dbDir.getAbsolutePath()); @@ -244,8 +267,8 @@ public String getKey(String value) throws Exception { new MockInputSplitProvider(), testHarness.bufferSize); - BlockingStreamMemoryStateBackend.waitFirstWriteLatch = new OneShotLatch(); - BlockingStreamMemoryStateBackend.unblockCancelLatch = new OneShotLatch(); + blockerCheckpointStreamFactory.setBlockerLatch(new OneShotLatch()); + blockerCheckpointStreamFactory.setWaiterLatch(new OneShotLatch()); testHarness.invoke(mockEnv); @@ -259,39 +282,31 @@ public String getKey(String value) throws Exception { } } - task.triggerCheckpoint(new CheckpointMetaData(42, 17), CheckpointOptions.forFullCheckpoint()); + task.triggerCheckpoint( + new CheckpointMetaData(42, 17), + CheckpointOptions.forFullCheckpoint()); + testHarness.processElement(new StreamRecord<>("Wohoo", 0)); - BlockingStreamMemoryStateBackend.waitFirstWriteLatch.await(); + blockerCheckpointStreamFactory.getWaiterLatch().await(); task.cancel(); - BlockingStreamMemoryStateBackend.unblockCancelLatch.trigger(); + blockerCheckpointStreamFactory.getBlockerLatch().trigger(); testHarness.endInput(); - try { + Assert.assertTrue(blockerCheckpointStreamFactory.getLastCreatedStream().isClosed()); + try { ExecutorService threadPool = task.getAsyncOperationsThreadPool(); threadPool.shutdown(); Assert.assertTrue(threadPool.awaitTermination(60_000, TimeUnit.MILLISECONDS)); testHarness.waitForTaskCompletion(); - if (mockEnv.wasFailedExternally()) { - throw new AsynchronousException(new InterruptedException("Exception was thrown as expected.")); - } fail("Operation completed. Cancel failed."); } catch (Exception expected) { - AsynchronousException asynchronousException = null; - if (expected instanceof AsynchronousException) { - asynchronousException = (AsynchronousException) expected; - } else if (expected.getCause() instanceof AsynchronousException) { - asynchronousException = (AsynchronousException) expected.getCause(); - } else { + Throwable cause = expected.getCause(); + + if (!(cause instanceof CancelTaskException)) { fail("Unexpected exception: " + expected); } - - // we expect the exception from canceling snapshots - Throwable innerCause = asynchronousException.getCause(); - Assert.assertTrue("Unexpected inner cause: " + innerCause, - innerCause instanceof CancellationException //future canceled - || innerCause instanceof InterruptedException); //thread interrupted } } @@ -329,25 +344,31 @@ public void testCleanupOfSnapshotsInFailureCase() throws Exception { new KeyGroupRange(0, 0), null); - keyedStateBackend.restore(null); + try { - // register a state so that the state backend has to checkpoint something - keyedStateBackend.getPartitionedState( - "namespace", - StringSerializer.INSTANCE, - new ValueStateDescriptor<>("foobar", String.class)); + keyedStateBackend.restore(null); - RunnableFuture snapshotFuture = keyedStateBackend.snapshot( - checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint()); + // register a state so that the state backend has to checkpoint something + keyedStateBackend.getPartitionedState( + "namespace", + StringSerializer.INSTANCE, + new ValueStateDescriptor<>("foobar", String.class)); - try { - FutureUtil.runIfNotDoneAndGet(snapshotFuture); - fail("Expected an exception to be thrown here."); - } catch (ExecutionException e) { - Assert.assertEquals(testException, e.getCause()); - } + RunnableFuture snapshotFuture = keyedStateBackend.snapshot( + checkpointId, timestamp, checkpointStreamFactory, CheckpointOptions.forFullCheckpoint()); - verify(outputStream).close(); + try { + FutureUtil.runIfNotDoneAndGet(snapshotFuture); + fail("Expected an exception to be thrown here."); + } catch (ExecutionException e) { + Assert.assertEquals(testException, e.getCause()); + } + + verify(outputStream).close(); + } finally { + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); + } } @Test @@ -379,55 +400,11 @@ public void testConsistentSnapshotSerializationFlagsAndMasks() { */ static class BlockingStreamMemoryStateBackend extends MemoryStateBackend { - public static volatile OneShotLatch waitFirstWriteLatch = null; - - public static volatile OneShotLatch unblockCancelLatch = null; - - private volatile boolean closed = false; + public static volatile BlockerCheckpointStreamFactory blockerCheckpointStreamFactory = null; @Override public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException { - return new MemCheckpointStreamFactory(4 * 1024 * 1024) { - @Override - public CheckpointStateOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { - - return new MemoryCheckpointOutputStream(4 * 1024 * 1024) { - @Override - public void write(int b) throws IOException { - waitFirstWriteLatch.trigger(); - try { - unblockCancelLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (closed) { - throw new IOException("Stream closed."); - } - super.write(b); - } - - @Override - public void write(byte[] b, int off, int len) throws IOException { - waitFirstWriteLatch.trigger(); - try { - unblockCancelLatch.await(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - if (closed) { - throw new IOException("Stream closed."); - } - super.write(b, off, len); - } - - @Override - public void close() { - closed = true; - super.close(); - } - }; - } - }; + return blockerCheckpointStreamFactory; } } diff --git a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java index 8ec29e27e5e4a..853d80fcd2e5e 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java @@ -27,12 +27,14 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.query.KvStateRegistry; +import org.apache.flink.runtime.state.AbstractKeyedStateBackend; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; import org.apache.commons.io.FileUtils; +import org.apache.commons.io.IOUtils; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -103,12 +105,17 @@ public void testSetDbPath() throws Exception { new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); - File instanceBasePath = keyedBackend.getInstanceBasePath(); - assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); + try { + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(testDir1.getAbsolutePath()), startsWith(testDir2.getAbsolutePath()))); - //noinspection NullArgumentToVariableArgMethod - rocksDbBackend.setDbStoragePaths(null); - assertNull(rocksDbBackend.getDbStoragePaths()); + //noinspection NullArgumentToVariableArgMethod + rocksDbBackend.setDbStoragePaths(null); + assertNull(rocksDbBackend.getDbStoragePaths()); + } finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + } } @Test(expected = IllegalArgumentException.class) @@ -158,8 +165,13 @@ public void testUseTempDirectories() throws Exception { new KeyGroupRange(0, 0), env.getTaskKvStateRegistry()); - File instanceBasePath = keyedBackend.getInstanceBasePath(); - assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath()))); + try { + File instanceBasePath = keyedBackend.getInstanceBasePath(); + assertThat(instanceBasePath.getAbsolutePath(), anyOf(startsWith(dir1.getAbsolutePath()), startsWith(dir2.getAbsolutePath()))); + } finally { + IOUtils.closeQuietly(keyedBackend); + keyedBackend.dispose(); + } } // ------------------------------------------------------------------------ @@ -225,14 +237,17 @@ public void testContinueOnSomeDbDirectoriesMissing() throws Exception { try { Environment env = getMockEnvironment(); - rocksDbBackend.createKeyedStateBackend( - env, - env.getJobID(), - "foobar", - IntSerializer.INSTANCE, - 1, - new KeyGroupRange(0, 0), - new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + AbstractKeyedStateBackend keyedStateBackend = rocksDbBackend.createKeyedStateBackend( + env, + env.getJobID(), + "foobar", + IntSerializer.INSTANCE, + 1, + new KeyGroupRange(0, 0), + new KvStateRegistry().createTaskRegistry(env.getJobID(), new JobVertexID())); + + IOUtils.closeQuietly(keyedStateBackend); + keyedStateBackend.dispose(); } catch (Exception e) { e.printStackTrace(); diff --git a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java index a030a819dfa1e..98aae4defb1d2 100644 --- a/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java +++ b/flink-core/src/test/java/org/apache/flink/util/ResourceGuardTest.java @@ -24,7 +24,7 @@ import java.io.IOException; import java.util.concurrent.atomic.AtomicBoolean; -public class ResourceGuardTest { +public class ResourceGuardTest extends TestLogger { @Test public void testClose() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java index 98e654f8d5682..2091e005aaa73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/BlockerCheckpointStreamFactory.java @@ -33,10 +33,10 @@ @Internal public class BlockerCheckpointStreamFactory implements CheckpointStreamFactory { - private final int maxSize; - private volatile int afterNumberInvocations; - private volatile OneShotLatch blocker; - private volatile OneShotLatch waiter; + protected final int maxSize; + protected volatile int afterNumberInvocations; + protected volatile OneShotLatch blocker; + protected volatile OneShotLatch waiter; MemCheckpointStreamFactory.MemoryCheckpointOutputStream lastCreatedStream; @@ -60,6 +60,14 @@ public void setWaiterLatch(OneShotLatch latch) { this.waiter = latch; } + public OneShotLatch getBlockerLatch() { + return blocker; + } + + public OneShotLatch getWaiterLatch() { + return waiter; + } + @Override public MemCheckpointStreamFactory.MemoryCheckpointOutputStream createCheckpointStateOutputStream(long checkpointID, long timestamp) throws Exception { this.lastCreatedStream = new MemCheckpointStreamFactory.MemoryCheckpointOutputStream(maxSize) {