[FLINK-14495][docs] Reorganize sections of RocksDB documentation
This moves most in-depth RocksDB contents to the State Backends docs.
The "tuning large state" docs only refer to actual tuning and troubleshooting.
StephanEwen committed Feb 3, 2020
1 parent 45f534d commit 9afcf31
Showing 3 changed files with 207 additions and 250 deletions.
152 changes: 31 additions & 121 deletions docs/ops/state/large_state_tuning.md
@@ -118,160 +118,70 @@ Other state like keyed state is still snapshotted asynchronously. Please note th
The state storage workhorse of many large scale Flink streaming applications is the *RocksDB State Backend*.
The backend scales well beyond main memory and reliably stores large [keyed state](../../dev/stream/state/state.html).

Unfortunately, RocksDB's performance can vary with configuration, and there is little documentation on how to tune
RocksDB properly. For example, the default configuration is tailored towards SSDs and performs suboptimally
on spinning disks.
RocksDB's performance can vary with configuration; this section outlines some best practices for tuning jobs that use the RocksDB State Backend.

### Incremental Checkpoints

Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, at the cost of a (potentially) longer
recovery time. The core idea is that incremental checkpoints only record the changes relative to the previously completed checkpoint, instead of
producing a full, self-contained backup of the state backend. In this way, incremental checkpoints build upon previous checkpoints. Flink leverages
RocksDB's internal backup mechanism in a way that is self-consolidating over time. As a result, the incremental checkpoint history in Flink
does not grow indefinitely, and old checkpoints are eventually subsumed and pruned automatically.
When it comes to reducing the time that checkpoints take, activating incremental checkpoints should be one of the first considerations.
Incremental checkpoints can dramatically reduce the checkpointing time in comparison to full checkpoints, because incremental checkpoints only record the changes relative to the previously completed checkpoint, instead of producing a full, self-contained backup of the state backend.

While we strongly encourage the use of incremental checkpoints for large state, please note that this is a new feature and currently not enabled
by default. To enable this feature, users can instantiate a `RocksDBStateBackend` with the corresponding boolean flag in the constructor set to `true`, e.g.:
See [Incremental Checkpoints in RocksDB]({{ site.baseurl }}/ops/state/state_backends.html#incremental-checkpoints) for more background information.

{% highlight java %}
RocksDBStateBackend backend =
    new RocksDBStateBackend(filebackend, true);
{% endhighlight %}
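
For reference, the same flag can also be set through Flink's configuration instead of in code. A minimal sketch (the `Configuration`-based setup is illustrative; `state.backend.incremental` is the standard option key):

{% highlight java %}
// equivalent flink-conf.yaml settings:
//   state.backend: rocksdb
//   state.backend.incremental: true
Configuration config = new Configuration();
config.setString("state.backend", "rocksdb");
config.setBoolean("state.backend.incremental", true);
{% endhighlight %}
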
### Timers in RocksDB or on JVM Heap

### RocksDB Timers
Timers are stored in RocksDB by default, which is the more robust and scalable choice.

For RocksDB, a user can choose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have better performance for smaller numbers of
timers, while storing timers inside RocksDB offers higher scalability, as the number of timers in RocksDB can exceed the available main memory (spilling to disk).
When performance-tuning jobs that have only a few timers (no windows, no timers used in a ProcessFunction), putting those timers on the heap can increase performance.
Use this feature carefully, as heap-based timers may increase checkpointing times and naturally cannot scale beyond memory.

When using RocksDB as the state backend, the type of timer storage can be selected through Flink's configuration via the option key `state.backend.rocksdb.timer-service.factory`.
Possible choices are `heap` (to store timers on the heap) and `rocksdb` (to store timers in RocksDB, the default).
See [this section]({{ site.baseurl }}/ops/state/state_backends.html#timers-heap-vs-rocksdb) for details on how to configure heap-based timers.
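
As an illustration, a minimal sketch of switching timers to the heap programmatically (this assumes the `setPriorityQueueStateType` setter on `RocksDBStateBackend`; the checkpoint path is a placeholder, and the config key in the comment is the equivalent configuration-based route):

{% highlight java %}
// equivalent flink-conf.yaml setting:
//   state.backend.rocksdb.timer-service.factory: heap
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
backend.setPriorityQueueStateType(RocksDBStateBackend.PriorityQueueStateType.HEAP);
{% endhighlight %}
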

<span class="label label-info">Note</span> *The combination RocksDB state backend with heap-based timers currently does NOT support asynchronous snapshots for the timers state.
Other state like keyed state is still snapshotted asynchronously. Please note that this is not a regression from previous versions and will be resolved with `FLINK-10026`.*
### Tuning RocksDB Memory

### Predefined Options
The performance of the RocksDB State Backend depends to a large extent on the amount of memory that it has available. To increase performance, adding memory can help a lot, as can adjusting which functions the memory goes to.

Flink provides some predefined collections of options for RocksDB for different settings, and there are two ways
to pass these predefined options to RocksDB:
- Configure the predefined options through `flink-conf.yaml` via the option key `state.backend.rocksdb.predefined-options`.
The default value of this option is `DEFAULT`, which means `PredefinedOptions.DEFAULT`.
- Set the predefined options programmatically, e.g. `RocksDBStateBackend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM)`, as shown in the sketch below.
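
A minimal sketch of the programmatic route (the checkpoint path is a placeholder; the option key in the comment is the configuration-based equivalent):

{% highlight java %}
// equivalent flink-conf.yaml setting:
//   state.backend.rocksdb.predefined-options: SPINNING_DISK_OPTIMIZED_HIGH_MEM
RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
backend.setPredefinedOptions(PredefinedOptions.SPINNING_DISK_OPTIMIZED_HIGH_MEM);
{% endhighlight %}
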
By default, the RocksDB State Backend uses Flink's managed memory budget for RocksDB's buffers and caches (`state.backend.rocksdb.memory.managed: true`). Please refer to [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for background on how that mechanism works.

We expect to accumulate more such profiles over time. Feel free to contribute such predefined option profiles when you
find a set of options that works well and seems representative for certain workloads.
To tune memory-related performance issues, the following steps may be helpful:

<span class="label label-info">Note</span> Predefined options which set programmatically would override the one configured via `flink-conf.yaml`.
- The first step to try and increase performance should be to increase the amount of managed memory. This usually improves the situation a lot, without opening up the complexity of tuning low-level RocksDB options.

Especially with large container/process sizes, much of the total memory can typically go to RocksDB, unless the application logic requires a lot of JVM heap itself. The default managed memory fraction *(0.4)* is conservative and can often be increased when using TaskManagers with multi-GB process sizes.

### Passing Options Factory to RocksDB
- The number of write buffers in RocksDB depends on the number of states you have in your application (states across all operators in the pipeline). Each state corresponds to one ColumnFamily, which needs its own write buffers. Hence, applications with many states typically need more memory for the same performance.

There are two ways to pass an options factory to RocksDB in Flink:
- You can try and compare the performance of RocksDB with managed memory to RocksDB with per-column-family memory by setting `state.backend.rocksdb.memory.managed: false`. This can be useful especially for testing against a baseline (assuming no or generous container memory limits) or for testing for regressions compared to earlier versions of Flink.

Compared to the managed memory setup (constant memory pool), not using managed memory means that RocksDB allocates memory proportional to the number of states in the application (memory footprint changes with application changes). As a rule of thumb, the non-managed mode has (unless ColumnFamily options are applied) an upper bound of roughly "140MB * num-states-across-all-tasks * num-slots". Timers count as state as well!

- Configure the options factory through `flink-conf.yaml`. You can set the options factory class name via the option key `state.backend.rocksdb.options-factory`.
The default value for this option is `org.apache.flink.contrib.streaming.state.DefaultConfigurableOptionsFactory`, and all candidate configurable options are defined in `RocksDBConfigurableOptions`.
Moreover, you can also define your own customized and configurable options factory class, like the one below, and pass the class name to `state.backend.rocksdb.options-factory`.
- If your application has many states and you see frequent MemTable flushes (write-side bottleneck), but you cannot give RocksDB more memory, you can increase the ratio of memory going to the write buffers (`state.backend.rocksdb.memory.write-buffer-ratio`), as shown in the configuration sketch after this list. See [RocksDB Memory Management]({{ site.baseurl }}/ops/state/state_backends.html#memory-management) for details.

- An advanced option (*expert mode*) to reduce the number of MemTable flushes in setups with many states is to tune RocksDB's ColumnFamily options (arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:

{% highlight java %}
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    private static final long DEFAULT_SIZE = 256 * 1024 * 1024; // 256 MB
    private long blockCacheSize = DEFAULT_SIZE;

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // increase the max background flush threads when we have many states in one operator,
        // which means we would have many column families in one DB instance
        return currentOptions
            .setIncreaseParallelism(4)
            .setUseFsync(false)
            .setMaxBackgroundFlushes(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // decrease the arena block size from the default 8 MB to 1 MB and
        // use a 128 KB block size with a configurable block cache
        return currentOptions
            .setArenaBlockSize(1024 * 1024)
            .setTableFormatConfig(
                new BlockBasedTableConfig()
                    .setBlockCacheSize(blockCacheSize)
                    .setBlockSize(128 * 1024));
    }

    @Override
    public OptionsFactory configure(Configuration configuration) {
        this.blockCacheSize =
            configuration.getLong("my.custom.rocksdb.block.cache.size", DEFAULT_SIZE);
        return this;
    }
}
{% endhighlight %}
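
The memory-related knobs from the tuning steps above can also be set together in the configuration. A minimal sketch (the values are purely illustrative and need to be chosen for the concrete job and TaskManager size):

{% highlight java %}
Configuration config = new Configuration();
// give RocksDB a larger share of the TaskManager process memory
config.setString("taskmanager.memory.managed.fraction", "0.6");
// keep RocksDB on Flink's managed memory (the default); set to "false" only to
// compare against the per-column-family baseline
config.setString("state.backend.rocksdb.memory.managed", "true");
// shift more of the budget towards write buffers if MemTable flushes are the bottleneck
config.setString("state.backend.rocksdb.memory.write-buffer-ratio", "0.6");
{% endhighlight %}
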

- Set the options factory programmatically, e.g. `RocksDBStateBackend.setOptions(new MyOptionsFactory());`

<span class="label label-info">Note</span> Options factory which set programmatically would override the one configured via `flink-conf.yaml`,
and options factory has a higher priority over the predefined options if ever configured or set.

<span class="label label-info">Note</span> RocksDB is a native library that allocates memory directly from the process,
and not from the JVM. Any memory you assign to RocksDB will have to be accounted for, typically by decreasing the JVM heap size
of the TaskManagers by the same amount. Not doing that may result in YARN/Mesos/etc terminating the JVM processes for
allocating more memory than configured.

### Bounding RocksDB Memory Usage

RocksDB allocates native memory outside of the JVM, which could lead the process to exceed its total memory budget.
This can be especially problematic in containerized environments such as Kubernetes, which kill processes that exceed their memory budgets.

By default, Flink limits the total memory usage of the RocksDB instance(s) per slot by sharing a [cache](https://github.com/facebook/rocksdb/wiki/Block-Cache)
and a [write buffer manager](https://github.com/facebook/rocksdb/wiki/Write-Buffer-Manager) among all instances in a single slot.
The shared cache places an upper limit on the [three components](https://github.com/facebook/rocksdb/wiki/Memory-usage-in-RocksDB) that use the majority of memory
when RocksDB is deployed as a state backend: block cache, index and bloom filters, and MemTables.

This feature is enabled by default along with managed memory. Flink will use the managed memory budget as the per-slot memory limit for RocksDB state backend(s).

Flink also provides two parameters to tune how that memory is split between the MemTables and the index & filters when bounding RocksDB memory usage (see the illustration below):
- `state.backend.rocksdb.memory.write-buffer-ratio`, by default `0.5`, which means 50% of the given memory is used by the write buffer manager.
- `state.backend.rocksdb.memory.high-prio-pool-ratio`, by default `0.1`, which means 10% of the given memory is set aside as the high-priority pool for index and filters in the shared block cache.
We strongly suggest not setting this to zero, to prevent index and filters from competing with data blocks for space in the cache and causing performance issues.
Moreover, the L0-level filter and index are pinned into the cache by default to mitigate performance problems;
for more details please refer to the [RocksDB documentation](https://github.com/facebook/rocksdb/wiki/Block-Cache#caching-index-filter-and-compression-dictionary-blocks).
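
As a back-of-the-envelope illustration of how these ratios partition the budget as described above (the 1 GB per-slot figure is hypothetical, and Flink's exact internal accounting may differ slightly):

{% highlight java %}
long perSlotBudget = 1024L * 1024 * 1024;   // assume 1 GB of managed memory per slot
double writeBufferRatio  = 0.5;             // state.backend.rocksdb.memory.write-buffer-ratio (default)
double highPrioPoolRatio = 0.1;             // state.backend.rocksdb.memory.high-prio-pool-ratio (default)

long writeBufferBytes = (long) (perSlotBudget * writeBufferRatio);    // ~512 MB for MemTables
long highPrioBytes    = (long) (perSlotBudget * highPrioPoolRatio);   // ~102 MB reserved for index & filters
long remainingBytes   = perSlotBudget - writeBufferBytes - highPrioBytes; // rough upper bound left for data blocks in the shared cache
{% endhighlight %}
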

<span class="label label-info">Note</span> When bounded RocksDB memory usage is enabled by default,
the shared `cache` and `write buffer manager` will override customized settings of block cache and write buffer via `PredefinedOptions` and `OptionsFactory`.

*Experts only*: To control memory manually instead of using managed memory, users can set `state.backend.rocksdb.memory.managed` to `false` and control memory via `ColumnFamilyOptions`.
Alternatively, to save some manual calculation, memory can be set through the `state.backend.rocksdb.memory.fixed-per-slot` option, which overrides `state.backend.rocksdb.memory.managed` when configured.
With the latter method, please tune down `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction` to `0`
and increase `taskmanager.memory.task.off-heap.size` by "`taskmanager.numberOfTaskSlots` * `state.backend.rocksdb.memory.fixed-per-slot`" accordingly.
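
As a worked example of the latter method (the numbers are hypothetical: a TaskManager with 4 slots and 256 MB of RocksDB memory fixed per slot, hence 4 * 256 MB = 1 GB of additional task off-heap memory):

{% highlight java %}
Configuration config = new Configuration();
// fix the RocksDB memory per slot; this overrides state.backend.rocksdb.memory.managed
config.setString("state.backend.rocksdb.memory.fixed-per-slot", "256mb");
// RocksDB no longer uses managed memory, so tune the managed fraction down ...
config.setString("taskmanager.memory.managed.fraction", "0");
// ... and account for the memory as task off-heap memory instead:
// taskmanager.numberOfTaskSlots (4) * fixed-per-slot (256 MB) = 1 GB
config.setString("taskmanager.memory.task.off-heap.size", "1gb");
{% endhighlight %}
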

#### Tuning performance when bounding RocksDB memory usage

There might be a performance regression compared with the previous no-memory-limit case if you have too many states per slot.
- If you observe this behavior and are not running jobs in a containerized environment, or do not care about the over-limit memory usage,
the easiest way to eliminate the performance regression is to disable the memory bound for RocksDB, i.e. set `state.backend.rocksdb.memory.managed` to `false`.
Moreover, please refer to the [memory configuration migration guide](WIP) to learn how to keep backward compatibility with the previous memory configuration.
- Otherwise, you need to increase the memory limit for RocksDB by tuning up `taskmanager.memory.managed.size` or `taskmanager.memory.managed.fraction`, or by increasing the total memory of the TaskManager.

*Experts only*: Apart from increasing total memory, users can also tune RocksDB options (e.g. arena block size, max background flush threads, etc.) via a `RocksDBOptionsFactory`:

{% highlight java %}
public class MyOptionsFactory implements ConfigurableRocksDBOptionsFactory {

    @Override
    public DBOptions createDBOptions(DBOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // increase the max background flush threads when we have many states in one operator,
        // which means we would have many column families in one DB instance
        return currentOptions.setMaxBackgroundFlushes(4);
    }

    @Override
    public ColumnFamilyOptions createColumnOptions(
            ColumnFamilyOptions currentOptions, Collection<AutoCloseable> handlesToClose) {
        // decrease the arena block size from the default 8 MB to 1 MB
        return currentOptions.setArenaBlockSize(1024 * 1024);
    }

    @Override
    public OptionsFactory configure(Configuration configuration) {
        return this;
    }
}
{% endhighlight %}

## Capacity Planning

This section discusses how to decide how many resources should be used for a Flink job to run reliably.