[FLINK-20496][state backends] RocksDB partitioned index/filters option.
Configure partitioned index and filter options according to https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters.

This closes apache#14341.
liuyufei authored and carp84 committed Mar 4, 2021
1 parent be78727 commit f743974
Showing 10 changed files with 179 additions and 8 deletions.
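
For context, enabling the new option from user code might look like the following sketch. The option constants are the ones added in this commit; the Configuration-based wiring around them is illustrative, not part of the change:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;

Configuration config = new Configuration();
// Partitioned index/filters only take effect when RocksDB memory is bounded,
// i.e. managed memory or a fixed per-slot budget is configured.
config.set(RocksDBOptions.USE_MANAGED_MEMORY, true);
config.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);
// Equivalent flink-conf.yaml entry:
//   state.backend.rocksdb.memory.partitioned-index-filters: true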
6 changes: 6 additions & 0 deletions docs/layouts/shortcodes/generated/rocksdb_configuration.html
@@ -38,6 +38,12 @@
<td>Boolean</td>
<td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only the top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load the partitions required for the index/filter query into the block cache on demand. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
<td style="word-wrap: break-word;">0.5</td>
@@ -26,6 +26,12 @@
<td>Boolean</td>
<td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only the top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load the partitions required for the index/filter query into the block cache on demand. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
<td style="word-wrap: break-word;">0.5</td>
@@ -51,6 +51,9 @@ public final class RocksDBMemoryConfiguration implements Serializable {
*/
@Nullable private Double highPriorityPoolRatio;

/** Flag whether to use partitioned index/filters. Null if not set. */
@Nullable private Boolean usePartitionedIndexFilters;

// ------------------------------------------------------------------------

/**
@@ -166,6 +169,17 @@ public double getHighPriorityPoolRatio() {
: RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
}

/**
* Gets whether the state backend is configured to use partitioned index/filters for RocksDB.
*
* <p>See {@link RocksDBOptions#USE_PARTITIONED_INDEX_FILTERS} for details.
*/
public Boolean isUsingPartitionedIndexFilters() {
return usePartitionedIndexFilters != null
? usePartitionedIndexFilters
: RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue();
}

// ------------------------------------------------------------------------

/** Validates if the configured options are valid with respect to one another. */
@@ -219,6 +233,11 @@ public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
? other.highPriorityPoolRatio
: config.get(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO);

newConfig.usePartitionedIndexFilters =
other.usePartitionedIndexFilters != null
? other.usePartitionedIndexFilters
: config.get(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);

return newConfig;
}
}
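
The merge above resolves each field with the same precedence: a value set programmatically on the `other` configuration wins, otherwise the value from the Configuration (or the option default) is used. A hypothetical usage sketch, with illustrative variable names:

Configuration flinkConf = new Configuration();
flinkConf.set(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS, true);

// Nothing was set programmatically, so the flag resolves from flinkConf.
RocksDBMemoryConfiguration merged =
        RocksDBMemoryConfiguration.fromOtherAndConfiguration(
                new RocksDBMemoryConfiguration(), flinkConf);
boolean enabled = merged.isUsingPartitionedIndexFilters(); // true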
@@ -39,7 +39,10 @@ public class RocksDBMemoryControllerUtils {
* @return memory controllable RocksDB shared resources.
*/
public static RocksDBSharedResources allocateRocksDBSharedResources(
long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) {
long totalMemorySize,
double writeBufferRatio,
double highPriorityPoolRatio,
boolean usingPartitionedIndexFilters) {
long calculatedCacheCapacity =
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
totalMemorySize, writeBufferRatio);
@@ -54,7 +57,8 @@ public static RocksDBSharedResources allocateRocksDBSharedResources(
RocksDBMemoryControllerUtils.createWriteBufferManager(
writeBufferManagerCapacity, cache);

return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity);
return new RocksDBSharedResources(
cache, wbm, writeBufferManagerCapacity, usingPartitionedIndexFilters);
}

/**
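
The allocator bundles one shared cache and one write buffer manager per slot, and now records the partitioned index/filter flag alongside them. A rough equivalent with raw RocksDB objects, assuming the LRUCache-based wiring used in this commit's tests (the capacities are placeholders, not the helper's actual formulas):

import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.WriteBufferManager;

long cacheCapacity = 256 << 20;  // placeholder; derived from totalMemorySize in the helper
long wbmCapacity = 128 << 20;    // placeholder; derived from writeBufferRatio in the helper
double highPriorityPoolRatio = 0.1;

// Shared LRU cache with a high-priority pool for index/filter blocks, plus a
// write buffer manager that charges memtable memory against the same cache.
Cache cache = new LRUCache(cacheCapacity, -1, false, highPriorityPoolRatio);
WriteBufferManager wbm = new WriteBufferManager(wbmCapacity, cache);
RocksDBSharedResources shared =
        new RocksDBSharedResources(cache, wbm, wbmCapacity, /* usingPartitionedIndexFilters */ true);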
@@ -270,11 +270,15 @@ public static OpaqueMemoryResource<RocksDBSharedResources> allocateSharedCachesI

final double highPriorityPoolRatio = memoryConfig.getHighPriorityPoolRatio();
final double writeBufferRatio = memoryConfig.getWriteBufferRatio();
final boolean usingPartitionedIndexFilters = memoryConfig.isUsingPartitionedIndexFilters();

final LongFunctionWithException<RocksDBSharedResources, Exception> allocator =
(size) ->
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
size, writeBufferRatio, highPriorityPoolRatio);
size,
writeBufferRatio,
highPriorityPoolRatio,
usingPartitionedIndexFilters);

try {
if (memoryConfig.isUsingFixedMemoryPerSlot()) {
@@ -135,4 +135,18 @@ public class RocksDBOptions {
"The fraction of cache memory that is reserved for high-priority data like index, filter, and "
+ "compression dictionary blocks. This option only has an effect when '%s' or '%s' are configured.",
USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key()));

@Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB)
public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS =
ConfigOptions.key("state.backend.rocksdb.memory.partitioned-index-filters")
.booleanType()
.defaultValue(false)
.withDescription(
String.format(
"With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with "
+ "an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. "
+ "The partitioned index/filter then uses the top-level index to load on demand into the block cache "
+ "the partitions that are required to perform the index/filter query. "
+ "This option only has an effect when '%s' or '%s' are configured.",
USE_MANAGED_MEMORY.key(), FIX_PER_SLOT_MEMORY_SIZE.key()));
}
@@ -18,20 +18,27 @@

package org.apache.flink.contrib.streaming.state;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.runtime.memory.OpaqueMemoryResource;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.Filter;
import org.rocksdb.IndexType;
import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteOptions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.lang.reflect.Field;
import java.util.ArrayList;

import static org.apache.flink.util.Preconditions.checkNotNull;
@@ -44,6 +51,7 @@
* options, and should be properly (and necessarily) closed to prevent resource leak.
*/
public final class RocksDBResourceContainer implements AutoCloseable {
private static final Logger LOG = LoggerFactory.getLogger(RocksDBResourceContainer.class);

/** The pre-configured option settings. */
private final PredefinedOptions predefinedOptions;
@@ -143,6 +151,12 @@ public ColumnFamilyOptions getColumnOptions() {
"We currently only support BlockBasedTableConfig When bounding total memory.");
blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
}
if (rocksResources.isUsingPartitionedIndexFilters()
&& overwriteFilterIfExist(blockBasedTableConfig)) {
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
blockBasedTableConfig.setPartitionFilters(true);
blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
}
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
@@ -207,4 +221,39 @@ public void close() throws Exception {
sharedResources.close();
}
}

/**
* Overwrite the configured {@link Filter} if partitioned filters are enabled. Partitioned
* filters only work with full Bloom filters, not block-based ones.
*/
private boolean overwriteFilterIfExist(BlockBasedTableConfig blockBasedTableConfig) {
Filter filter = null;
try {
filter = getFilterFromBlockBasedTableConfig(blockBasedTableConfig);
} catch (NoSuchFieldException | IllegalAccessException e) {
LOG.warn(
"Reflection exception occurred when getting filter from BlockBasedTableConfig, disable partition index filters!");
return false;
}
if (filter != null) {
// TODO Get the filter's config from a future RocksDB version that exposes it, and build
// the new filter using the existing config.
BloomFilter newFilter = new BloomFilter(10, false);
LOG.info(
"Existing filter has been overwritten to full filters since partitioned index filters is enabled.");
blockBasedTableConfig.setFilter(newFilter);
handlesToClose.add(newFilter);
}
return true;
}

@VisibleForTesting
static Filter getFilterFromBlockBasedTableConfig(BlockBasedTableConfig blockBasedTableConfig)
throws NoSuchFieldException, IllegalAccessException {
Field filterField = blockBasedTableConfig.getClass().getDeclaredField("filter_");
filterField.setAccessible(true);
Object filter = filterField.get(blockBasedTableConfig);
filterField.setAccessible(false);
return filter == null ? null : (Filter) filter;
}
}
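
With the flag on, getColumnOptions() produces table options equivalent to the setup recommended by the RocksDB Partitioned-Index-Filters wiki page referenced in the commit message. A standalone sketch with plain RocksDB objects (the cache size and Bloom filter parameters are illustrative):

import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;

BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
// Partitioned filters require a full (not block-based) Bloom filter, which is
// why getColumnOptions() overwrites any pre-configured block-based filter.
tableConfig.setFilter(new BloomFilter(10, false));
// Two-level index search with partitioned filters; pin the top-level
// index/filter so only partitions are paged through the block cache.
tableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
tableConfig.setPartitionFilters(true);
tableConfig.setPinTopLevelIndexAndFilter(true);
// Charge index/filter blocks to the shared block cache, at high priority.
tableConfig.setBlockCache(new LRUCache(256 << 20));
tableConfig.setCacheIndexAndFilterBlocks(true);
tableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);

ColumnFamilyOptions columnOptions =
        new ColumnFamilyOptions().setTableFormatConfig(tableConfig);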
@@ -32,11 +32,17 @@ final class RocksDBSharedResources implements AutoCloseable {
private final WriteBufferManager writeBufferManager;
private final long writeBufferManagerCapacity;

private final boolean usingPartitionedIndexFilters;

RocksDBSharedResources(
Cache cache, WriteBufferManager writeBufferManager, long writeBufferManagerCapacity) {
Cache cache,
WriteBufferManager writeBufferManager,
long writeBufferManagerCapacity,
boolean usingPartitionedIndexFilters) {
this.cache = cache;
this.writeBufferManager = writeBufferManager;
this.writeBufferManagerCapacity = writeBufferManagerCapacity;
this.usingPartitionedIndexFilters = usingPartitionedIndexFilters;
}

public Cache getCache() {
@@ -51,6 +57,10 @@ public long getWriteBufferManagerCapacity() {
return writeBufferManagerCapacity;
}

public boolean isUsingPartitionedIndexFilters() {
return usingPartitionedIndexFilters;
}

@Override
public void close() {
writeBufferManager.close();
@@ -40,6 +40,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -64,7 +65,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() {
final AtomicLong actualWbmCapacity = new AtomicLong(0L);

when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
anyLong(), anyDouble(), anyDouble()))
anyLong(), anyDouble(), anyDouble(), anyBoolean()))
.thenCallRealMethod();

when(RocksDBMemoryControllerUtils.calculateActualCacheCapacity(anyLong(), anyDouble()))
@@ -99,7 +100,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() {
double highPriPoolRatio = 0.1;
RocksDBSharedResources rocksDBSharedResources =
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(
totalMemorySize, writeBufferRatio, highPriPoolRatio);
totalMemorySize, writeBufferRatio, highPriPoolRatio, false);
long expectedCacheCapacity =
RocksDBMemoryControllerUtils.calculateActualCacheCapacity(
totalMemorySize, writeBufferRatio);
@@ -26,22 +26,29 @@
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteBufferManager;
import org.rocksdb.WriteOptions;

import java.io.IOException;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;

import static org.apache.flink.contrib.streaming.state.RocksDBResourceContainer.getFilterFromBlockBasedTableConfig;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;

/** Tests to guard {@link RocksDBResourceContainer}. */
@@ -152,7 +159,7 @@ private OpaqueMemoryResource<RocksDBSharedResources> getSharedResources() {
final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
RocksDBSharedResources rocksDBSharedResources =
new RocksDBSharedResources(cache, wbm, writeBufferSize);
new RocksDBSharedResources(cache, wbm, writeBufferSize, false);
return new OpaqueMemoryResource<>(
rocksDBSharedResources, cacheSize, rocksDBSharedResources::close);
}
@@ -240,7 +247,8 @@ public void testFreeMultipleColumnOptionsWithPredefinedOptions() throws Exceptio
public void testFreeSharedResourcesAfterClose() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L);
RocksDBSharedResources sharedResources =
new RocksDBSharedResources(cache, wbm, 1024L, false);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
@@ -264,4 +272,54 @@ public void testFreeWriteReadOptionsAfterClose() throws Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}

@Test
public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources =
new RocksDBSharedResources(cache, wbm, 1024L, true);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
BloomFilter blockBasedFilter = new BloomFilter();
RocksDBOptionsFactory blockBasedBloomFilterOptionFactory =
new RocksDBOptionsFactory() {

@Override
public DBOptions createDBOptions(
DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
return currentOptions;
}

@Override
public ColumnFamilyOptions createColumnOptions(
ColumnFamilyOptions currentOptions,
Collection<AutoCloseable> handlesToClose) {
TableFormatConfig tableFormatConfig = currentOptions.tableFormatConfig();
BlockBasedTableConfig blockBasedTableConfig =
tableFormatConfig == null
? new BlockBasedTableConfig()
: (BlockBasedTableConfig) tableFormatConfig;
blockBasedTableConfig.setFilter(blockBasedFilter);
handlesToClose.add(blockBasedFilter);
currentOptions.setTableFormatConfig(blockBasedTableConfig);
return currentOptions;
}
};
try (RocksDBResourceContainer container =
new RocksDBResourceContainer(
PredefinedOptions.DEFAULT,
blockBasedBloomFilterOptionFactory,
opaqueResource)) {
ColumnFamilyOptions columnOptions = container.getColumnOptions();
BlockBasedTableConfig actual =
(BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
assertThat(actual.partitionFilters(), is(true));
assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
assertThat(getFilterFromBlockBasedTableConfig(actual), not(blockBasedFilter));
}
assertFalse("Block based filter is left unclosed.", blockBasedFilter.isOwningHandle());
}
}
