Skip to content

Commit

Permalink
[hotfix] improve javadoc and logging of RocksDBKeyedStateBackend
Browse files Browse the repository at this point in the history
This closes apache#5366.
  • Loading branch information
bowenli86 authored and tillrohrmann committed Jan 26, 2018
1 parent 4e7f281 commit 71dee4e
Showing 1 changed file with 16 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@
import java.util.stream.StreamSupport;

/**
* A {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and will serialize state to
* An {@link AbstractKeyedStateBackend} that stores its state in {@code RocksDB} and serializes state to
* streams provided by a {@link org.apache.flink.runtime.state.CheckpointStreamFactory} upon
* checkpointing. This state backend can store very large state that exceeds memory and spills
* to disk. Except for the snapshotting, this class should be accessed as if it is not threadsafe.
Expand All @@ -139,7 +139,7 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** File suffix of sstable files. */
private static final String SST_FILE_SUFFIX = ".sst";

/** Bytes for the name of the column decriptor for the default column family. */
/** Bytes for the name of the column descriptor for the default column family. */
public static final byte[] DEFAULT_COLUMN_FAMILY_NAME_BYTES = "default".getBytes(ConfigConstants.DEFAULT_CHARSET);

/** String that identifies the operator that owns this backend. */
Expand All @@ -154,17 +154,17 @@ public class RocksDBKeyedStateBackend<K> extends AbstractKeyedStateBackend<K> {
/** Path where this configured instance stores its data directory. */
private final File instanceBasePath;

/** Path where this configured instance stores its RocksDB data base. */
/** Path where this configured instance stores its RocksDB database. */
private final File instanceRocksDBPath;

/**
* Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that dispose the
* Protects access to RocksDB in other threads, like the checkpointing thread from parallel call that disposes the
* RocksDb object.
*/
private final ResourceGuard rocksDBResourceGuard;

/**
* Our RocksDB data base, this is used by the actual subclasses of {@link AbstractRocksDBState}
* Our RocksDB database, this is used by the actual subclasses of {@link AbstractRocksDBState}
* to store state. The different k/v states that we have don't each have their own RocksDB
* instance. They all write to this instance but to their own column family.
*/
Expand Down Expand Up @@ -242,7 +242,8 @@ public RocksDBKeyedStateBackend(
}

if (!instanceBasePath.mkdirs()) {
throw new IOException("Could not create RocksDB data directory.");
throw new IOException(
String.format("Could not create RocksDB data directory at %s.", instanceBasePath.getAbsolutePath()));
}

this.keyGroupPrefixBytes = getNumberOfKeyGroups() > (Byte.MAX_VALUE + 1) ? 2 : 1;
Expand Down Expand Up @@ -275,7 +276,7 @@ public <N> Stream<K> getKeys(String state, N namespace) {
public void dispose() {
super.dispose();

// This call will block until all clients that still acquired access to the RocksDB instance have released it,
// This call will block until all clients that still acquire access to the RocksDB instance have released it,
// so that we cannot release the native resources while clients are still working with it in parallel.
rocksDBResourceGuard.close();

Expand Down Expand Up @@ -361,8 +362,8 @@ private RunnableFuture<KeyedStateHandle> snapshotIncrementally(

if (kvStateInformation.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " +
checkpointTimestamp + " . Returning null.");
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.",
checkpointTimestamp);
}
return DoneFuture.nullValue();
}
Expand Down Expand Up @@ -409,8 +410,7 @@ private RunnableFuture<KeyedStateHandle> snapshotFully(

if (kvStateInformation.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at " + timestamp +
" . Returning null.");
LOG.debug("Asynchronous RocksDB snapshot performed on empty keyed state at {}. Returning null.", timestamp);
}

return DoneFuture.nullValue();
Expand Down Expand Up @@ -472,8 +472,8 @@ public KeyGroupsStateHandle performOperation() throws Exception {
}
};

LOG.info("Asynchronous RocksDB snapshot (" + streamFactory + ", synchronous part) in thread " +
Thread.currentThread() + " took " + (System.currentTimeMillis() - startTime) + " ms.");
LOG.info("Asynchronous RocksDB snapshot ({}, synchronous part) in thread {} took {} ms.",
streamFactory, Thread.currentThread(), (System.currentTimeMillis() - startTime));

return AsyncStoppableTaskWithCallback.from(ioCallable);
}
Expand Down Expand Up @@ -1578,7 +1578,7 @@ void restore(Collection<KeyedStateHandle> restoreStateHandles) throws Exception

/**
* Creates a column family handle for use with a k/v state. When restoring from a snapshot
* we don't restore the individual k/v states, just the global RocksDB data base and the
* we don't restore the individual k/v states, just the global RocksDB database and the
* list of column families. When a k/v state is first requested we check here whether we
* already have a column family for that and return it or create a new one if it doesn't exist.
*
Expand Down Expand Up @@ -1723,7 +1723,7 @@ protected <N, UK, UV> InternalMapState<N, UK, UV> createMapState(
}

/**
* Wraps a RocksDB iterator to cache it's current key and assign an id for the key/value state to the iterator.
* Wraps a RocksDB iterator to cache it's current key and assigns an id for the key/value state to the iterator.
* Used by #MergeIterator.
*/
static final class MergeIterator implements AutoCloseable {
Expand Down Expand Up @@ -1877,7 +1877,6 @@ private void detectNewKeyGroup(byte[] oldKey) {
}

/**
* Returns the key-group for the current key.
* @return key-group for the current key
*/
public int keyGroup() {
Expand All @@ -1899,7 +1898,6 @@ public byte[] value() {
}

/**
* Returns the Id of the k/v state to which the current key belongs.
* @return Id of K/V state to which the current key belongs.
*/
public int kvStateId() {
Expand Down Expand Up @@ -1955,6 +1953,7 @@ public void close() {
/**
* Only visible for testing, DO NOT USE.
*/
@VisibleForTesting
public File getInstanceBasePath() {
return instanceBasePath;
}
Expand Down

0 comments on commit 71dee4e

Please sign in to comment.