[FLINK-20496][state backends] RocksDB partitioned index/filters option.
Configure partitioned index and filters options according to 'https://github.com/facebook/rocksdb/wiki/Partitioned-Index-Filters'.
liuyufei committed Dec 8, 2020
1 parent bceac42 commit 3058421
Showing 14 changed files with 129 additions and 9 deletions.
@@ -20,6 +20,12 @@
<td>MemorySize</td>
<td>The amount of the cache for data blocks in RocksDB. RocksDB has default block-cache size as '8MB'.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.block.metadata-blocksize</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>MemorySize</td>
<td>Approximate size (in bytes) of partitioned metadata packed per block. Currently applied to indexes block when partitioned index/filters option is enabled. RocksDB has default metadata blocksize as '4KB'.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.compaction.level.max-size-level-base</h5></td>
<td style="word-wrap: break-word;">(none)</td>
8 changes: 7 additions & 1 deletion docs/_includes/generated/rocksdb_configuration.html
@@ -38,6 +38,12 @@
<td>Boolean</td>
<td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
<td style="word-wrap: break-word;">0.5</td>
@@ -46,7 +52,7 @@
</tr>
<tr>
<td><h5>state.backend.rocksdb.options-factory</h5></td>
<td style="word-wrap: break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
<td style="word-wrap: break-word;">"org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory"</td>
<td>String</td>
<td>The options factory class for RocksDB to create DBOptions and ColumnFamilyOptions. The default options factory is org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory, which reads the configured options provided in 'RocksDBConfigurableOptions'.</td>
</tr>
6 changes: 6 additions & 0 deletions docs/_includes/generated/state_backend_rocksdb_section.html
@@ -26,6 +26,12 @@
<td>Boolean</td>
<td>If set, the RocksDB state backend will automatically configure itself to use the managed memory budget of the task slot, and divide the memory over write buffers, indexes, block caches, etc. That way, the three major uses of memory of RocksDB will be capped.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.partitioned-index-filters</h5></td>
<td style="word-wrap: break-word;">false</td>
<td>Boolean</td>
<td>With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. The partitioned index/filter then uses the top-level index to load on demand into the block cache the partitions that are required to perform the index/filter query. This option only has an effect when 'state.backend.rocksdb.memory.managed' or 'state.backend.rocksdb.memory.fixed-per-slot' are configured.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.memory.write-buffer-ratio</h5></td>
<td style="word-wrap: break-word;">0.5</td>
@@ -45,6 +45,7 @@
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_OPEN_FILES;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_SIZE_LEVEL_BASE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MAX_WRITE_BUFFER_NUMBER;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.METADATA_BLOCK_SIZE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.MIN_WRITE_BUFFER_NUMBER_TO_MERGE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.TARGET_FILE_SIZE_BASE;
import static org.apache.flink.contrib.streaming.state.RocksDBConfigurableOptions.USE_DYNAMIC_LEVEL_SIZE;
@@ -121,6 +122,10 @@ public ColumnFamilyOptions createColumnOptions(ColumnFamilyOptions currentOption
blockBasedTableConfig.setBlockSize(getBlockSize());
}

if (isOptionConfigured(METADATA_BLOCK_SIZE)) {
blockBasedTableConfig.setMetadataBlockSize(getMetadataBlockSize());
}

if (isOptionConfigured(BLOCK_CACHE_SIZE)) {
blockBasedTableConfig.setBlockCacheSize(getBlockCacheSize());
}
@@ -285,6 +290,22 @@ public DefaultConfigurableOptionsFactory setBlockSize(String blockSize) {
return this;
}

//--------------------------------------------------------------------------
// Approximate size of partitioned metadata packed per block.
// Currently applied to indexes block when partitioned index/filters option is enabled.
//--------------------------------------------------------------------------

private long getMetadataBlockSize() {
return MemorySize.parseBytes(getInternal(METADATA_BLOCK_SIZE.key()));
}

public DefaultConfigurableOptionsFactory setMetadataBlockSize(String metadataBlockSize) {
Preconditions.checkArgument(MemorySize.parseBytes(metadataBlockSize) > 0,
"Invalid configuration " + metadataBlockSize + " for metadata block size.");
setInternal(METADATA_BLOCK_SIZE.key(), metadataBlockSize);
return this;
}

//--------------------------------------------------------------------------
// The amount of the cache for data blocks in RocksDB
//--------------------------------------------------------------------------
@@ -315,6 +336,7 @@ public DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize
MAX_WRITE_BUFFER_NUMBER,
MIN_WRITE_BUFFER_NUMBER_TO_MERGE,
BLOCK_SIZE,
METADATA_BLOCK_SIZE,
BLOCK_CACHE_SIZE
};

@@ -329,6 +351,7 @@ public DefaultConfigurableOptionsFactory setBlockCacheSize(String blockCacheSize
MAX_SIZE_LEVEL_BASE,
WRITE_BUFFER_SIZE,
BLOCK_SIZE,
METADATA_BLOCK_SIZE,
BLOCK_CACHE_SIZE
));

@@ -127,6 +127,14 @@ public class RocksDBConfigurableOptions implements Serializable {
.withDescription("The approximate size (in bytes) of user data packed per block. " +
"RocksDB has default blocksize as '4KB'.");

public static final ConfigOption<MemorySize> METADATA_BLOCK_SIZE =
key("state.backend.rocksdb.block.metadata-blocksize")
.memoryType()
.noDefaultValue()
.withDescription("Approximate size (in bytes) of partitioned metadata packed per block. " +
"Currently applied to indexes block when partitioned index/filters option is enabled. " +
"RocksDB has default metadata blocksize as '4KB'.");

public static final ConfigOption<MemorySize> BLOCK_CACHE_SIZE =
key("state.backend.rocksdb.block.cache-size")
.memoryType()
@@ -51,6 +51,10 @@ public final class RocksDBMemoryConfiguration implements Serializable {
@Nullable
private Double highPriorityPoolRatio;

/** Flag whether to use partitioned index/filters. Null if not set. */
@Nullable
private Boolean usePartitionedIndexFilters;

// ------------------------------------------------------------------------

/**
@@ -155,6 +159,15 @@ public double getHighPriorityPoolRatio() {
return highPriorityPoolRatio != null ? highPriorityPoolRatio : RocksDBOptions.HIGH_PRIORITY_POOL_RATIO.defaultValue();
}

/**
* Gets whether the state backend is configured to use partitioned index/filters for RocksDB.
*
* <p>See {@link RocksDBOptions#USE_PARTITIONED_INDEX_FILTERS} for details.
*/
public Boolean isUsingPartitionedIndexFilters() {
return usePartitionedIndexFilters != null ? usePartitionedIndexFilters : RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS.defaultValue();
}

// ------------------------------------------------------------------------

/**
@@ -203,6 +216,10 @@ public static RocksDBMemoryConfiguration fromOtherAndConfiguration(
? other.highPriorityPoolRatio
: config.get(RocksDBOptions.HIGH_PRIORITY_POOL_RATIO);

newConfig.usePartitionedIndexFilters = other.usePartitionedIndexFilters != null
? other.usePartitionedIndexFilters
: config.get(RocksDBOptions.USE_PARTITIONED_INDEX_FILTERS);

return newConfig;
}
}
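The nullable flag above is resolved in two places: `fromOtherAndConfiguration` prefers a value set on the other instance over the loaded configuration, and the getter falls back to the option default when nothing was set. A minimal standalone sketch of that precedence follows. The class and method names (`NullableFlagSketch`, `merge`, `resolve`) are hypothetical stand-ins for illustration and are not part of Flink.

```java
/** Hypothetical illustration of the nullable-flag pattern; not Flink code. */
final class NullableFlagSketch {

    /** Assumed to mirror the option's declared default, defaultValue(false). */
    static final boolean DEFAULT_VALUE = false;

    private NullableFlagSketch() {}

    /** A value set programmatically wins; otherwise the configuration value is used. */
    static Boolean merge(Boolean programmatic, boolean fromConfiguration) {
        return programmatic != null ? programmatic : fromConfiguration;
    }

    /** Getter-side fallback: an unset (null) flag resolves to the option default. */
    static boolean resolve(Boolean flag) {
        return flag != null ? flag : DEFAULT_VALUE;
    }
}
```

Under these assumptions, `resolve(null)` yields the default, while `merge(null, true)` picks up the configured value and `merge(false, true)` keeps the programmatic one.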
@@ -37,14 +37,15 @@ public class RocksDBMemoryControllerUtils {
* @param highPriorityPoolRatio The high priority pool ratio of cache.
* @return memory controllable RocksDB shared resources.
*/
public static RocksDBSharedResources allocateRocksDBSharedResources(long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio) {
public static RocksDBSharedResources allocateRocksDBSharedResources(
long totalMemorySize, double writeBufferRatio, double highPriorityPoolRatio, boolean usingPartitionedIndex) {
long calculatedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, writeBufferRatio);
final Cache cache = RocksDBMemoryControllerUtils.createCache(calculatedCacheCapacity, highPriorityPoolRatio);

long writeBufferManagerCapacity = RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize, writeBufferRatio);
final WriteBufferManager wbm = RocksDBMemoryControllerUtils.createWriteBufferManager(writeBufferManagerCapacity, cache);

return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity);
return new RocksDBSharedResources(cache, wbm, writeBufferManagerCapacity, usingPartitionedIndex);
}

/**
@@ -243,9 +243,10 @@ public static OpaqueMemoryResource<RocksDBSharedResources> allocateSharedCachesI

final double highPriorityPoolRatio = memoryConfig.getHighPriorityPoolRatio();
final double writeBufferRatio = memoryConfig.getWriteBufferRatio();
final boolean usingPartitionedIndex = memoryConfig.isUsingPartitionedIndexFilters();

final LongFunctionWithException<RocksDBSharedResources, Exception> allocator = (size) ->
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(size, writeBufferRatio, highPriorityPoolRatio);
RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(size, writeBufferRatio, highPriorityPoolRatio, usingPartitionedIndex);

try {
if (memoryConfig.isUsingFixedMemoryPerSlot()) {
@@ -129,4 +129,18 @@ public class RocksDBOptions {
"compression dictionary blocks. This option only has an effect when '%s' or '%s' are configured.",
USE_MANAGED_MEMORY.key(),
FIX_PER_SLOT_MEMORY_SIZE.key()));

@Documentation.Section(Documentation.Sections.STATE_BACKEND_ROCKSDB)
public static final ConfigOption<Boolean> USE_PARTITIONED_INDEX_FILTERS = ConfigOptions
.key("state.backend.rocksdb.memory.partitioned-index-filters")
.booleanType()
.defaultValue(false)
.withDescription(String.format(
"With partitioning, the index/filter block of an SST file is partitioned into smaller blocks with " +
"an additional top-level index on them. When reading an index/filter, only top-level index is loaded into memory. " +
"The partitioned index/filter then uses the top-level index to load on demand into the block cache " +
"the partitions that are required to perform the index/filter query. " +
"This option only has an effect when '%s' or '%s' are configured.",
USE_MANAGED_MEMORY.key(),
FIX_PER_SLOT_MEMORY_SIZE.key()));
}
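The option description above can be made concrete with a toy model of a two-level index: a small top-level index stays resident, and index partitions are pulled into a cache only when a lookup needs them. This is a sketch of the idea under simplifying assumptions, not RocksDB's implementation. The class `PartitionedIndexSketch`, its fields, and the use of in-memory maps as stand-ins for the SST file and the block cache are all hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Toy model of a partitioned (two-level) index; not RocksDB's actual implementation. */
final class PartitionedIndexSketch {

    // Top-level index: last key of each partition -> partition number. Always in memory.
    private final TreeMap<String, Integer> topLevel = new TreeMap<>();
    // Stand-in for the index partitions stored in the SST file.
    private final List<TreeMap<String, Long>> storedPartitions = new ArrayList<>();
    // Stand-in for the block cache: holds only the partitions some lookup required.
    private final Map<Integer, TreeMap<String, Long>> blockCache = new HashMap<>();

    /** fullIndex maps the last key of each data block to that block's offset. */
    PartitionedIndexSketch(TreeMap<String, Long> fullIndex, int entriesPerPartition) {
        TreeMap<String, Long> current = new TreeMap<>();
        for (Map.Entry<String, Long> entry : fullIndex.entrySet()) {
            current.put(entry.getKey(), entry.getValue());
            if (current.size() == entriesPerPartition) {
                seal(current);
                current = new TreeMap<>();
            }
        }
        if (!current.isEmpty()) {
            seal(current);
        }
    }

    private void seal(TreeMap<String, Long> partition) {
        topLevel.put(partition.lastKey(), storedPartitions.size());
        storedPartitions.add(partition);
    }

    /** Returns the offset of the data block that may hold the key, or null if out of range. */
    Long lookup(String key) {
        Map.Entry<String, Integer> slot = topLevel.ceilingEntry(key);
        if (slot == null) {
            return null; // key sorts after the last partition; nothing is loaded
        }
        // Load the partition on demand; later lookups reuse the cached copy.
        TreeMap<String, Long> partition =
                blockCache.computeIfAbsent(slot.getValue(), storedPartitions::get);
        return partition.ceilingEntry(key).getValue();
    }

    int cachedPartitions() {
        return blockCache.size();
    }
}
```

In this model a lookup touches at most one partition, so the cache footprint grows with the set of partitions actually queried rather than with the size of the whole index.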
@@ -26,6 +26,7 @@
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;
import org.rocksdb.ReadOptions;
import org.rocksdb.TableFormatConfig;
import org.rocksdb.WriteOptions;
@@ -143,6 +144,11 @@ public ColumnFamilyOptions getColumnOptions() {
"We currently only support BlockBasedTableConfig when bounding total memory.");
blockBasedTableConfig = (BlockBasedTableConfig) tableFormatConfig;
}
if (rocksResources.isUsingPartitionedIndex()) {
blockBasedTableConfig.setIndexType(IndexType.kTwoLevelIndexSearch);
blockBasedTableConfig.setPartitionFilters(true);
blockBasedTableConfig.setPinTopLevelIndexAndFilter(true);
}
blockBasedTableConfig.setBlockCache(blockCache);
blockBasedTableConfig.setCacheIndexAndFilterBlocks(true);
blockBasedTableConfig.setCacheIndexAndFilterBlocksWithHighPriority(true);
@@ -32,10 +32,13 @@ final class RocksDBSharedResources implements AutoCloseable {
private final WriteBufferManager writeBufferManager;
private final long writeBufferManagerCapacity;

RocksDBSharedResources(Cache cache, WriteBufferManager writeBufferManager, long writeBufferManagerCapacity) {
private final boolean usingPartitionedIndex;

RocksDBSharedResources(Cache cache, WriteBufferManager writeBufferManager, long writeBufferManagerCapacity, boolean usingPartitionedIndex) {
this.cache = cache;
this.writeBufferManager = writeBufferManager;
this.writeBufferManagerCapacity = writeBufferManagerCapacity;
this.usingPartitionedIndex = usingPartitionedIndex;
}

public Cache getCache() {
@@ -50,6 +53,10 @@ public long getWriteBufferManagerCapacity() {
return writeBufferManagerCapacity;
}

public boolean isUsingPartitionedIndex() {
return usingPartitionedIndex;
}

@Override
public void close() {
writeBufferManager.close();
@@ -40,6 +40,7 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.powermock.api.mockito.PowerMockito.when;
@@ -65,7 +66,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() {
final AtomicLong actualCacheCapacity = new AtomicLong(0L);
final AtomicLong actualWbmCapacity = new AtomicLong(0L);

when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(anyLong(), anyDouble(), anyDouble()))
when(RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(anyLong(), anyDouble(), anyDouble(), anyBoolean()))
.thenCallRealMethod();

when(RocksDBMemoryControllerUtils.calculateActualCacheCapacity(anyLong(), anyDouble()))
@@ -93,7 +94,7 @@ public void testCreateSharedResourcesWithExpectedCapacity() {
long totalMemorySize = 2048L;
double writeBufferRatio = 0.5;
double highPriPoolRatio = 0.1;
RocksDBSharedResources rocksDBSharedResources = RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio);
RocksDBSharedResources rocksDBSharedResources = RocksDBMemoryControllerUtils.allocateRocksDBSharedResources(totalMemorySize, writeBufferRatio, highPriPoolRatio, false);
long expectedCacheCapacity = RocksDBMemoryControllerUtils.calculateActualCacheCapacity(totalMemorySize, writeBufferRatio);
long expectedWbmCapacity = RocksDBMemoryControllerUtils.calculateWriteBufferManagerCapacity(totalMemorySize, writeBufferRatio);

@@ -29,6 +29,7 @@
import org.rocksdb.Cache;
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.IndexType;
import org.rocksdb.LRUCache;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.ReadOptions;
@@ -151,7 +152,7 @@ private OpaqueMemoryResource<RocksDBSharedResources> getSharedResources() {
final long cacheSize = 1024L, writeBufferSize = 512L;
final LRUCache cache = new LRUCache(cacheSize, -1, false, 0.1);
final WriteBufferManager wbm = new WriteBufferManager(writeBufferSize, cache);
RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, writeBufferSize);
RocksDBSharedResources rocksDBSharedResources = new RocksDBSharedResources(cache, wbm, writeBufferSize, false);
return new OpaqueMemoryResource<>(rocksDBSharedResources, cacheSize, rocksDBSharedResources::close);
}

@@ -237,7 +238,7 @@ public void testFreeMultipleColumnOptionsWithPredefinedOptions() throws Exceptio
public void testFreeSharedResourcesAfterClose() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L);
RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L, false);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
@@ -260,4 +261,22 @@ public void testFreeWriteReadOptionsAfterClose() throws Exception {
assertThat(writeOptions.isOwningHandle(), is(false));
assertThat(readOptions.isOwningHandle(), is(false));
}

@Test
public void testGetColumnFamilyOptionsWithPartitionedIndex() throws Exception {
LRUCache cache = new LRUCache(1024L);
WriteBufferManager wbm = new WriteBufferManager(1024L, cache);
RocksDBSharedResources sharedResources = new RocksDBSharedResources(cache, wbm, 1024L, true);
final ThrowingRunnable<Exception> disposer = sharedResources::close;
OpaqueMemoryResource<RocksDBSharedResources> opaqueResource =
new OpaqueMemoryResource<>(sharedResources, 1024L, disposer);
try (RocksDBResourceContainer container =
new RocksDBResourceContainer(PredefinedOptions.DEFAULT, null, opaqueResource)) {
ColumnFamilyOptions columnOptions = container.getColumnOptions();
BlockBasedTableConfig actual = (BlockBasedTableConfig) columnOptions.tableFormatConfig();
assertThat(actual.indexType(), is(IndexType.kTwoLevelIndexSearch));
assertThat(actual.partitionFilters(), is(true));
assertThat(actual.pinTopLevelIndexAndFilter(), is(true));
}
}
}