Skip to content

Commit

Permalink
[hotfix] Consolidate RocksDB configuration options in RocksDBOptions
Browse files Browse the repository at this point in the history
Rename from backend.rocksdb.priority_queue_state_type into state.backend.rocksdb.timer-service.impl
  • Loading branch information
tillrohrmann committed Jul 16, 2018
1 parent a88d6ef commit a4b4cb7
Show file tree
Hide file tree
Showing 8 changed files with 51 additions and 31 deletions.
5 changes: 0 additions & 5 deletions docs/_includes/generated/checkpointing_configuration.html
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,6 @@
<td style="word-wrap: break-word;">false</td>
<td></td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.localdir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
</tr>
<tr>
<td><h5>state.checkpoints.dir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
Expand Down
21 changes: 21 additions & 0 deletions docs/_includes/generated/rocks_db_configuration.html
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 65%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.rocksdb.localdir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>The local directory (on the TaskManager) where RocksDB puts its files.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.timer-service.impl</h5></td>
<td style="word-wrap: break-word;">"HEAP"</td>
<td>This determines the timer service implementation. Options are either HEAP (heap-based, default) or ROCKSDB for an implementation based on RocksDB.</td>
</tr>
</tbody>
</table>
Original file line number Diff line number Diff line change
Expand Up @@ -115,15 +115,4 @@ public class CheckpointingOptions {
.defaultValue(1024)
.withDescription("The minimum size of state data files. All state chunks smaller than that are stored" +
" inline in the root checkpoint metadata file.");

// ------------------------------------------------------------------------
// Options specific to the RocksDB state backend
// ------------------------------------------------------------------------

/** The local directory (on the TaskManager) where RocksDB puts its files. */
public static final ConfigOption<String> ROCKSDB_LOCAL_DIRECTORIES = ConfigOptions
.key("state.backend.rocksdb.localdir")
.noDefaultValue()
.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");
}
5 changes: 5 additions & 0 deletions flink-docs/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,11 @@ under the License.
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-jackson-module-jsonSchema</artifactId>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ public class ConfigOptionsDocGenerator {
new OptionsClassLocation("flink-yarn", "org.apache.flink.yarn.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.configuration"),
new OptionsClassLocation("flink-mesos", "org.apache.flink.mesos.runtime.clusterframework"),
new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus")
new OptionsClassLocation("flink-metrics/flink-metrics-prometheus", "org.apache.flink.metrics.prometheus"),
new OptionsClassLocation("flink-state-backends/flink-statebackend-rocksdb", "org.apache.flink.contrib.streaming.state")
};

static final Set<String> EXCLUSIONS = new HashSet<>(Arrays.asList(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,18 +21,27 @@
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.HEAP;
import static org.apache.flink.contrib.streaming.state.RocksDBStateBackend.PriorityQueueStateType.ROCKS;

/**
* Configuration options for the RocksDB backend.
*/
public class RockDBBackendOptions {
public class RocksDBOptions {

/** The local directory (on the TaskManager) where RocksDB puts its files. */
public static final ConfigOption<String> LOCAL_DIRECTORIES = ConfigOptions
.key("state.backend.rocksdb.localdir")
.noDefaultValue()
.withDeprecatedKeys("state.backend.rocksdb.checkpointdir")
.withDescription("The local directory (on the TaskManager) where RocksDB puts its files.");

/**
* Choice of implementation for priority queue state (e.g. timers).
* Choice of timer service implementation.
*/
public static final ConfigOption<String> PRIORITY_QUEUE_STATE_TYPE = ConfigOptions
.key("backend.rocksdb.priority_queue_state_type")
.defaultValue(RocksDBStateBackend.PriorityQueueStateType.HEAP.name())
.withDescription("This determines the implementation for the priority queue state (e.g. timers). Options are" +
"either " + RocksDBStateBackend.PriorityQueueStateType.HEAP.name() + " (heap-based, default) or " +
RocksDBStateBackend.PriorityQueueStateType.ROCKS.name() + " for in implementation based on RocksDB.");
public static final ConfigOption<String> TIMER_SERVICE_IMPL = ConfigOptions
.key("state.backend.rocksdb.timer-service.impl")
.defaultValue(HEAP.name())
.withDescription(String.format("This determines the timer service implementation. Options are either %s " +
"(heap-based, default) or %s for an implementation based on RocksDB.", HEAP.name(), ROCKS.name()));
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@
import java.util.Random;
import java.util.UUID;

import static org.apache.flink.contrib.streaming.state.RockDBBackendOptions.PRIORITY_QUEUE_STATE_TYPE;
import static org.apache.flink.contrib.streaming.state.RocksDBOptions.TIMER_SERVICE_IMPL;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
Expand Down Expand Up @@ -271,7 +271,7 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

final String priorityQueueTypeString = config.getString(PRIORITY_QUEUE_STATE_TYPE.key(), "");
final String priorityQueueTypeString = config.getString(TIMER_SERVICE_IMPL.key(), "");

this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
PriorityQueueStateType.valueOf(priorityQueueTypeString.toUpperCase()) : original.priorityQueueStateType;
Expand All @@ -281,7 +281,7 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
this.localRocksDbDirectories = original.localRocksDbDirectories;
}
else {
final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
final String rocksdbLocalPaths = config.getString(RocksDBOptions.LOCAL_DIRECTORIES);
if (rocksdbLocalPaths != null) {
String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,14 @@ public void testLoadFileSystemStateBackend() throws Exception {
config1.setString(backendKey, "rocksdb");
config1.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config1.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config1.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
config1.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config1.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);

final Configuration config2 = new Configuration();
config2.setString(backendKey, RocksDBStateBackendFactory.class.getName());
config2.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir);
config2.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config2.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDirs);
config2.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDirs);
config2.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, incremental);

StateBackend backend1 = StateBackendLoader.loadStateBackendFromConfig(config1, cl, null);
Expand Down Expand Up @@ -143,7 +143,7 @@ public void testLoadFileSystemStateBackendMixed() throws Exception {
config.setString(CheckpointingOptions.CHECKPOINTS_DIRECTORY, checkpointDir); // this should not be picked up
config.setString(CheckpointingOptions.SAVEPOINT_DIRECTORY, savepointDir);
config.setBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS, !incremental); // this should not be picked up
config.setString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up
config.setString(RocksDBOptions.LOCAL_DIRECTORIES, localDir3 + ":" + localDir4); // this should not be picked up

final StateBackend loadedBackend =
StateBackendLoader.fromApplicationOrConfigOrDefault(backend, config, cl, null);
Expand Down

0 comments on commit a4b4cb7

Please sign in to comment.