From f7439740e8e023d458d2ac0cb2a58682eb9b6beb Mon Sep 17 00:00:00 2001 From: liuyufei Date: Wed, 9 Dec 2020 23:39:32 +0800 Subject: [PATCH] [FLINK-20496][state backends] RocksDB partitioned index/filters option. Configure the partitioned index/filter options according to 'https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters'. This closes #14341. --- .../generated/rocksdb_configuration.html | 6 ++ .../state_backend_rocksdb_section.html | 6 ++ .../state/RocksDBMemoryConfiguration.java | 19 ++++++ .../state/RocksDBMemoryControllerUtils.java | 8 ++- .../state/RocksDBOperationUtils.java | 6 +- .../streaming/state/RocksDBOptions.java | 14 +++++ .../state/RocksDBResourceContainer.java | 49 +++++++++++++++ .../state/RocksDBSharedResources.java | 12 +++- .../RocksDBMemoryControllerUtilsTest.java | 5 +- .../state/RocksDBResourceContainerTest.java | 62 ++++++++++++++++++- 10 files changed, 179 insertions(+), 8 deletions(-) diff --git a/docs/layouts/shortcodes/generated/rocksdb_configuration.html b/docs/layouts/shortcodes/generated/rocksdb_configuration.html index 79b769dc0f758..74b12365ffc2b 100644 --- a/docs/layouts/shortcodes/generated/rocksdb_configuration.html +++ b/docs/layouts/shortcodes/generated/rocksdb_configuration.html @@ -38,6 +38,12 @@ Boolean If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped. + +
state.backend.rocksdb.memory.partitioned-index-filters
+ false + Boolean + With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only the top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load the partitions required by the index/filter query into the block cache on demand. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured. +
state.backend.rocksdb.memory.write-buffer-ratio
0.5 diff --git a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html index 8a74eeef48114..a8ed2c75a2d99 100644 --- a/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html +++ b/docs/layouts/shortcodes/generated/state_backend_rocksdb_section.html @@ -26,6 +26,12 @@ Boolean If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped. + +
state.backend.rocksdb.memory.partitioned-index-filters
+ false + Boolean + With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only the top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load the partitions required by the index/filter query into the block cache on demand. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured. +
state.backend.rocksdb.memory.write-buffer-ratio
0.5 diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java index dda5409fed3ad..3bfcc03441e76 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryConfiguration.java @@ -51,6 +51,9 @@ public final class RocksDBMemoryConfiguration implements Serializable { */ @Nullable private Double highPriorityPoolRatio; + /** Flag whether to use partitioned index/filters. Null if not set. */ + @Nullable private Boolean usePartitionedIndexFilters; + // ------------------------------------------------------------------------ /** * @@ -166,6 +169,17 @@ public double getHighPriorityPoolRatio() { : RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue(); } + /** + * Gets whether the state backend is configured to use partitioned index/filters for RocksDB. + * + * <p>
See {@link RocksDBOptions#USE_PARTITIONED_INDEX_FILTERS} for details. + */ + public Boolean isUsingPartitionedIndexFilters() { + return usePartitionedIndexFilters != null + ? usePartitionedIndexFilters + : RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue(); + } + // ------------------------------------------------------------------------ /** Validates if the configured options are valid with respect to one another. */ @@ -219,6 +233,11 @@ public static RocksDBMemoryConfiguration fromOtherAndConfiguration( ? other.highPriorityPoolRatio : config.get(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO); + newConfig.usePartitionedIndexFilters = + other.usePartitionedIndexFilters != null + ? other.usePartitionedIndexFilters + : config.get(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS); + return newConfig; } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java index 7362f1a7d4272..78aa1a5e7dad3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtils.java @@ -39,7 +39,10 @@ public class RocksDBMemoryControllerUtils { * @return memory controllable RocksDB shared resources. */ public static RocksDBSharedResources allocateRocksDBSharedResources( - long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) { + long totalMemorySize, + double writeBufferRatio, + double highPriorityPoolRatio, + boolean usingPartitionedIndexFilters) { long calculatedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity( totalMemorySize, writeBufferRatio); @@ -54,7 +57,8 @@ public static RocksDBSharedResources allocateRocksDBSharedResources( RocksDBMemoryControllerUtils.createWriteBufferManager( writeBufferManagerCapacity, cache); - return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity); + return new RocksDBSharedResources( + cache, wbm, writeBufferManagerCapacity, usingPartitionedIndexFilters); } /** diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java index acb99e4b6c0a6..e14d83bb74439 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOperationUtils.java @@ -270,11 +270,15 @@ public static OpaqueMemoryResource<RocksDBSharedResources> allocateSharedCachesI final double highPriorityPoolRatio = memoryConfig.getHighPriorityPoolRatio(); final double writeBufferRatio = memoryConfig.getWriteBufferRatio(); + final boolean usingPartitionedIndexFilters = memoryConfig.isUsingPartitionedIndexFilters(); final LongFunctionWithException<RocksDBSharedResources, Exception> allocator = (size) -> RocksDBMemoryControllerUtils.allocateRocksDBSharedResources( - size, writeBufferRatio, highPriorityPoolRatio); + size, + writeBufferRatio, + highPriorityPoolRatio, + usingPartitionedIndexFilters); try { if (memoryConfig.isUsingFixedMemoryPerSlot()) { diff --git
a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java index 5e8def43118af..369e9440dba3d 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBOptions.java @@ -135,4 +135,18 @@ public class RocksDBOptions { "The fraction of cache memory that is reserved for high-priority data like index, filter, and " + "compression dictionary blocks. This option only has an effect when '%s' or '%s' are configured.", USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key())); + + @Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB) + public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS = + ConfigOptions.key("state.backend.rocksdb.memory.partitioned-index-filters") + .booleanType() + .defaultValue(false) + .withDescription( + String.format( + "With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with " + + "an additional top-level index on them. When reading an index/filter, only the top-level index is loaded into memory. " + + "The partitioned index/filter then uses the top-level index to load the partitions required " + + "by the index/filter query into the block cache on demand. " + + "This option only has an effect when '%s' or '%s' are configured.", + USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key())); } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java index 702141e51a8ad..542bfd30b8fed 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainer.java @@ -18,20 +18,27 @@ package org.apache.flink.contrib.streaming.state; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.runtime.memory.OpaqueMemoryResource; import org.apache.flink.util.IOUtils; import org.apache.flink.util.Preconditions; import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.Filter; +import org.rocksdb.IndexType; import org.rocksdb.ReadOptions; import org.rocksdb.TableFormatConfig; import org.rocksdb.WriteOptions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.annotation.Nullable; +import java.lang.reflect.Field; import java.util.ArrayList; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -44,6 +51,7 @@ * options, and should be properly (and necessarily) closed to prevent resource leak. */ public final class RocksDBResourceContainer implements AutoCloseable { + private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class); /** The pre-configured option settings.
*/ private final PredefinedOptions predefinedOptions; @@ -143,6 +151,12 @@ public ColumnFamilyOptions getColumnOptions() { "We currently only support BlockBasedTableConfig When bounding total memory."); blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig; } + if (rocksResources.isUsingPartitionedIndexFilters() + && overwriteFilterIfExist(blockBasedTableConfig)) { + blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch); + blockBasedTableConfig.setPartitionFilters(true); + blockBasedTableConfig.setPinTopLevelIndexAndFilter(true); + } blockBasedTableConfig.setBlockCache(blockCache); blockBasedTableConfig.setCacheIndexAndFilterBlocks(true); blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true); @@ -207,4 +221,39 @@ public void close() throws Exception { sharedResources.close(); } } + + /** + * Overwrites the configured {@link Filter} if partitioned filters are enabled. Partitioned + * filters only work with full bloom filters, not with block-based ones. + */ + private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) { + Filter filter = null; + try { + filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig); + } catch (NoSuchFieldException | IllegalAccessException e) { + LOG.warn( + "Reflection exception occurred when getting the filter from BlockBasedTableConfig, disabling partitioned index filters!"); + return false; + } + if (filter != null) { + // TODO A future RocksDB version will expose the existing filter's configuration, so + // the new filter can be built from it. + BloomFilter newFilter = new BloomFilter(10, false); + LOG.info( + "The existing filter has been overwritten with a full bloom filter since partitioned index filters are enabled."); + blockBasedTableConfig.setFilter(newFilter); + handlesToClose.add(newFilter); + } + return true; + } + + @VisibleForTesting + static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig) + throws NoSuchFieldException, IllegalAccessException { + Field filterField = blockBasedTableConfig.getClass().getDeclaredField("filter_"); + filterField.setAccessible(true); + Object filter = filterField.get(blockBasedTableConfig); + filterField.setAccessible(false); + return filter == null ?
null : (Filter) filter; + } } diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java index 144e6f365b4cb..f48f4eb025600 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBSharedResources.java @@ -32,11 +32,17 @@ final class RocksDBSharedResources implements AutoCloseable { private final WriteBufferManager writeBufferManager; private final long writeBufferManagerCapacity; + private final boolean usingPartitionedIndexFilters; + RocksDBSharedResources( - Cache cache, WriteBufferManager writeBufferManager, long writeBufferManagerCapacity) { + Cache cache, + WriteBufferManager writeBufferManager, + long writeBufferManagerCapacity, + boolean usingPartitionedIndexFilters) { this.cache = cache; this.writeBufferManager = writeBufferManager; this.writeBufferManagerCapacity = writeBufferManagerCapacity; + this.usingPartitionedIndexFilters = usingPartitionedIndexFilters; } public Cache getCache() { @@ -51,6 +57,10 @@ public long getWriteBufferManagerCapacity() { return writeBufferManagerCapacity; } + public boolean isUsingPartitionedIndexFilters() { + return usingPartitionedIndexFilters; + } + @Override public void close() { writeBufferManager.close(); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java index ce59b4b04af45..12b33e1fdfc0a 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBMemoryControllerUtilsTest.java @@ -40,6 +40,7 @@ import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyBoolean; import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.ArgumentMatchers.anyLong; import static org.powermock.api.mockito.PowerMockito.when; @@ -64,7 +65,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() { final AtomicLong actualWbmCapacity = new AtomicLong(0L); when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources( - anyLong(), anyDouble(), anyDouble())) + anyLong(), anyDouble(), anyDouble(), anyBoolean())) .thenCallRealMethod(); when(RocksDBMemoryControllerUtils.calculateActualCacheCapacity(anyLong(), anyDouble())) @@ -99,7 +100,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() { double highPriPoolRatio = 0.1; RocksDBSharedResources rocksDBSharedResources = RocksDBMemoryControllerUtils.allocateRocksDBSharedResources( - totalMemorySize, writeBufferRatio, highPriPoolRatio); + totalMemorySize, writeBufferRatio, highPriPoolRatio, false); long expectedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity( totalMemorySize, writeBufferRatio); diff --git 
a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java index 93f56bb0ce9ec..ad8eb8a02d2b8 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBResourceContainerTest.java @@ -26,22 +26,29 @@ import org.junit.Test; import org.junit.rules.TemporaryFolder; import org.rocksdb.BlockBasedTableConfig; +import org.rocksdb.BloomFilter; import org.rocksdb.Cache; import org.rocksdb.ColumnFamilyOptions; import org.rocksdb.DBOptions; +import org.rocksdb.IndexType; import org.rocksdb.LRUCache; import org.rocksdb.NativeLibraryLoader; import org.rocksdb.ReadOptions; +import org.rocksdb.TableFormatConfig; import org.rocksdb.WriteBufferManager; import org.rocksdb.WriteOptions; import java.io.IOException; import java.lang.reflect.Field; import java.util.ArrayList; +import java.util.Collection; import java.util.HashSet; +import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig; import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.fail; /** Tests to guard {@link RocksDBResourceContainer}. */ @@ -152,7 +159,7 @@ private OpaqueMemoryResource<RocksDBSharedResources> getSharedResources() { final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1); final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache); RocksDBSharedResources rocksDBSharedResources = - new RocksDBSharedResources(cache, wbm, writeBufferSize); + new RocksDBSharedResources(cache, wbm, writeBufferSize, false); return new OpaqueMemoryResource<>( rocksDBSharedResources, cacheSize, rocksDBSharedResources::close); } @@ -240,7 +247,8 @@ public void testFreeMultipleColumnOptionsWithPredefinedOptions() throws Exceptio public void testFreeSharedResourcesAfterClose() throws Exception { LRUCache cache = new LRUCache(1024L); WriteBufferManager wbm = new WriteBufferManager(1024L, cache); - RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L); + RocksDBSharedResources sharedResources = + new RocksDBSharedResources(cache, wbm, 1024L, false); final ThrowingRunnable<Exception> disposer = sharedResources::close; OpaqueMemoryResource<RocksDBSharedResources> opaqueResource = new OpaqueMemoryResource<>(sharedResources, 1024L, disposer); @@ -264,4 +272,54 @@ public void testFreeWriteReadOptionsAfterClose() throws Exception { assertThat(writeOptions.isOwningHandle(), is(false)); assertThat(readOptions.isOwningHandle(), is(false)); } + + @Test + public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception { + LRUCache cache = new LRUCache(1024L); + WriteBufferManager wbm = new WriteBufferManager(1024L, cache); + RocksDBSharedResources sharedResources = + new RocksDBSharedResources(cache, wbm, 1024L, true); + final ThrowingRunnable<Exception> disposer = sharedResources::close; + OpaqueMemoryResource<RocksDBSharedResources> opaqueResource = + new OpaqueMemoryResource<>(sharedResources, 1024L, disposer); + BloomFilter blockBasedFilter = new BloomFilter(); + RocksDBOptionsFactory blockBasedBloomFilterOptionFactory = + new RocksDBOptionsFactory() {
+ + @Override + public DBOptions createDBOptions( + DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) { + return currentOptions; + } + + @Override + public ColumnFamilyOptions createColumnOptions( + ColumnFamilyOptions currentOptions, + Collection<AutoCloseable> handlesToClose) { + TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig(); + BlockBasedTableConfig blockBasedTableConfig = + tableFormatConfig == null + ? new BlockBasedTableConfig() + : (BlockBasedTableConfig) tableFormatConfig; + blockBasedTableConfig.setFilter(blockBasedFilter); + handlesToClose.add(blockBasedFilter); + currentOptions.setTableFormatConfig(blockBasedTableConfig); + return currentOptions; + } + }; + try (RocksDBResourceContainer container = + new RocksDBResourceContainer( + PredefinedOptions.DEFAULT, + blockBasedBloomFilterOptionFactory, + opaqueResource)) { + ColumnFamilyOptions columnOptions = container.getColumnOptions(); + BlockBasedTableConfig actual = + (BlockBasedTableConfig) columnOptions.tableFormatConfig(); + assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch)); + assertThat(actual.partitionFilters(), is(true)); + assertThat(actual.pinTopLevelIndexAndFilter(), is(true)); + assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter)); + } + assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle()); + } }
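
For illustration only (not part of the patch): enabling the new option from user code is a single configuration switch. A minimal sketch, assuming the standard Flink Configuration API; the keys are the ones introduced above.

    import org.apache.flink.configuration.Configuration;

    public class EnablePartitionedIndexFiltersSketch {
        public static void main(String[] args) {
            Configuration config = new Configuration();
            // The option only takes effect when the RocksDB memory budget is bounded,
            // i.e. managed memory (the default) or a fixed per-slot size is configured.
            config.setBoolean("state.backend.rocksdb.memory.managed", true);
            config.setBoolean("state.backend.rocksdb.memory.partitioned-index-filters", true);
            System.out.println(config);
        }
    }

RocksDBMemoryConfiguration#fromOtherAndConfiguration then picks the flag up, with a value set programmatically on the memory configuration taking precedence over the key from the Flink configuration.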
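For reference, the table options the container applies when the flag is set, expressed against the plain RocksDB Java API. This is a standalone sketch mirroring getColumnOptions above, not code from this patch: the 10-bit full bloom filter matches the one created in overwriteFilterIfExist, while the cache size is illustrative (Flink uses the shared LRUCache derived from the memory budget instead).

    import org.rocksdb.BlockBasedTableConfig;
    import org.rocksdb.BloomFilter;
    import org.rocksdb.ColumnFamilyOptions;
    import org.rocksdb.IndexType;
    import org.rocksdb.LRUCache;
    import org.rocksdb.RocksDB;

    public class PartitionedIndexFiltersTableConfigSketch {
        public static void main(String[] args) {
            RocksDB.loadLibrary();
            BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
            // Partitioned filters require a full bloom filter (second argument
            // false), not a block-based one.
            tableConfig.setFilter(new BloomFilter(10, false));
            // Two-level index with partitioned filters; pin the top-level
            // index/filter so only the partitions move through the block cache.
            tableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
            tableConfig.setPartitionFilters(true);
            tableConfig.setPinTopLevelIndexAndFilter(true);
            // Index/filter blocks are served from the block cache with high priority.
            tableConfig.setBlockCache(new LRUCache(64 * 1024 * 1024));
            tableConfig.setCacheIndexAndFilterBlocks(true);
            tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
            ColumnFamilyOptions columnOptions =
                    new ColumnFamilyOptions().setTableFormatConfig(tableConfig);
            System.out.println(columnOptions.tableFormatConfig());
        }
    }

All of these are native handles; in the container they are tracked in handlesToClose and released when the container is closed.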