Skip to content

Commit

Permalink
[FLINK-5823] [checkpoints] Make RocksDB state backend configurable
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 18, 2018
1 parent 1931993 commit d19525e
Show file tree
Hide file tree
Showing 6 changed files with 327 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,20 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.execution.Environment;
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.runtime.state.ConfigurableStateBackend;
import org.apache.flink.runtime.state.DefaultOperatorStateBackend;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.OperatorStateBackend;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.util.AbstractID;

Expand Down Expand Up @@ -67,7 +72,7 @@
* using the methods {@link #setPredefinedOptions(PredefinedOptions)} and
* {@link #setOptions(OptionsFactory)}.
*/
public class RocksDBStateBackend extends AbstractStateBackend {
public class RocksDBStateBackend extends AbstractStateBackend implements ConfigurableStateBackend {

private static final long serialVersionUID = 1L;

Expand All @@ -83,9 +88,11 @@ public class RocksDBStateBackend extends AbstractStateBackend {
// -- configuration values, set in the application / configuration

/** The state backend that we use for creating checkpoint streams. */
private final AbstractStateBackend checkpointStreamBackend;
private final StateBackend checkpointStreamBackend;

/** Base paths for RocksDB directory, as configured. May be null. */
/** Base paths for RocksDB directory, as configured.
* Null if not yet set, in which case the configuration values will be used.
* The configuration defaults to the TaskManager's temp directories. */
@Nullable
private Path[] localRocksDbDirectories;

Expand All @@ -96,8 +103,10 @@ public class RocksDBStateBackend extends AbstractStateBackend {
@Nullable
private OptionsFactory optionsFactory;

/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;
/** True if incremental checkpointing is enabled.
* Null if not yet set, in which case the configuration values will be used. */
@Nullable
private Boolean enableIncrementalCheckpointing;

// -- runtime values, set on TaskManager when initializing / using the backend

Expand All @@ -107,8 +116,9 @@ public class RocksDBStateBackend extends AbstractStateBackend {
/** JobID for uniquifying backup paths. */
private transient JobID jobId;

/** The index of the next directory to be used from {@link #initializedDbBasePaths}.*/
private transient int nextDirectory;

/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized;

Expand All @@ -127,7 +137,7 @@ public class RocksDBStateBackend extends AbstractStateBackend {
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public RocksDBStateBackend(String checkpointDataUri) throws IOException {
this(new Path(checkpointDataUri).toUri(), false);
this(new Path(checkpointDataUri).toUri());
}

/**
Expand Down Expand Up @@ -160,7 +170,7 @@ public RocksDBStateBackend(String checkpointDataUri, boolean enableIncrementalCh
* @throws IOException Thrown, if no file system can be found for the scheme in the URI.
*/
public RocksDBStateBackend(URI checkpointDataUri) throws IOException {
this(new FsStateBackend(checkpointDataUri), false);
this(new FsStateBackend(checkpointDataUri));
}

/**
Expand Down Expand Up @@ -188,7 +198,7 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck
* <p>The snapshots of the RocksDB state will be stored using the given backend's
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
*
* @param checkpointStreamBackend The backend to store the
* @param checkpointStreamBackend The backend to write the checkpoint streams to.
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
Expand All @@ -202,18 +212,92 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
* <p>The snapshots of the RocksDB state will be stored using the given backend's
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
*
* @param checkpointStreamBackend The backend to store the
* @param enableIncrementalCheckpointing True if incremental checkponting is enabled
* @param checkpointStreamBackend The backend to write the checkpoint streams to.
* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled.
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
}

/**
 * Private copy constructor that builds a re-configured variant of an existing state backend.
 *
 * <p>Settings that were explicitly set on {@code original} are carried over unchanged.
 * Settings that are still unset (null) on {@code original} are read from the given
 * configuration instead.
 *
 * @param original The state backend to re-configure.
 * @param config The configuration supplying the values that were not yet specified.
 * @throws IllegalConfigurationException Thrown, if the local storage directories read from
 *         the configuration are rejected with an {@code IllegalArgumentException}.
 */
private RocksDBStateBackend(RocksDBStateBackend original, Configuration config) {
    // the backend behind the checkpoint streams is re-configured as well, if it supports that
    final StateBackend streamBackend = original.checkpointStreamBackend;
    if (streamBackend instanceof ConfigurableStateBackend) {
        this.checkpointStreamBackend = ((ConfigurableStateBackend) streamBackend).configure(config);
    } else {
        this.checkpointStreamBackend = streamBackend;
    }

    // incremental checkpoints: an explicitly set value wins over the configuration
    final Boolean explicitIncremental = original.enableIncrementalCheckpointing;
    this.enableIncrementalCheckpointing = explicitIncremental != null
            ? explicitIncremental
            : Boolean.valueOf(config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

    // local directories: explicitly set paths win over the configuration
    this.localRocksDbDirectories = original.localRocksDbDirectories;
    if (this.localRocksDbDirectories == null) {
        final String configuredPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
        if (configuredPaths != null) {
            // entries may be separated by a comma or by the platform's path separator
            final String[] directories = configuredPaths.split(",|" + File.pathSeparator);
            try {
                setDbStoragePaths(directories);
            } catch (IllegalArgumentException e) {
                // report invalid paths as a configuration problem, keeping the original cause
                throw new IllegalConfigurationException("Invalid configuration for RocksDB state " +
                        "backend's local storage directories: " + e.getMessage(), e);
            }
        }
    }

    // the remaining settings have no configuration counterpart here and are simply copied
    this.optionsFactory = original.optionsFactory;
    this.predefinedOptions = original.predefinedOptions;
}

// ------------------------------------------------------------------------
// Reconfiguration
// ------------------------------------------------------------------------

/**
 * Creates a copy of this state backend that uses the values defined in the configuration
 * for all fields that were not yet specified in this state backend.
 *
 * @param config The configuration to take the unspecified values from.
 * @return The re-configured variant of the state backend.
 */
@Override
public RocksDBStateBackend configure(Configuration config) {
    // this instance is left untouched; the copy constructor does the actual merging
    final RocksDBStateBackend reconfigured = new RocksDBStateBackend(this, config);
    return reconfigured;
}

// ------------------------------------------------------------------------
// State backend methods
// ------------------------------------------------------------------------

/**
 * Gets the state backend that this RocksDB state backend uses to persist
 * its bytes to.
 *
 * <p>This RocksDB state backend only implements the RocksDB specific parts; it
 * relies on the 'CheckpointBackend' to persist the checkpoint and savepoint byte
 * streams.
 *
 * @return The state backend used for the checkpoint streams.
 */
public StateBackend getCheckpointBackend() {
    return this.checkpointStreamBackend;
}

private void lazyInitializeForJob(
Environment env,
String operatorIdentifier) throws IOException {
Expand Down Expand Up @@ -314,7 +398,20 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
numberOfKeyGroups,
keyGroupRange,
env.getExecutionConfig(),
enableIncrementalCheckpointing);
isIncrementalCheckpointsEnabled());
}

@Override
public OperatorStateBackend createOperatorStateBackend(
        Environment env,
        String operatorIdentifier) throws Exception {

    // Operator state is kept in the default operator state backend, with asynchronous
    // snapshots switched on. That is the default for RocksDB; eventually there can be
    // an operator state backend based on RocksDB, too.
    return new DefaultOperatorStateBackend(
            env.getUserClassLoader(),
            env.getExecutionConfig(),
            /* asyncSnapshots */ true);
}

// ------------------------------------------------------------------------
Expand Down Expand Up @@ -390,6 +487,18 @@ public String[] getDbStoragePaths() {
}
}

/**
 * Gets whether incremental checkpoints are enabled for this state backend.
 *
 * @return The explicitly set value if there is one, otherwise the default value of the
 *         {@code INCREMENTAL_CHECKPOINTS} configuration option.
 */
public boolean isIncrementalCheckpointsEnabled() {
    // nothing was set explicitly, so fall back to the option's default
    if (enableIncrementalCheckpointing == null) {
        return CheckpointingOptions.INCREMENTAL_CHECKPOINTS.defaultValue();
    }
    return enableIncrementalCheckpointing;
}

// ------------------------------------------------------------------------
// Parametrize with RocksDB Options
// ------------------------------------------------------------------------
Expand Down Expand Up @@ -480,27 +589,17 @@ public ColumnFamilyOptions getColumnOptions() {
return opt;
}

@Override
public OperatorStateBackend createOperatorStateBackend(
Environment env,
String operatorIdentifier) throws Exception {

//the default for RocksDB; eventually there can be a operator state backend based on RocksDB, too.
final boolean asyncSnapshots = true;
return new DefaultOperatorStateBackend(
env.getUserClassLoader(),
env.getExecutionConfig(),
asyncSnapshots);
}
// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------

@Override
public String toString() {
return "RocksDB State Backend {" +
"isInitialized=" + isInitialized +
", configuredDbBasePaths=" + Arrays.toString(localRocksDbDirectories) +
", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
", checkpointStreamBackend=" + checkpointStreamBackend +
'}';
return "RocksDBStateBackend{" +
"checkpointStreamBackend=" + checkpointStreamBackend +
", localRocksDbDirectories=" + Arrays.toString(localRocksDbDirectories) +
", enableIncrementalCheckpointing=" + enableIncrementalCheckpointing +
'}';
}

// ------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,8 @@
import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.IllegalConfigurationException;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.state.StateBackendFactory;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;

/**
Expand All @@ -36,39 +31,19 @@
*/
public class RocksDBStateBackendFactory implements StateBackendFactory<RocksDBStateBackend> {

protected static final Logger LOG = LoggerFactory.getLogger(RocksDBStateBackendFactory.class);

@Override
public RocksDBStateBackend createFromConfig(Configuration config)
throws IllegalConfigurationException, IOException {

// we need to explicitly read the checkpoint directory here, because that
// is a required constructor parameter
final String checkpointDirURI = config.getString(CheckpointingOptions.CHECKPOINTS_DIRECTORY);
final String rocksdbLocalPaths = config.getString(CheckpointingOptions.ROCKSDB_LOCAL_DIRECTORIES);
final boolean incrementalCheckpoints = config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS);

if (checkpointDirURI == null) {
throw new IllegalConfigurationException(
"Cannot create the RocksDB state backend: The configuration does not specify the " +
"checkpoint directory '" + CheckpointingOptions.CHECKPOINTS_DIRECTORY.key() + '\'');
}

try {
final Path path = new Path(checkpointDirURI);
final RocksDBStateBackend backend = new RocksDBStateBackend(path.toUri(), incrementalCheckpoints);

if (rocksdbLocalPaths != null) {
String[] directories = rocksdbLocalPaths.split(",|" + File.pathSeparator);
backend.setDbStoragePaths(directories);
}

LOG.info("State backend is set to RocksDB (configured DB storage paths {}, checkpoints to filesystem {} ) ",
backend.getDbStoragePaths(), path);

return backend;
}
catch (IllegalArgumentException e) {
throw new IllegalConfigurationException(
"Cannot initialize RocksDB State Backend with URI '" + checkpointDirURI + '.', e);
}
return new RocksDBStateBackend(checkpointDirURI).configure(config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
Expand Down Expand Up @@ -416,6 +417,12 @@ static class BlockingStreamMemoryStateBackend extends MemoryStateBackend {
public CheckpointStreamFactory createStreamFactory(JobID jobId, String operatorIdentifier) throws IOException {
return blockerCheckpointStreamFactory;
}

/**
 * Returns this very instance instead of a re-configured copy; the given
 * configuration is not read.
 *
 * @param config The configuration (ignored).
 * @return This instance, unchanged.
 */
@Override
public BlockingStreamMemoryStateBackend configure(Configuration config) {
// retain this instance, no re-configuration!
// NOTE(review): presumably needed so the blocking stream factory of this test backend stays in effect — confirm
return this;
}
}

private static class AsyncCheckpointOperator
Expand Down
Loading

0 comments on commit d19525e

Please sign in to comment.