[FLINK-17800][rocksdb] Ensure total order seek to avoid user misuse
Myasuka authored and carp84 committed Jun 26, 2020
1 parent 6227fff commit 3516e37
Showing 16 changed files with 255 additions and 46 deletions.
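
For context, FLINK-17800 covers the case where user-supplied column-family tuning that enables prefix seek (for example `optimizeForPointLookup` on the RocksDB versions bundled at the time) silently changes iterator semantics, so Flink's own iterators can miss keys. This commit therefore forces `setTotalOrderSeek(true)` on every `ReadOptions` used for Flink-internal iterators. The stand-alone sketch below is not part of the commit; the class name, database path and keys are invented, and it only illustrates the RocksDB API involved:

```java
import org.rocksdb.Options;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;

public class TotalOrderSeekDemo {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();
        try (Options options = new Options()
                .setCreateIfMissing(true)
                // User-style tuning that (on older RocksDB versions) switches iterators
                // into prefix-seek mode, the misuse scenario behind FLINK-17800.
                .optimizeForPointLookup(64);
             RocksDB db = RocksDB.open(options, "/tmp/total-order-seek-demo")) {

            db.put("timer-1".getBytes(), "a".getBytes());
            db.put("timer-2".getBytes(), "b".getBytes());

            // Forcing total order seek makes the iterator ignore any prefix extractor
            // and walk the whole key space in byte order.
            try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);
                 RocksIterator iterator = db.newIterator(readOptions)) {
                for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                    System.out.println(new String(iterator.key()) + " -> " + new String(iterator.value()));
                }
            }
        }
    }
}
```

With total order seek enabled, the iterator ignores any prefix extractor and walks the whole key space in byte order, which is what the timer, map-state and restore code paths changed below rely on.
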
@@ -29,6 +29,7 @@
import org.apache.flink.shaded.guava18.com.google.common.primitives.UnsignedBytes;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

@@ -63,6 +64,9 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnull
private final RocksDB db;

@Nonnull
private final ReadOptions readOptions;

/** Handle to the column family of the RocksDB instance in which the elements are stored. */
@Nonnull
private final ColumnFamilyHandle columnFamilyHandle;
@@ -112,13 +116,15 @@ public class RocksDBCachingPriorityQueueSet<E extends HeapPriorityQueueElement>
@Nonnegative int keyGroupId,
@Nonnegative int keyGroupPrefixBytes,
@Nonnull RocksDB db,
@Nonnull ReadOptions readOptions,
@Nonnull ColumnFamilyHandle columnFamilyHandle,
@Nonnull TypeSerializer<E> byteOrderProducingSerializer,
@Nonnull DataOutputSerializer outputStream,
@Nonnull DataInputDeserializer inputStream,
@Nonnull RocksDBWriteBatchWrapper batchWrapper,
@Nonnull OrderedByteArraySetCache orderedByteArraySetCache) {
this.db = db;
this.readOptions = readOptions;
this.columnFamilyHandle = columnFamilyHandle;
this.byteOrderProducingSerializer = byteOrderProducingSerializer;
this.batchWrapper = batchWrapper;
@@ -304,7 +310,7 @@ private RocksBytesIterator orderedBytesIterator() {
flushWriteBatch();
return new RocksBytesIterator(
new RocksIteratorWrapper(
- db.newIterator(columnFamilyHandle)));
+ db.newIterator(columnFamilyHandle, readOptions)));
}

/**
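
In the hunk above, `RocksDBCachingPriorityQueueSet` stops calling `db.newIterator(columnFamilyHandle)` directly and instead receives a caller-owned `ReadOptions` that it passes to every iterator it opens; note also that `orderedBytesIterator()` flushes the pending write batch first, since writes buffered in a `WriteBatch` are not visible to a newly created iterator. A rough sketch of that flush-then-iterate pattern, with invented class and method names:

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

final class FlushThenIterateSketch {

    private final RocksDB db;
    private final ColumnFamilyHandle columnFamily;
    private final ReadOptions readOptions;   // shared, total-order-seek, owned by the caller
    private final WriteOptions writeOptions; // shared, owned by the caller
    private final WriteBatch pendingWrites = new WriteBatch();

    FlushThenIterateSketch(RocksDB db, ColumnFamilyHandle columnFamily,
                           ReadOptions readOptions, WriteOptions writeOptions) {
        this.db = db;
        this.columnFamily = columnFamily;
        this.readOptions = readOptions;
        this.writeOptions = writeOptions;
    }

    void bufferPut(byte[] key, byte[] value) throws RocksDBException {
        pendingWrites.put(columnFamily, key, value);
    }

    /** Buffered writes are not visible to new iterators, so flush before iterating. */
    RocksIterator orderedIterator() throws RocksDBException {
        if (pendingWrites.count() > 0) {
            db.write(writeOptions, pendingWrites);
            pendingWrites.clear();
        }
        return db.newIterator(columnFamily, readOptions);
    }
}
```
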
@@ -21,6 +21,7 @@
import org.apache.flink.runtime.state.KeyedStateHandle;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

@@ -116,7 +117,8 @@ private static void deleteRange(
@Nonnegative long writeBatchSize) throws RocksDBException {

for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
- try (RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle);
+ try (ReadOptions readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
+ RocksIteratorWrapper iteratorWrapper = RocksDBOperationUtils.getRocksIterator(db, columnFamilyHandle, readOptions);
RocksDBWriteBatchWrapper writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeBatchSize)) {

iteratorWrapper.seek(beginKeyBytes);
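
Here, by contrast, `deleteRange` is a one-shot utility, so the `ReadOptions` is created locally through the new helper and closed by the same try-with-resources block instead of being owned by the backend. A simplified version of the range-delete-by-iterator idea is sketched below; it omits Flink's `RocksDBWriteBatchWrapper` and its configurable batch size, and all names are illustrative:

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.RocksIterator;
import org.rocksdb.WriteBatch;
import org.rocksdb.WriteOptions;

final class DeleteRangeSketch {

    /** Deletes all keys in [beginKeyBytes, endKeyBytes) of one column family. */
    static void deleteRange(
            RocksDB db,
            ColumnFamilyHandle columnFamily,
            byte[] beginKeyBytes,
            byte[] endKeyBytes) throws RocksDBException {

        try (ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);
             RocksIterator iterator = db.newIterator(columnFamily, readOptions);
             WriteBatch writeBatch = new WriteBatch();
             // Disabling the WAL mirrors the write options the backend uses elsewhere.
             WriteOptions writeOptions = new WriteOptions().setDisableWAL(true)) {

            for (iterator.seek(beginKeyBytes); iterator.isValid(); iterator.next()) {
                byte[] key = iterator.key();
                if (compareUnsigned(key, endKeyBytes) >= 0) {
                    break; // reached the end of the requested range
                }
                writeBatch.delete(columnFamily, key);
            }
            db.write(writeOptions, writeBatch);
        }
    }

    /** Lexicographic comparison of unsigned bytes, matching RocksDB's default key order. */
    private static int compareUnsigned(byte[] a, byte[] b) {
        int len = Math.min(a.length, b.length);
        for (int i = 0; i < len; i++) {
            int cmp = (a[i] & 0xFF) - (b[i] & 0xFF);
            if (cmp != 0) {
                return cmp;
            }
        }
        return a.length - b.length;
    }
}
```
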
@@ -66,6 +66,7 @@

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.rocksdb.Snapshot;
@@ -149,6 +150,12 @@ <K, N, SV, S extends State, IS extends S> IS createState(
*/
private final WriteOptions writeOptions;

/**
* The read options to use when creating iterators.
* We ensure total order seek in case of user misuse; see FLINK-17800 for more details.
*/
private final ReadOptions readOptions;

/**
* The max memory size for one batch in {@link RocksDBWriteBatchWrapper}.
*/
@@ -212,6 +219,8 @@ public RocksDBKeyedStateBackend(
ExecutionConfig executionConfig,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
WriteOptions writeOptions,
ReadOptions readOptions,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
@@ -250,7 +259,8 @@ public RocksDBKeyedStateBackend(
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.kvStateInformation = kvStateInformation;

- this.writeOptions = new WriteOptions().setDisableWAL(true);
+ this.writeOptions = writeOptions;
+ this.readOptions = readOptions;
checkArgument(writeBatchSize >= 0, "Write batch size must be a non-negative value.");
this.writeBatchSize = writeBatchSize;
this.db = db;
@@ -290,7 +300,7 @@ public <N> Stream<K> getKeys(String state, N namespace) {
throw new FlinkRuntimeException("Failed to get keys from RocksDB state backend.", ex);
}

- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnInfo.columnFamilyHandle, readOptions);
iterator.seekToFirst();

final RocksStateKeysIterator<K> iteratorWrapper = new RocksStateKeysIterator<>(iterator, state, getKeySerializer(), keyGroupPrefixBytes,
@@ -360,6 +370,7 @@ public void dispose() {
columnFamilyOptions.forEach(IOUtils::closeQuietly);

IOUtils.closeQuietly(optionsContainer);
IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);

ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
@@ -407,6 +418,10 @@ public WriteOptions getWriteOptions() {
return writeOptions;
}

public ReadOptions getReadOptions() {
return readOptions;
}

RocksDBSerializedCompositeKeyBuilder<K> getSharedRocksKeyBuilder() {
return sharedRocksKeyBuilder;
}
@@ -606,7 +621,7 @@ private <N, S extends State, SV> void migrateStateValues(

Snapshot rocksDBSnapshot = db.getSnapshot();
try (
- RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0);
+ RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, stateMetaInfo.f0, readOptions);
RocksDBWriteBatchWrapper batchWriter = new RocksDBWriteBatchWrapper(db, getWriteOptions(), getWriteBatchSize())
) {
iterator.seekToFirst();
@@ -681,7 +696,7 @@ public int numKeyValueStateEntries() {

for (RocksDbKvStateInfo metaInfo : kvStateInformation.values()) {
//TODO maybe filterOrTransform only for k/v states
- try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle)) {
+ try (RocksIteratorWrapper rocksIterator = RocksDBOperationUtils.getRocksIterator(db, metaInfo.columnFamilyHandle, readOptions)) {
rocksIterator.seekToFirst();

while (rocksIterator.isValid()) {
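
Taken together, the backend changes make the ownership explicit: `RocksDBKeyedStateBackend` now receives the `WriteOptions` and the total-order-seek `ReadOptions` from its builder, reuses them for every internal iterator (`getKeys`, `migrateStateValues`, `numKeyValueStateEntries`), hands them to state objects via `getWriteOptions()`/`getReadOptions()`, and closes them in `dispose()`. A minimal sketch of that contract, under invented names:

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.WriteOptions;

/**
 * Illustrative only: the native option objects live exactly as long as their owner
 * and are handed out through getters, never created per call site.
 */
final class BackendResourcesSketch implements AutoCloseable {

    private final WriteOptions writeOptions;
    private final ReadOptions readOptions;

    BackendResourcesSketch(WriteOptions writeOptions, ReadOptions readOptions) {
        this.writeOptions = writeOptions;
        this.readOptions = readOptions;
    }

    WriteOptions getWriteOptions() {
        return writeOptions;
    }

    ReadOptions getReadOptions() {
        return readOptions;
    }

    @Override
    public void close() {
        // Disposing the owner is the single place where the shared native
        // option objects are released.
        readOptions.close();
        writeOptions.close();
    }
}
```
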
@@ -54,6 +54,7 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
@@ -244,6 +245,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
CloseableRegistry cancelStreamRegistryForBackend = new CloseableRegistry();
// The write options to use in the states.
WriteOptions writeOptions = null;
ReadOptions readOptions = null;
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation = new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
@@ -282,6 +284,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
}

writeOptions = new WriteOptions().setDisableWAL(true);
readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
writeBatchWrapper = new RocksDBWriteBatchWrapper(db, writeOptions, writeBatchSize);
// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
@@ -294,8 +297,13 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
snapshotStrategy = initializeSavepointAndCheckpointStrategies(cancelStreamRegistryForBackend, rocksDBResourceGuard,
kvStateInformation, keyGroupPrefixBytes, db, backendUID, materializedSstFiles, lastCompletedCheckpointId);
// init priority queue factory
- priorityQueueFactory = initPriorityQueueFactory(keyGroupPrefixBytes, kvStateInformation, db,
- writeBatchWrapper, nativeMetricMonitor);
+ priorityQueueFactory = initPriorityQueueFactory(
+ keyGroupPrefixBytes,
+ kvStateInformation,
+ db,
+ readOptions,
+ writeBatchWrapper,
+ nativeMetricMonitor);
} catch (Throwable e) {
// Do clean up
List<ColumnFamilyOptions> columnFamilyOptions = new ArrayList<>(kvStateInformation.values().size());
@@ -313,6 +321,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
IOUtils.closeQuietly(restoreOperation);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(optionsContainer);
IOUtils.closeQuietly(readOptions);
IOUtils.closeQuietly(writeOptions);
ttlCompactFiltersManager.disposeAndClearRegisteredCompactionFactories();
kvStateInformation.clear();
@@ -344,6 +353,8 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
this.executionConfig,
this.ttlTimeProvider,
db,
writeOptions,
readOptions,
kvStateInformation,
keyGroupPrefixBytes,
cancelStreamRegistryForBackend,
@@ -472,6 +483,7 @@ private PriorityQueueSetFactory initPriorityQueueFactory(
int keyGroupPrefixBytes,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor) {
PriorityQueueSetFactory priorityQueueFactory;
@@ -486,6 +498,7 @@ private PriorityQueueSetFactory initPriorityQueueFactory(
numberOfKeyGroups,
kvStateInformation,
db,
readOptions,
writeBatchWrapper,
nativeMetricMonitor,
columnFamilyOptionsFactory
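
The builder is the single place where these native option objects are constructed, and its catch-all error path closes them together with the other partially initialized resources before rethrowing, so a failed `build()` does not leak native memory. A condensed sketch of that create-then-clean-up-on-`Throwable` shape, with invented helper and class names:

```java
import org.rocksdb.ReadOptions;
import org.rocksdb.WriteOptions;

final class BuildWithCleanupSketch {

    static NativeResources build() {
        WriteOptions writeOptions = null;
        ReadOptions readOptions = null;
        try {
            writeOptions = new WriteOptions().setDisableWAL(true);
            readOptions = new ReadOptions().setTotalOrderSeek(true);
            // ... open the DB, run restore, create the write-batch wrapper, etc. ...
            return new NativeResources(writeOptions, readOptions);
        } catch (Throwable t) {
            // On any failure the builder still owns the half-initialized native
            // objects and must release them before rethrowing.
            closeQuietly(readOptions);
            closeQuietly(writeOptions);
            throw new RuntimeException("Failed to build the RocksDB-backed state backend.", t);
        }
    }

    private static void closeQuietly(AutoCloseable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (Exception ignored) {
                // best effort, mirroring IOUtils.closeQuietly
            }
        }
    }

    /** Transfers ownership of the option objects to the caller (the backend). */
    static final class NativeResources implements AutoCloseable {
        final WriteOptions writeOptions;
        final ReadOptions readOptions;

        NativeResources(WriteOptions writeOptions, ReadOptions readOptions) {
            this.writeOptions = writeOptions;
            this.readOptions = readOptions;
        }

        @Override
        public void close() {
            closeQuietly(readOptions);
            closeQuietly(writeOptions);
        }
    }
}
```
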
@@ -236,7 +236,7 @@ public Map.Entry<UK, UV> next() {
public boolean isEmpty() {
final byte[] prefixBytes = serializeCurrentKeyWithGroupAndNamespace();

- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions())) {

iterator.seek(prefixBytes);

@@ -247,7 +247,7 @@
@Override
public void clear() {
try {
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily);
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(backend.db, columnFamily, backend.getReadOptions());
RocksDBWriteBatchWrapper rocksDBWriteBatchWrapper = new RocksDBWriteBatchWrapper(backend.db, backend.getWriteOptions(), backend.getWriteBatchSize())) {

final byte[] keyPrefixBytes = serializeCurrentKeyWithGroupAndNamespace();
@@ -570,7 +570,7 @@ private void loadCache() {

// use try-with-resources to ensure the RocksIterator is released even if a runtime exception
// occurs in the code block below.
- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(db, columnFamily, backend.getReadOptions())) {

/*
* The iteration starts from the prefix bytes at the first loading. After #nextEntry() is called,
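
The map-state code paths touched here (`isEmpty()`, `clear()`, the iterator's `loadCache()`) share one shape: serialize the current key and namespace into a key prefix, seek the iterator to that prefix using the backend's read options, and stop as soon as a returned key no longer starts with the prefix. A reduced sketch of that prefix scan, with invented helper names (Flink additionally batches deletions through `RocksDBWriteBatchWrapper`):

```java
import java.util.Arrays;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

final class PrefixScanSketch {

    /** Returns true if no entry in the column family starts with the given prefix. */
    static boolean isPrefixEmpty(
            RocksDB db, ColumnFamilyHandle columnFamily, ReadOptions readOptions, byte[] prefixBytes) {
        try (RocksIterator iterator = db.newIterator(columnFamily, readOptions)) {
            iterator.seek(prefixBytes);
            return !(iterator.isValid() && startsWith(iterator.key(), prefixBytes));
        }
    }

    private static boolean startsWith(byte[] key, byte[] prefix) {
        return key.length >= prefix.length
                && Arrays.equals(Arrays.copyOfRange(key, 0, prefix.length), prefix);
    }
}
```

Because this scan assumes plain byte-order seek semantics, a user-configured prefix extractor would otherwise change which keys `seek()` can reach; that is exactly the misuse the commit guards against.
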
@@ -32,6 +32,7 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -94,12 +95,18 @@ public static RocksDB openDB(
return dbRef;
}

- public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
- return new RocksIteratorWrapper(db.newIterator());
+ public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle, ReadOptions readOptions) {
+ return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle, readOptions));
}

- public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
- return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
+ /**
+ * Creates a total order seek read option to avoid user misuse; see FLINK-17800 for more details.
+ *
+ * <p>Note: remember to close the generated {@link ReadOptions} when disposing.
+ */
+ // TODO Remove this method once we bump the RocksDB version above 6.2.2.
+ public static ReadOptions createTotalOrderSeekReadOptions() {
+ return new ReadOptions().setTotalOrderSeek(true);
}

public static void registerKvStateInformation(
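
`createTotalOrderSeekReadOptions()` centralizes the workaround (and the TODO for dropping it once the bundled RocksDB is newer than 6.2.2). Callers either keep the returned object for the lifetime of the backend or, as in the sketch below, create and close it around a single scan; the counting method is invented for illustration:

```java
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksIterator;

final class ReadOptionsFactorySketch {

    /** Mirrors the helper added above: every caller gets total order seek by construction. */
    static ReadOptions createTotalOrderSeekReadOptions() {
        return new ReadOptions().setTotalOrderSeek(true);
    }

    /** Typical short-lived use: the caller closes the ReadOptions it created. */
    static long countEntries(RocksDB db, ColumnFamilyHandle columnFamily) {
        long count = 0;
        try (ReadOptions readOptions = createTotalOrderSeekReadOptions();
             RocksIterator iterator = db.newIterator(columnFamily, readOptions)) {
            for (iterator.seekToFirst(); iterator.isValid(); iterator.next()) {
                count++;
            }
        }
        return count;
    }
}
```
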
@@ -17,6 +17,7 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.memory.DataInputDeserializer;
@@ -36,6 +37,7 @@

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;

import javax.annotation.Nonnull;
@@ -52,7 +54,8 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
/**
* Default cache size per key-group.
*/
- private static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable
+ @VisibleForTesting
+ static final int DEFAULT_CACHES_SIZE = 128; //TODO make this configurable

/**
* A shared buffer to serialize elements for the priority queue.
@@ -71,6 +74,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
private final int numberOfKeyGroups;
private final Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation;
private final RocksDB db;
private final ReadOptions readOptions;
private final RocksDBWriteBatchWrapper writeBatchWrapper;
private final RocksDBNativeMetricMonitor nativeMetricMonitor;
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;
@@ -81,6 +85,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
int numberOfKeyGroups,
Map<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDB db,
ReadOptions readOptions,
RocksDBWriteBatchWrapper writeBatchWrapper,
RocksDBNativeMetricMonitor nativeMetricMonitor,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
@@ -89,6 +94,7 @@ public class RocksDBPriorityQueueSetFactory implements PriorityQueueSetFactory {
this.numberOfKeyGroups = numberOfKeyGroups;
this.kvStateInformation = kvStateInformation;
this.db = db;
this.readOptions = readOptions;
this.writeBatchWrapper = writeBatchWrapper;
this.nativeMetricMonitor = nativeMetricMonitor;
this.columnFamilyOptionsFactory = columnFamilyOptionsFactory;
@@ -122,6 +128,7 @@ public RocksDBCachingPriorityQueueSet<T> create(
keyGroupId,
keyGroupPrefixBytes,
db,
readOptions,
columnFamilyHandle,
byteOrderedElementSerializer,
sharedElementOutView,
@@ -51,6 +51,7 @@
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;
import org.slf4j.Logger;
@@ -309,7 +310,7 @@ private void restoreWithRescaling(Collection<KeyedStateHandle> restoreStateHandl
null, tmpRestoreDBInfo.stateMetaInfoSnapshots.get(i))
.columnFamilyHandle;

- try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle)) {
+ try (RocksIteratorWrapper iterator = RocksDBOperationUtils.getRocksIterator(tmpRestoreDBInfo.db, tmpColumnFamilyHandle, tmpRestoreDBInfo.readOptions)) {

iterator.seek(startKeyGroupPrefixBytes);

@@ -376,6 +377,8 @@ private static class RestoredDBInstance implements AutoCloseable {
@Nonnull
private final List<StateMetaInfoSnapshot> stateMetaInfoSnapshots;

private final ReadOptions readOptions;

private RestoredDBInstance(
@Nonnull RocksDB db,
@Nonnull List<ColumnFamilyHandle> columnFamilyHandles,
@@ -386,6 +389,7 @@ private RestoredDBInstance(
this.columnFamilyHandles = columnFamilyHandles;
this.columnFamilyDescriptors = columnFamilyDescriptors;
this.stateMetaInfoSnapshots = stateMetaInfoSnapshots;
this.readOptions = RocksDBOperationUtils.createTotalOrderSeekReadOptions();
}

@Override
Expand All @@ -397,6 +401,7 @@ public void close() {
IOUtils.closeAllQuietly(columnFamilyHandles);
IOUtils.closeQuietly(db);
IOUtils.closeAllQuietly(columnFamilyOptions);
IOUtils.closeQuietly(readOptions);
}
}

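
During rescaling restore, each temporary `RestoredDBInstance` creates its own total-order-seek `ReadOptions` and releases it in `close()` together with the column-family handles and the temporary DB, handles first and the DB afterwards, as the hunk above does. A compact sketch of such a self-contained bundle, with an invented class name:

```java
import java.util.List;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ReadOptions;
import org.rocksdb.RocksDB;

/**
 * Illustrative only: a temporary restore DB bundles every native handle it
 * created and releases them together when the restore step is done.
 */
final class TemporaryDbBundle implements AutoCloseable {

    private final RocksDB db;
    private final List<ColumnFamilyHandle> columnFamilyHandles;
    private final ReadOptions readOptions = new ReadOptions().setTotalOrderSeek(true);

    TemporaryDbBundle(RocksDB db, List<ColumnFamilyHandle> columnFamilyHandles) {
        this.db = db;
        this.columnFamilyHandles = columnFamilyHandles;
    }

    ReadOptions readOptions() {
        return readOptions;
    }

    @Override
    public void close() {
        // Handles first, then the database, then the options object.
        columnFamilyHandles.forEach(ColumnFamilyHandle::close);
        db.close();
        readOptions.close();
    }
}
```
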
@@ -60,6 +60,7 @@ TestElement, RocksDBCachingPriorityQueueSet<TestElement>> newFactory() {
keyGroupId,
keyGroupPrefixBytes,
rocksDBResource.getRocksDB(),
rocksDBResource.getReadOptions(),
rocksDBResource.getDefaultColumnFamily(),
TestElementSerializer.INSTANCE,
outputStreamWithPos,
(Diffs for the remaining changed files were not loaded.)
