From 6ff392842f9dc4d3c9c808e7912558d477826379 Mon Sep 17 00:00:00 2001 From: Yu Li Date: Tue, 24 Dec 2019 16:52:08 +0800 Subject: [PATCH] [FLINK-14926][state-backend-rocksdb] (follow-up) Move additional setup with sharedResources into RocksDBResourceContainer This closes #10670 --- .../state/RocksDBResourceContainer.java | 26 ++++++ .../streaming/state/RocksDBStateBackend.java | 32 +------ .../state/RocksDBResourceContainerTest.java | 93 +++++++++++++++++-- 3 files changed, 111 insertions(+), 40 deletions(-) diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index a5e2e8f2b1aff..c5227a54f6bce 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -20,9 +20,13 @@ import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.util.IOUtils; +import org.apache.flink.util.Preconditions; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.TableFormatConfig; import javax.annotation.Nullable; @@ -89,6 +93,7 @@ public DBOptions getDbOptions() { // add necessary default options opt = opt.setCreateIfMissing(true); + // if sharedResources is non-null, use the write buffer manager from it. if (sharedResources != null) { opt.setWriteBufferManager(sharedResources.getResourceHandle().getWriteBufferManager()); } @@ -109,6 +114,27 @@ public ColumnFamilyOptions getColumnOptions() { opt = optionsFactory.createColumnOptions(opt, handlesToClose); } + // if sharedResources is non-null, use the block cache from it and + // set necessary options for performance consideration with memory control + if (sharedResources != null) { + final RocksDBSharedResources rocksResources = sharedResources.getResourceHandle(); + final Cache blockCache = rocksResources.getCache(); + TableFormatConfig tableFormatConfig = opt.tableFormatConfig(); + BlockBasedTableConfig blockBasedTableConfig; + if (tableFormatConfig == null) { + blockBasedTableConfig = new BlockBasedTableConfig(); + } else { + Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig, + "We currently only support BlockBasedTableConfig When bounding total memory."); + blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; + } + blockBasedTableConfig.setBlockCache(blockCache); + blockBasedTableConfig.setCacheIndexAndFilterBlocks(true); + blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); + blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true); + opt.setTableFormatConfig(blockBasedTableConfig); + } + return opt; } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java index ba48835e1943a..954ad36cfe9d2 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackend.java @@ -52,12 +52,8 @@ import org.apache.flink.util.Preconditions; import org.apache.flink.util.TernaryBoolean; -import org.rocksdb.BlockBasedTableConfig; -import org.rocksdb.Cache; -import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.RocksDB; -import org.rocksdb.TableFormatConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -74,7 +70,6 @@ import java.util.List; import java.util.Random; import java.util.UUID; -import java.util.function.Function; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.CHECKPOINT_TRANSFER_THREAD_NUM; import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_FACTORY; @@ -505,33 +500,10 @@ public AbstractKeyedStateBackend createKeyedStateBackend( final OpaqueMemoryResource sharedResources = RocksDBOperationUtils .allocateSharedCachesIfConfigured(memoryConfiguration, env.getMemoryManager(), LOG); - - final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources); - - final Function createColumnOptions; - if (sharedResources != null) { LOG.info("Obtained shared RocksDB cache of size {} bytes", sharedResources.getSize()); - - final RocksDBSharedResources rocksResources = sharedResources.getResourceHandle(); - final Cache blockCache = rocksResources.getCache(); - - createColumnOptions = stateName -> { - ColumnFamilyOptions columnOptions = resourceContainer.getColumnOptions(); - TableFormatConfig tableFormatConfig = columnOptions.tableFormatConfig(); - Preconditions.checkArgument(tableFormatConfig instanceof BlockBasedTableConfig, - "We currently only support BlockBasedTableConfig When bounding total memory."); - BlockBasedTableConfig blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; - blockBasedTableConfig.setBlockCache(blockCache); - blockBasedTableConfig.setCacheIndexAndFilterBlocks(true); - blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); - blockBasedTableConfig.setPinL0FilterAndIndexBlocksInCache(true); - columnOptions.setTableFormatConfig(blockBasedTableConfig); - return columnOptions; - }; - } else { - createColumnOptions = stateName -> resourceContainer.getColumnOptions(); } + final RocksDBResourceContainer resourceContainer = createOptionsAndResourceContainer(sharedResources); ExecutionConfig executionConfig = env.getExecutionConfig(); StreamCompressionDecorator keyGroupCompressionDecorator = getCompressionDecorator(executionConfig); @@ -540,7 +512,7 @@ public AbstractKeyedStateBackend createKeyedStateBackend( env.getUserClassLoader(), instanceBasePath, resourceContainer, - createColumnOptions, + stateName -> resourceContainer.getColumnOptions(), kvStateRegistry, keySerializer, numberOfKeyGroups, diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java index ec590775d6a0a..73d4babb72e4e 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java @@ -25,6 +25,8 @@ import org.junit.ClassRule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; import org.rocksdb.LRUCache; @@ -78,15 +80,33 @@ public void testFreeMultipleDBOptionsAfterClose() throws Exception { } } + /** + * Guard the shared resources will be released after {@link RocksDBResourceContainer#close()} when the + * {@link RocksDBResourceContainer} instance is initiated with {@link OpaqueMemoryResource}. + * + * @throws Exception if unexpected error happened. + */ @Test - public void testSharedResources() throws Exception { + public void testSharedResourcesAfterClose() throws Exception { + OpaqueMemoryResource sharedResources = getSharedResources(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources); + container.close(); + RocksDBSharedResources rocksDBSharedResources = sharedResources.getResourceHandle(); + assertThat(rocksDBSharedResources.getCache().isOwningHandle(), is(false)); + assertThat(rocksDBSharedResources.getWriteBufferManager().isOwningHandle(), is(false)); + } + + /** + * Guard that {@link RocksDBResourceContainer#getDbOptions()} shares the same {@link WriteBufferManager} instance + * if the {@link RocksDBResourceContainer} instance is initiated with {@link OpaqueMemoryResource}. + * + * @throws Exception if unexpected error happened. + */ + @Test + public void testGetDbOptionsWithSharedResources() throws Exception { final int optionNumber = 20; - final long cacheSize = 1024L, writeBufferSize = 512L; - final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1); - final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache); - RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm); - OpaqueMemoryResource sharedResources = - new OpaqueMemoryResource<>(rocksDBSharedResources, cacheSize, rocksDBSharedResources::close); + OpaqueMemoryResource sharedResources = getSharedResources(); RocksDBResourceContainer container = new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources); HashSet writeBufferManagers = new HashSet<>(); @@ -96,10 +116,63 @@ public void testSharedResources() throws Exception { writeBufferManagers.add(writeBufferManager); } assertThat(writeBufferManagers.size(), is(1)); - assertThat(writeBufferManagers.iterator().next(), is(rocksDBSharedResources.getWriteBufferManager())); + assertThat(writeBufferManagers.iterator().next(), + is(sharedResources.getResourceHandle().getWriteBufferManager())); container.close(); - assertThat(rocksDBSharedResources.getCache().isOwningHandle(), is(false)); - assertThat(rocksDBSharedResources.getWriteBufferManager().isOwningHandle(), is(false)); + } + + /** + * Guard that {@link RocksDBResourceContainer#getColumnOptions()} shares the same {@link Cache} instance + * if the {@link RocksDBResourceContainer} instance is initiated with {@link OpaqueMemoryResource}. + * + * @throws Exception if unexpected error happened. + */ + @Test + public void testGetColumnFamilyOptionsWithSharedResources() throws Exception { + final int optionNumber = 20; + OpaqueMemoryResource sharedResources = getSharedResources(); + RocksDBResourceContainer container = + new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, sharedResources); + HashSet caches = new HashSet<>(); + for (int i = 0; i < optionNumber; i++) { + ColumnFamilyOptions columnOptions = container.getColumnOptions(); + Cache cache = getBlockCache(columnOptions); + caches.add(cache); + } + assertThat(caches.size(), is(1)); + assertThat(caches.iterator().next(), + is(sharedResources.getResourceHandle().getCache())); + container.close(); + } + + private OpaqueMemoryResource getSharedResources() { + final long cacheSize = 1024L, writeBufferSize = 512L; + final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1); + final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache); + RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm); + return new OpaqueMemoryResource<>(rocksDBSharedResources, cacheSize, rocksDBSharedResources::close); + } + + private Cache getBlockCache(ColumnFamilyOptions columnOptions) { + BlockBasedTableConfig blockBasedTableConfig = null; + try { + blockBasedTableConfig = (BlockBasedTableConfig) columnOptions.tableFormatConfig(); + } catch (ClassCastException e) { + fail("Table config got from ColumnFamilyOptions is not BlockBasedTableConfig"); + } + Field cacheField = null; + try { + cacheField = BlockBasedTableConfig.class.getDeclaredField("blockCache_"); + } catch (NoSuchFieldException e) { + fail("blockCache_ is not defined"); + } + cacheField.setAccessible(true); + try { + return (Cache) cacheField.get(blockBasedTableConfig); + } catch (IllegalAccessException e) { + fail("Cannot access blockCache_ field."); + return null; + } } private WriteBufferManager getWriteBufferManager(DBOptions dbOptions) {