Skip to content

Commit

Permalink
[hotfix][tests] Consolidate the instantiation of the RocksDBKEyesStat…
Browse files Browse the repository at this point in the history
…eBackendBuilder for tests in a utility class.
  • Loading branch information
StephanEwen committed Dec 6, 2019
1 parent c8d69f4 commit 9da970f
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 113 deletions.
8 changes: 8 additions & 0 deletions flink-queryable-state/flink-queryable-state-runtime/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ under the License.
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-test-utils-junit</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,35 +18,20 @@

package org.apache.flink.queryablestate.network;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.contrib.streaming.state.PredefinedOptions;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackendBuilder;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.contrib.streaming.state.RocksDBTestUtils;
import org.apache.flink.queryablestate.client.VoidNamespace;
import org.apache.flink.queryablestate.client.VoidNamespaceSerializer;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.internal.InternalListState;
import org.apache.flink.runtime.state.internal.InternalMapState;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.DBOptions;

import java.util.Collections;

import static org.mockito.Mockito.mock;

/**
* Additional tests for the serialization and deserialization using
Expand All @@ -68,30 +53,10 @@ public final class KVStateRequestSerializerRocksDBTest {
public void testListSerialization() throws Exception {
final long key = 0L;

// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ExecutionConfig executionConfig = new ExecutionConfig();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
executionConfig,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
new CloseableRegistry()
).build();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = RocksDBTestUtils
.builderForTestDefaults(temporaryFolder.getRoot(), LongSerializer.INSTANCE)
.build();

longHeapKeyedStateBackend.setCurrentKey(key);

final InternalListState<Long, VoidNamespace, Long> listState = longHeapKeyedStateBackend.createInternalState(VoidNamespaceSerializer.INSTANCE,
Expand All @@ -113,29 +78,10 @@ public void testMapSerialization() throws Exception {
final long key = 0L;

// objects for RocksDB state list serialisation
DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);
ExecutionConfig executionConfig = new ExecutionConfig();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend =
new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
temporaryFolder.getRoot(),
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
mock(TaskKvStateRegistry.class),
LongSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
executionConfig,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
new CloseableRegistry()
).build();
final RocksDBKeyedStateBackend<Long> longHeapKeyedStateBackend = RocksDBTestUtils
.builderForTestDefaults(temporaryFolder.getRoot(), LongSerializer.INSTANCE)
.build();

longHeapKeyedStateBackend.setCurrentKey(key);

final InternalMapState<Long, VoidNamespace, Long, String> mapState =
Expand Down
23 changes: 23 additions & 0 deletions flink-state-backends/flink-statebackend-rocksdb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -89,4 +89,27 @@ under the License.
<scope>test</scope>
</dependency>
</dependencies>

<!-- build a test jar -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<includes>
<include>**/org/apache/flink/contrib/streaming/state/RocksDBTestUtils*</include>
</includes>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>

</project>
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,11 @@
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.IncrementalRemoteKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.LocalRecoveryConfig;
import org.apache.flink.runtime.state.SharedStateRegistry;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.StateBackendTestBase;
Expand All @@ -45,7 +37,6 @@
import org.apache.flink.runtime.state.VoidNamespace;
import org.apache.flink.runtime.state.VoidNamespaceSerializer;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.runtime.util.BlockerCheckpointStreamFactory;
import org.apache.flink.runtime.util.BlockingCheckpointOutputStream;
import org.apache.flink.util.IOUtils;
Expand Down Expand Up @@ -94,7 +85,6 @@
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.verify;
import static org.mockito.internal.verification.VerificationModeFactory.times;
import static org.powermock.api.mockito.PowerMockito.mock;
import static org.powermock.api.mockito.PowerMockito.spy;

/**
Expand Down Expand Up @@ -196,28 +186,13 @@ public void setupRocksKeyedStateBackend() throws Exception {
testStreamFactory.setAfterNumberInvocations(10);

prepareRocksDB();
Environment env = new DummyEnvironment("TestTask", 1, 0);

keyedStateBackend = new RocksDBKeyedStateBackendBuilder<>(
"Test",
Thread.currentThread().getContextClassLoader(),
instanceBasePath,
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
mock(TaskKvStateRegistry.class),
IntSerializer.INSTANCE,
2,
new KeyGroupRange(0, 1),
env.getExecutionConfig(),
env.getTaskStateManager().createLocalRecoveryConfig(),
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(env.getExecutionConfig()),
spy(db),
defaultCFHandle,
new CloseableRegistry())

keyedStateBackend = RocksDBTestUtils.builderForTestDB(
instanceBasePath,
IntSerializer.INSTANCE,
spy(db),
defaultCFHandle,
PredefinedOptions.DEFAULT.createColumnOptions())
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
.build();

Expand Down Expand Up @@ -275,30 +250,17 @@ public void testCorrectMergeOperatorSet() throws Exception {
prepareRocksDB();
final ColumnFamilyOptions columnFamilyOptions = spy(new ColumnFamilyOptions());
RocksDBKeyedStateBackend<Integer> test = null;
try (DBOptions options = new DBOptions().setCreateIfMissing(true)) {
ExecutionConfig executionConfig = new ExecutionConfig();
test = new RocksDBKeyedStateBackendBuilder<>(
"test",
Thread.currentThread().getContextClassLoader(),

try {
test = RocksDBTestUtils.builderForTestDB(
tempFolder.newFolder(),
options,
stateName -> columnFamilyOptions,
mock(TaskKvStateRegistry.class),
IntSerializer.INSTANCE,
1,
new KeyGroupRange(0, 0),
executionConfig,
mock(LocalRecoveryConfig.class),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
AbstractStateBackend.getCompressionDecorator(executionConfig),
db,
defaultCFHandle,
new CloseableRegistry())
columnFamilyOptions)
.setEnableIncrementalCheckpointing(enableIncrementalCheckpointing)
.build();

ValueStateDescriptor<String> stubState1 =
new ValueStateDescriptor<>("StubState-1", StringSerializer.INSTANCE);
test.createInternalState(StringSerializer.INSTANCE, stubState1);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.query.KvStateRegistry;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

import java.io.File;
import java.util.Collections;

/**
* Test utils for the RocksDB state backend.
*/
public final class RocksDBTestUtils {

public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDefaults(
File instanceBasePath,
TypeSerializer<K> keySerializer) {

final DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);

return new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
instanceBasePath,
dbOptions,
stateName -> PredefinedOptions.DEFAULT.createColumnOptions(),
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
keySerializer,
2,
new KeyGroupRange(0, 1),
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
UncompressedStreamCompressionDecorator.INSTANCE,
new CloseableRegistry());
}

public static <K> RocksDBKeyedStateBackendBuilder<K> builderForTestDB(
File instanceBasePath,
TypeSerializer<K> keySerializer,
RocksDB db,
ColumnFamilyHandle defaultCFHandle,
ColumnFamilyOptions columnFamilyOptions) {

final DBOptions dbOptions = PredefinedOptions.DEFAULT.createDBOptions();
dbOptions.setCreateIfMissing(true);

return new RocksDBKeyedStateBackendBuilder<>(
"no-op",
ClassLoader.getSystemClassLoader(),
instanceBasePath,
dbOptions,
stateName -> columnFamilyOptions,
new KvStateRegistry().createTaskRegistry(new JobID(), new JobVertexID()),
keySerializer,
2,
new KeyGroupRange(0, 1),
new ExecutionConfig(),
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
new UnregisteredMetricsGroup(),
Collections.emptyList(),
UncompressedStreamCompressionDecorator.INSTANCE,
db,
defaultCFHandle,
new CloseableRegistry());
}
}

0 comments on commit 9da970f

Please sign in to comment.