Skip to content

Commit

Permalink
[hotfix] [rocksdb] Clean up RocksDB state backend code
Browse files Browse the repository at this point in the history
  - arrange variables to properly express configuration (client side) versus runtime (task manager side)
  - make all runtime-only fields properly transient
  - fix confusing variable name for local directories
  • Loading branch information
StephanEwen committed Jan 18, 2018
1 parent fa03e78 commit 1931993
Showing 1 changed file with 37 additions and 36 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,12 @@
import org.rocksdb.DBOptions;
import org.rocksdb.NativeLibraryLoader;
import org.rocksdb.RocksDB;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.File;
import java.io.IOException;
import java.lang.reflect.Field;
Expand All @@ -49,7 +52,7 @@
import java.util.Random;
import java.util.UUID;

import static java.util.Objects.requireNonNull;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A State Backend that stores its state in {@code RocksDB}. This state backend can
Expand All @@ -75,43 +78,41 @@ public class RocksDBStateBackend extends AbstractStateBackend {

private static boolean rocksDbInitialized = false;

// ------------------------------------------------------------------------
// Static configuration values
// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration

/** The state backend that we use for creating checkpoint streams. */
private final AbstractStateBackend checkpointStreamBackend;

/** Operator identifier that is used to uniquify the RocksDB storage path. */
private String operatorIdentifier;

/** JobID for uniquifying backup paths. */
private JobID jobId;

// DB storage directories

/** Base paths for RocksDB directory, as configured. May be null. */
private Path[] configuredDbBasePaths;

/** Base paths for RocksDB directory, as initialized. */
private File[] initializedDbBasePaths;

private int nextDirectory;

// RocksDB options
@Nullable
private Path[] localRocksDbDirectories;

/** The pre-configured option settings. */
private PredefinedOptions predefinedOptions = PredefinedOptions.DEFAULT;

/** The options factory to create the RocksDB options in the cluster. */
@Nullable
private OptionsFactory optionsFactory;

/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized = false;

/** True if incremental checkpointing is enabled. */
private boolean enableIncrementalCheckpointing;

// -- runtime values, set on TaskManager when initializing / using the backend

/** Base paths for RocksDB directory, as initialized. */
private transient File[] initializedDbBasePaths;

/** JobID for uniquifying backup paths. */
private transient JobID jobId;

private transient int nextDirectory;

/** Whether we already lazily initialized our local storage directories. */
private transient boolean isInitialized;

// ------------------------------------------------------------------------

/**
* Creates a new {@code RocksDBStateBackend} that stores its checkpoint data in the
Expand Down Expand Up @@ -190,7 +191,7 @@ public RocksDBStateBackend(URI checkpointDataUri, boolean enableIncrementalCheck
* @param checkpointStreamBackend The backend to store the
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
}

/**
Expand All @@ -202,10 +203,10 @@ public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend) {
* {@link AbstractStateBackend#createStreamFactory(JobID, String) checkpoint stream}.
*
* @param checkpointStreamBackend The backend to store the
* @param enableIncrementalCheckpointing True if incremental checkpointing is enabled
* @param enableIncrementalCheckpointing True if incremental checkponting is enabled
*/
public RocksDBStateBackend(AbstractStateBackend checkpointStreamBackend, boolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = requireNonNull(checkpointStreamBackend);
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
}

Expand All @@ -221,19 +222,18 @@ private void lazyInitializeForJob(
return;
}

this.operatorIdentifier = operatorIdentifier.replace(" ", "");
this.jobId = env.getJobID();

// initialize the paths where the local RocksDB files should be stored
if (configuredDbBasePaths == null) {
if (localRocksDbDirectories == null) {
// initialize from the temp directories
initializedDbBasePaths = env.getIOManager().getSpillingDirectories();
}
else {
List<File> dirs = new ArrayList<>(configuredDbBasePaths.length);
List<File> dirs = new ArrayList<>(localRocksDbDirectories.length);
String errorMessage = "";

for (Path path : configuredDbBasePaths) {
for (Path path : localRocksDbDirectories) {
File f = new File(path.toUri().getPath());
File testDir = new File(f, UUID.randomUUID().toString());
if (!testDir.mkdirs()) {
Expand All @@ -244,6 +244,7 @@ private void lazyInitializeForJob(
} else {
dirs.add(f);
}
//noinspection ResultOfMethodCallIgnored
testDir.delete();
}

Expand Down Expand Up @@ -349,7 +350,7 @@ public void setDbStoragePath(String path) {
*/
public void setDbStoragePaths(String... paths) {
if (paths == null) {
configuredDbBasePaths = null;
localRocksDbDirectories = null;
}
else if (paths.length == 0) {
throw new IllegalArgumentException("empty paths");
Expand All @@ -369,7 +370,7 @@ else if (paths.length == 0) {
}
}

configuredDbBasePaths = pp;
localRocksDbDirectories = pp;
}
}

Expand All @@ -378,12 +379,12 @@ else if (paths.length == 0) {
* @return The configured DB storage paths, or null, if none were configured.
*/
public String[] getDbStoragePaths() {
if (configuredDbBasePaths == null) {
if (localRocksDbDirectories == null) {
return null;
} else {
String[] paths = new String[configuredDbBasePaths.length];
String[] paths = new String[localRocksDbDirectories.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = configuredDbBasePaths[i].toString();
paths[i] = localRocksDbDirectories[i].toString();
}
return paths;
}
Expand All @@ -403,7 +404,7 @@ public String[] getDbStoragePaths() {
* @param options The options to set (must not be null).
*/
public void setPredefinedOptions(PredefinedOptions options) {
predefinedOptions = requireNonNull(options);
predefinedOptions = checkNotNull(options);
}

/**
Expand Down Expand Up @@ -496,7 +497,7 @@ public OperatorStateBackend createOperatorStateBackend(
public String toString() {
return "RocksDB State Backend {" +
"isInitialized=" + isInitialized +
", configuredDbBasePaths=" + Arrays.toString(configuredDbBasePaths) +
", configuredDbBasePaths=" + Arrays.toString(localRocksDbDirectories) +
", initializedDbBasePaths=" + Arrays.toString(initializedDbBasePaths) +
", checkpointStreamBackend=" + checkpointStreamBackend +
'}';
Expand Down

0 comments on commit 1931993

Please sign in to comment.