Skip to content

Commit

Permalink
[FLINK-15637][state backends] Make RocksDB the default store for time…
Browse files Browse the repository at this point in the history
…rs when using RocksDBStateBackend

This closes apache#10893.
  • Loading branch information
klion26 authored and carp84 committed Jan 21, 2020
1 parent a22b9d8 commit a02facb
Show file tree
Hide file tree
Showing 7 changed files with 13 additions and 13 deletions.
2 changes: 1 addition & 1 deletion docs/_includes/generated/rocks_db_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.factory</h5></td>
<td style="word-wrap: break-word;">"HEAP"</td>
<td style="word-wrap: break-word;">"ROCKSDB"</td>
<td>String</td>
<td>This determines the factory for timer service state implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB .</td>
</tr>
Expand Down
2 changes: 1 addition & 1 deletion docs/ops/state/large_state_tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend

**RocksDB Timers**

For RocksDB, a user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller numbers of
For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).

When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `state.backend.rocksdb.timer-service.factory`.
Expand Down
2 changes: 1 addition & 1 deletion docs/ops/state/large_state_tuning.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ by default. To enable this feature, users can instantiate a `RocksDBStateBackend

**RocksDB Timers**

For RocksDB, a user can chose whether timers are stored on the heap (default) or inside RocksDB. Heap-based timers can have a better performance for smaller numbers of
For RocksDB, a user can chose whether timers are stored on the heap or inside RocksDB (default). Heap-based timers can have a better performance for smaller numbers of
timers, while storing timers inside RocksDB offers higher scalability as the number of timers in RocksDB can exceed the available main memory (spilling to disk).

When using RockDB as state backend, the type of timer storage can be selected through Flink's configuration via option key `state.backend.rocksdb.timer-service.factory`.
Expand Down
6 changes: 3 additions & 3 deletions flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ ORIGINAL_DOP=$1
NEW_DOP=$2
STATE_BACKEND_TYPE=${3:-file}
STATE_BACKEND_FILE_ASYNC=${4:-true}
STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-heap}
STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE=${5:-rocks}

if (( $ORIGINAL_DOP >= $NEW_DOP )); then
NUM_SLOTS=$ORIGINAL_DOP
Expand All @@ -57,8 +57,8 @@ fi

set_config_key "taskmanager.numberOfTaskSlots" "${NUM_SLOTS}"

if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'rocks' ]; then
set_config_key "state.backend.rocksdb.timer-service.factory" "rocksdb"
if [ $STATE_BACKEND_ROCKS_TIMER_SERVICE_TYPE == 'heap' ]; then
set_config_key "state.backend.rocksdb.timer-service.factory" "heap"
fi
set_config_key "metrics.fetcher.update-interval" "2000"

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class RocksDBOptions {
*/
public static final ConfigOption<String> TIMER_SERVICE_FACTORY = ConfigOptions
.key("state.backend.rocksdb.timer-service.factory")
.defaultValue(HEAP.name())
.defaultValue(ROCKSDB.name())
.withDescription(String.format("This determines the factory for timer service state implementation. Options " +
"are either %s (heap-based, default) or %s for an implementation based on RocksDB .",
HEAP.name(), ROCKSDB.name()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,8 +274,8 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfTransferThreads = UNDEFINED_NUMBER_OF_TRANSFER_THREADS;
// for now, we use still the heap-based implementation as default
this.priorityQueueStateType = PriorityQueueStateType.HEAP;
// use RocksDB-based implementation as default from FLINK-15637
this.priorityQueueStateType = PriorityQueueStateType.ROCKSDB;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
this.enableTtlCompactionFilter = TernaryBoolean.UNDEFINED;
this.memoryConfiguration = new RocksDBMemoryConfiguration();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,24 +153,24 @@ public void testConfigureTimerService() throws Exception {

// Fix the default
Assert.assertEquals(
RocksDBStateBackend.PriorityQueueStateType.HEAP.toString(),
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString(),
RocksDBOptions.TIMER_SERVICE_FACTORY.defaultValue());

RocksDBStateBackend rocksDbBackend = new RocksDBStateBackend(tempFolder.newFolder().toURI().toString());

RocksDBKeyedStateBackend<Integer> keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
Assert.assertEquals(HeapPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
Assert.assertEquals(RocksDBPriorityQueueSetFactory.class, keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();

Configuration conf = new Configuration();
conf.setString(
RocksDBOptions.TIMER_SERVICE_FACTORY,
RocksDBStateBackend.PriorityQueueStateType.ROCKSDB.toString());
RocksDBStateBackend.PriorityQueueStateType.HEAP.toString());

rocksDbBackend = rocksDbBackend.configure(conf, Thread.currentThread().getContextClassLoader());
keyedBackend = createKeyedStateBackend(rocksDbBackend, env);
Assert.assertEquals(
RocksDBPriorityQueueSetFactory.class,
HeapPriorityQueueSetFactory.class,
keyedBackend.getPriorityQueueFactory().getClass());
keyedBackend.dispose();
}
Expand Down

0 comments on commit a02facb

Please sign in to comment.