Commit

[hotfix] RocksDB make default column family first
According to the RocksDB documentation, the default column family should always be created first, so its descriptor must be passed in the first position when opening the database with column families.
StefanRRichter committed Feb 25, 2018
1 parent 0f27116 commit ca523fd
Showing 2 changed files with 56 additions and 50 deletions.
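To illustrate the point of the commit message before the diff, here is a minimal, self-contained RocksJava sketch (not part of this commit) that opens a database with column families and passes the descriptor for the default column family in the first position; the class name, path handling, column family name, and option settings are illustrative assumptions.

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

import org.rocksdb.ColumnFamilyDescriptor;
import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

public class DefaultColumnFamilyFirstSketch {

    public static void main(String[] args) throws RocksDBException {
        RocksDB.loadLibrary();

        // Illustrative assumption: the target directory is passed as the first argument.
        final String dbPath = args[0];

        try (ColumnFamilyOptions columnOptions = new ColumnFamilyOptions();
             DBOptions dbOptions = new DBOptions()
                     .setCreateIfMissing(true)
                     .setCreateMissingColumnFamilies(true)) {

            List<ColumnFamilyDescriptor> descriptors = new ArrayList<>(2);
            // The default column family descriptor goes FIRST, as this commit enforces in openDB(...).
            descriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
            descriptors.add(new ColumnFamilyDescriptor("my-state".getBytes(StandardCharsets.UTF_8), columnOptions));

            // Handles come back in descriptor order: index 0 is the default column family.
            List<ColumnFamilyHandle> handles = new ArrayList<>(descriptors.size());
            RocksDB db = RocksDB.open(dbOptions, dbPath, descriptors, handles);
            try {
                ColumnFamilyHandle stateHandle = handles.get(1);
                db.put(stateHandle, "key".getBytes(StandardCharsets.UTF_8), "value".getBytes(StandardCharsets.UTF_8));
            } finally {
                // All column family handles (including the default one) must be closed before the DB.
                for (ColumnFamilyHandle handle : handles) {
                    handle.close();
                }
                db.close();
            }
        }
    }
}
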
@@ -156,9 +156,6 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** File suffix of sstable files. */
private static final String SST_FILE_SUFFIX = ".sst";

/** Bytes for the name of the column descriptor for the default column family. */
public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);

/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;

@@ -349,29 +346,31 @@ public void dispose() {
if (db != null) {

// RocksDB's native memory management requires that *all* CFs (including default) are closed before the
// DB is closed. So we start with the ones created by Flink...
// DB is closed. See:
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
// Start with default CF ...
IOUtils.closeQuietly(defaultColumnFamily);

// ... continue with the ones created by Flink...
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> columnMetaData :
kvStateInformation.values()) {
IOUtils.closeQuietly(columnMetaData.f0);
}

// ... close the default CF ...
IOUtils.closeQuietly(defaultColumnFamily);

// ... and finally close the DB instance ...
IOUtils.closeQuietly(db);

// invalidate the reference before releasing the lock so that other accesses will not cause crashes
// invalidate the reference
db = null;
}

kvStateInformation.clear();
restoredKvStateMetaInfos.clear();
kvStateInformation.clear();
restoredKvStateMetaInfos.clear();

IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);
IOUtils.closeQuietly(dbOptions);
IOUtils.closeQuietly(columnOptions);

cleanInstanceBasePath();
cleanInstanceBasePath();
}
}

private void cleanInstanceBasePath() {
@@ -475,11 +474,11 @@ private RocksDB openDB(
List<ColumnFamilyDescriptor> columnFamilyDescriptors =
new ArrayList<>(1 + stateColumnFamilyDescriptors.size());

// we add the required descriptor for the default CF in FIRST position, see
// https://github.com/facebook/rocksdb/wiki/RocksJava-Basics#opening-a-database-with-column-families
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY, columnOptions));
columnFamilyDescriptors.addAll(stateColumnFamilyDescriptors);

// we add the required descriptor for the default CF in last position.
columnFamilyDescriptors.add(new ColumnFamilyDescriptor(DEFAULT_COLUMN_FAMILY_NAME_BYTES, columnOptions));

RocksDB dbRef;

try {
@@ -602,7 +601,6 @@ private void restoreKVStateMetaData() throws IOException, StateMigrationException
List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> restoredMetaInfos =
serializationProxy.getStateMetaInfoSnapshots();
currentStateHandleKVStateColumnFamilies = new ArrayList<>(restoredMetaInfos.size());
//rocksDBKeyedStateBackend.restoredKvStateMetaInfos = new HashMap<>(restoredMetaInfos.size());

for (RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> restoredMetaInfo : restoredMetaInfos) {

@@ -845,8 +843,8 @@ private void restoreLocalStateIntoFullInstance(
stateBackend.instanceRocksDBPath.getAbsolutePath(),
columnFamilyDescriptors, columnFamilyHandles);

// extract and store the default column family which is located at the last index
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(columnFamilyHandles.size() - 1);
// extract and store the default column family which is located at the first index
stateBackend.defaultColumnFamily = columnFamilyHandles.remove(0);

for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?> stateMetaInfoSnapshot = stateMetaInfoSnapshots.get(i);
@@ -1027,8 +1025,11 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
columnFamilyDescriptors,
columnFamilyHandles)) {

final ColumnFamilyHandle defaultColumnFamily = columnFamilyHandles.remove(0);

Preconditions.checkState(columnFamilyHandles.size() == columnFamilyDescriptors.size());

try {
// iterating only the requested descriptors automatically skips the default column family handle
for (int i = 0; i < columnFamilyDescriptors.size(); ++i) {
ColumnFamilyHandle columnFamilyHandle = columnFamilyHandles.get(i);
ColumnFamilyDescriptor columnFamilyDescriptor = columnFamilyDescriptors.get(i);
@@ -1085,9 +1086,12 @@ private void restoreKeyGroupsShardWithTemporaryHelperInstance(
} // releases native iterator resources
}
} finally {

//release native tmp db column family resources
for (ColumnFamilyHandle columnFamilyHandle : columnFamilyHandles) {
IOUtils.closeQuietly(columnFamilyHandle);
IOUtils.closeQuietly(defaultColumnFamily);

for (ColumnFamilyHandle flinkColumnFamilyHandle : columnFamilyHandles) {
IOUtils.closeQuietly(flinkColumnFamilyHandle);
}
}
} // releases native tmp db resources
@@ -1165,7 +1169,7 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(
}

byte[] nameBytes = descriptor.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(DEFAULT_COLUMN_FAMILY_NAME_BYTES, nameBytes),
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");

ColumnFamilyDescriptor columnDescriptor = new ColumnFamilyDescriptor(nameBytes, columnOptions);
@@ -21,6 +21,7 @@
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.util.IOUtils;

import org.junit.Assert;
import org.junit.Rule;
@@ -53,7 +54,7 @@ public class RocksDBMergeIteratorTest {
@Test
public void testEmptyMergeIterator() throws IOException {
RocksDBKeyedStateBackend.RocksDBMergeIterator emptyIterator =
new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.EMPTY_LIST, 2);
new RocksDBKeyedStateBackend.RocksDBMergeIterator(Collections.emptyList(), 2);
Assert.assertFalse(emptyIterator.isValid());
}

@@ -74,16 +75,15 @@ public void testMergeIteratorShort() throws Exception {
public void testMergeIterator(int maxParallelism) throws Exception {
Random random = new Random(1234);

RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath());
try {
try (RocksDB rocksDB = RocksDB.open(tempFolder.getRoot().getAbsolutePath())) {
List<Tuple2<RocksIterator, Integer>> rocksIteratorsWithKVStateId = new ArrayList<>();
List<Tuple2<ColumnFamilyHandle, Integer>> columnFamilyHandlesWithKeyCount = new ArrayList<>();

int totalKeysExpected = 0;

for (int c = 0; c < NUM_KEY_VAL_STATES; ++c) {
ColumnFamilyHandle handle = rocksDB.createColumnFamily(
new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));
new ColumnFamilyDescriptor(("column-" + c).getBytes(ConfigConstants.DEFAULT_CHARSET)));

ByteArrayOutputStreamWithPos bos = new ByteArrayOutputStreamWithPos();
DataOutputStream dos = new DataOutputStream(bos);
@@ -113,39 +113,41 @@ public void testMergeIterator(int maxParallelism) throws Exception {
++id;
}

RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(rocksIteratorsWithKVStateId, maxParallelism <= Byte.MAX_VALUE ? 1 : 2);
try (RocksDBKeyedStateBackend.RocksDBMergeIterator mergeIterator = new RocksDBKeyedStateBackend.RocksDBMergeIterator(
rocksIteratorsWithKVStateId,
maxParallelism <= Byte.MAX_VALUE ? 1 : 2)) {

int prevKVState = -1;
int prevKey = -1;
int prevKeyGroup = -1;
int totalKeysActual = 0;
int prevKVState = -1;
int prevKey = -1;
int prevKeyGroup = -1;
int totalKeysActual = 0;

while (mergeIterator.isValid()) {
ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key());
while (mergeIterator.isValid()) {
ByteBuffer bb = ByteBuffer.wrap(mergeIterator.key());

int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get();
int key = bb.getInt();
int keyGroup = maxParallelism > Byte.MAX_VALUE ? bb.getShort() : bb.get();
int key = bb.getInt();

Assert.assertTrue(keyGroup >= prevKeyGroup);
Assert.assertTrue(key >= prevKey);
Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup());
Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());
Assert.assertTrue(keyGroup >= prevKeyGroup);
Assert.assertTrue(key >= prevKey);
Assert.assertEquals(prevKeyGroup != keyGroup, mergeIterator.isNewKeyGroup());
Assert.assertEquals(prevKVState != mergeIterator.kvStateId(), mergeIterator.isNewKeyValueState());

prevKeyGroup = keyGroup;
prevKVState = mergeIterator.kvStateId();
prevKeyGroup = keyGroup;
prevKVState = mergeIterator.kvStateId();

//System.out.println(keyGroup + " " + key + " " + mergeIterator.kvStateId());
mergeIterator.next();
++totalKeysActual;
mergeIterator.next();
++totalKeysActual;
}

Assert.assertEquals(totalKeysExpected, totalKeysActual);
}

Assert.assertEquals(totalKeysExpected, totalKeysActual);
IOUtils.closeQuietly(rocksDB.getDefaultColumnFamily());

for (Tuple2<ColumnFamilyHandle, Integer> handleWithCount : columnFamilyHandlesWithKeyCount) {
rocksDB.dropColumnFamily(handleWithCount.f0);
IOUtils.closeQuietly(handleWithCount.f0);
}
} finally {
rocksDB.close();
}
}

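The dispose() change in RocksDBKeyedStateBackend and the test cleanup above establish a single teardown order for the native resources: the default column family handle first, then the column family handles created by Flink, then the RocksDB instance, and finally the options objects. Below is a hedged, self-contained sketch of that order; the class, method, and parameter names are illustrative stand-ins, not Flink's own fields.

import java.util.Collection;

import org.apache.flink.util.IOUtils;

import org.rocksdb.ColumnFamilyHandle;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;

/** Sketch of the teardown order this commit establishes; names are illustrative. */
final class RocksDbTeardownSketch {

    static void closeAll(
            ColumnFamilyHandle defaultColumnFamily,
            Collection<ColumnFamilyHandle> applicationColumnFamilies,
            RocksDB db,
            DBOptions dbOptions,
            ColumnFamilyOptions columnOptions) {

        // 1. Start with the default column family handle ...
        IOUtils.closeQuietly(defaultColumnFamily);

        // 2. ... continue with the column families created by the application ...
        for (ColumnFamilyHandle handle : applicationColumnFamilies) {
            IOUtils.closeQuietly(handle);
        }

        // 3. ... only then close the RocksDB instance itself ...
        IOUtils.closeQuietly(db);

        // 4. ... and release the options objects last.
        IOUtils.closeQuietly(dbOptions);
        IOUtils.closeQuietly(columnOptions);
    }
}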
