Skip to content

Commit

Permalink
[FLINK-14926][state-backend-rocksdb] (follow-up) Move additional setup with sharedResources into RocksDBResourceContainer
Browse files Browse the repository at this point in the history

This closes apache#10670
  • Loading branch information
carp84 authored and StephanEwen committed Jan 7, 2020
1 parent 0b7d496 commit 6ff3928
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,13 @@

import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.TableFormatConfig;

import javax.annotation.Nullable;

Expand Down Expand Up @@ -89,6 +93,7 @@ public DBOptions getDbOptions() {
// add necessary default options
opt = opt.setCreateIfMissing(true);

// if sharedResources is non-null, use the write buffer manager from it.
if (sharedResources != null) {
opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager());
}
Expand All @@ -109,6 +114,27 @@ public ColumnFamilyOptions getColumnOptions() {
opt = optionsFactory.createColumnOptions(opt, handlesToClose);
}

// if sharedResources is non-null, use the block cache from it and
// set necessary options for performance consideration with memory control
if (sharedResources != null) {
final RocksDBSharedResources rocksResources = sharedResources.getResourceHandle();
final Cache blockCache = rocksResources.getCache();
TableFormatConfig tableFormatConfig = opt.tableFormatConfig();
BlockBasedTableConfig blockBasedTableConfig;
if (tableFormatConfig == null) {
blockBasedTableConfig = new BlockBasedTableConfig();
} else {
Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig,
"We currently only support BlockBasedTableConfig When bounding total memory.");
blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
}
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
opt.setTableFormatConfig(blockBasedTableConfig);
}

return opt;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,12 +52,8 @@
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;
import org.rocksdb.TableFormatConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -74,7 +70,6 @@
import java.util.List;
import java.util.Random;
import java.util.UUID;
import java.util.function.Function;

import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY;
Expand Down Expand Up @@ -505,33 +500,10 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(

final OpaqueMemoryResource<RocksDBSharedResources> sharedResources = RocksDBOperationUtils
.allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), LOG);

final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources);

final Function<String, ColumnFamilyOptions> createColumnOptions;

if (sharedResources != null) {
LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize());

final RocksDBSharedResources rocksResources = sharedResources.getResourceHandle();
final Cache blockCache = rocksResources.getCache();

createColumnOptions = stateName -> {
ColumnFamilyOptions columnOptions = resourceContainer.getColumnOptions();
TableFormatConfig tableFormatConfig = columnOptions.tableFormatConfig();
Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig,
"We currently only support BlockBasedTableConfig When bounding total memory.");
BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true);
columnOptions.setTableFormatConfig(blockBasedTableConfig);
return columnOptions;
};
} else {
createColumnOptions = stateName -> resourceContainer.getColumnOptions();
}
final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources);

ExecutionConfig executionConfig = env.getExecutionConfig();
StreamCompressionDecorator keyGroupCompressionDecorator = getCompressionDecorator(executionConfig);
Expand All @@ -540,7 +512,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
env.getUserClassLoader(),
instanceBasePath,
resourceContainer,
createColumnOptions,
stateName -> resourceContainer.getColumnOptions(),
kvStateRegistry,
keySerializer,
numberOfKeyGroups,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.LRUCache;
Expand Down Expand Up @@ -78,15 +80,33 @@ public void testFreeMultipleDBOptionsAfterClose() throws Exception {
}
}

/**
* Guards that the shared resources are released after {@link RocksDBResourceContainer#close()} when the
* {@link RocksDBResourceContainer} instance is created with an {@link OpaqueMemoryResource}.
*
* @throws Exception if an unexpected error happens.
*/
@Test
public void testSharedResources() throws Exception {
public void testSharedResourcesAfterClose() throws Exception {
OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);
// Close right away: this test requests no options from the container, it only checks the effect of close().
container.close();
RocksDBSharedResources rocksDBSharedResources = sharedResources.getResourceHandle();
// After the container is closed, neither the cache nor the write buffer manager may still own its handle.
assertThat(rocksDBSharedResources.getCache().isOwningHandle(), is(false));
assertThat(rocksDBSharedResources.getWriteBufferManager().isOwningHandle(), is(false));
}

/**
* Guard that {@link RocksDBResourceContainer#getDbOptions()} shares the same {@link WriteBufferManager} instance
* if the {@link RocksDBResourceContainer} instance is initiated with {@link OpaqueMemoryResource}.
*
* @throws Exception if unexpected error happened.
*/
@Test
public void testGetDbOptionsWithSharedResources() throws Exception {
final int optionNumber = 20;
final long cacheSize = 1024L, writeBufferSize = 512L;
final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm);
OpaqueMemoryResource<RocksDBSharedResources> sharedResources =
new OpaqueMemoryResource<>(rocksDBSharedResources, cacheSize, rocksDBSharedResources::close);
OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);
HashSet<WriteBufferManager> writeBufferManagers = new HashSet<>();
Expand All @@ -96,10 +116,63 @@ public void testSharedResources() throws Exception {
writeBufferManagers.add(writeBufferManager);
}
assertThat(writeBufferManagers.size(), is(1));
assertThat(writeBufferManagers.iterator().next(), is(rocksDBSharedResources.getWriteBufferManager()));
assertThat(writeBufferManagers.iterator().next(),
is(sharedResources.getResourceHandle().getWriteBufferManager()));
container.close();
assertThat(rocksDBSharedResources.getCache().isOwningHandle(), is(false));
assertThat(rocksDBSharedResources.getWriteBufferManager().isOwningHandle(), is(false));
}

/**
 * Verifies that every {@link ColumnFamilyOptions} handed out by
 * {@link RocksDBResourceContainer#getColumnOptions()} is configured with one and the same {@link Cache},
 * namely the cache held by the shared resources the container was constructed with.
 *
 * @throws Exception if an unexpected error happens.
 */
@Test
public void testGetColumnFamilyOptionsWithSharedResources() throws Exception {
    final OpaqueMemoryResource<RocksDBSharedResources> sharedResources = getSharedResources();
    final RocksDBResourceContainer container =
        new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources);

    // Request the column options repeatedly and record the block cache found on each of them.
    final int requestCount = 20;
    final HashSet<Cache> observedCaches = new HashSet<>();
    for (int request = 0; request < requestCount; request++) {
        observedCaches.add(getBlockCache(container.getColumnOptions()));
    }

    // Exactly one distinct cache must have been observed ...
    assertThat(observedCaches.size(), is(1));
    // ... and it must be the cache of the shared resources.
    final Cache observedCache = observedCaches.iterator().next();
    assertThat(observedCache, is(sharedResources.getResourceHandle().getCache()));

    container.close();
}

/**
 * Creates a fresh set of shared RocksDB resources for the tests: an {@link LRUCache} and a
 * {@link WriteBufferManager} built on that cache, bundled in a {@link RocksDBSharedResources} and wrapped
 * in an {@link OpaqueMemoryResource}. The cache size is passed as the size of the opaque resource.
 *
 * @return the opaque memory resource; {@code RocksDBSharedResources::close} is handed to it as third
 *     constructor argument (presumably the disposer that runs when the resource is released).
 */
private OpaqueMemoryResource<RocksDBSharedResources> getSharedResources() {
    final long cacheCapacity = 1024L;
    final long writeBufferCapacity = 512L;

    // The cache has to exist first because the write buffer manager is constructed on top of it.
    final LRUCache lruCache = new LRUCache(cacheCapacity, -1, false, 0.1);
    final WriteBufferManager writeBufferManager = new WriteBufferManager(writeBufferCapacity, lruCache);

    final RocksDBSharedResources resources = new RocksDBSharedResources(lruCache, writeBufferManager);
    return new OpaqueMemoryResource<>(resources, cacheCapacity, resources::close);
}

/**
 * Extracts the block cache configured on the given column family options.
 *
 * <p>The table format config of the options must be a {@link BlockBasedTableConfig}; the cache is then
 * read reflectively from its {@code blockCache_} field. Any problem along the way fails the running test
 * with a descriptive message.
 *
 * @param columnOptions the column family options to inspect.
 * @return the value of the {@code blockCache_} field of the options' table format config.
 */
private Cache getBlockCache(ColumnFamilyOptions columnOptions) {
    // Check the type explicitly instead of catching ClassCastException. The instanceof test also rejects
    // a null table config, which would otherwise pass the cast and only surface later as a
    // NullPointerException from the reflective field read.
    final Object tableFormatConfig = columnOptions.tableFormatConfig();
    if (!(tableFormatConfig instanceof BlockBasedTableConfig)) {
        fail("Table config got from ColumnFamilyOptions is not BlockBasedTableConfig");
    }

    try {
        final Field cacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_");
        // The field is read via getDeclaredField, so make it accessible before reading it.
        cacheField.setAccessible(true);
        return (Cache) cacheField.get(tableFormatConfig);
    } catch (NoSuchFieldException e) {
        fail("blockCache_ is not defined");
    } catch (IllegalAccessException e) {
        fail("Cannot access blockCache_ field.");
    }
    // Not reached when fail() throws; required so that every code path returns a value.
    return null;
}

private WriteBufferManager getWriteBufferManager(DBOptions dbOptions) {
Expand Down

0 comments on commit 6ff3928

Please sign in to comment.