[FLINK-10461][rocksdb] Support multiple threads for downloading restored state
klion26 authored and zentol committed Dec 20, 2018
1 parent 1708260 commit 6431b7e
Showing 8 changed files with 364 additions and 72 deletions.
5 changes: 5 additions & 0 deletions docs/_includes/generated/rocks_db_configuration.html
@@ -7,6 +7,11 @@
</tr>
</thead>
<tbody>
<tr>
<td><h5>state.backend.rocksdb.checkpoint.restore.thread.num</h5></td>
<td style="word-wrap: break-word;">1</td>
<td>The number of threads used to download files from DFS in RocksDBStateBackend.</td>
</tr>
<tr>
<td><h5>state.backend.rocksdb.localdir</h5></td>
<td style="word-wrap: break-word;">(none)</td>
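For reference, a minimal sketch of setting this option programmatically, assuming the RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM constant added later in this commit; in production the key would normally be placed in flink-conf.yaml instead.

import org.apache.flink.configuration.Configuration;
import org.apache.flink.contrib.streaming.state.RocksDBOptions;

public class RestoreThreadsConfigSketch {
	public static void main(String[] args) {
		// Equivalent to putting
		//   state.backend.rocksdb.checkpoint.restore.thread.num: 4
		// into flink-conf.yaml; when unset, the default of 1 applies.
		Configuration config = new Configuration();
		config.setInteger(RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM, 4);
	}
}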
@@ -82,6 +82,7 @@ public void testListSerialization() throws Exception {
new KeyGroupRange(0, 0),
new ExecutionConfig(),
false,
1,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
@@ -126,6 +127,7 @@ public void testMapSerialization() throws Exception {
new KeyGroupRange(0, 0),
new ExecutionConfig(),
false,
1,
TestLocalRecoveryConfig.disabled(),
RocksDBStateBackend.PriorityQueueStateType.HEAP,
TtlTimeProvider.DEFAULT,
@@ -37,7 +37,6 @@
import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
import org.apache.flink.contrib.streaming.state.snapshot.RocksIncrementalSnapshotStrategy;
import org.apache.flink.core.fs.FSDataInputStream;
import org.apache.flink.core.fs.FSDataOutputStream;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
@@ -127,6 +126,7 @@
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

import static org.apache.flink.contrib.streaming.state.RocksDbStateDataTransfer.transferAllStateDataToDirectory;
import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.END_OF_KEY_GROUP_MARK;
import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.SST_FILE_SUFFIX;
import static org.apache.flink.contrib.streaming.state.snapshot.RocksSnapshotUtil.clearMetaDataFollowsFlag;
@@ -217,6 +217,9 @@ <K, N, SV, S extends State, IS extends S> IS createState(
/** True if incremental checkpointing is enabled. */
private final boolean enableIncrementalCheckpointing;

/** The number of threads used to download files from DFS during restore. */
private final int restoringThreadNum;

/** The configuration of local recovery. */
private final LocalRecoveryConfig localRecoveryConfig;

@@ -251,6 +254,7 @@ public RocksDBKeyedStateBackend(
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
boolean enableIncrementalCheckpointing,
int restoringThreadNum,
LocalRecoveryConfig localRecoveryConfig,
RocksDBStateBackend.PriorityQueueStateType priorityQueueStateType,
TtlTimeProvider ttlTimeProvider,
@@ -264,6 +268,7 @@ public RocksDBKeyedStateBackend(
this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);

this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.restoringThreadNum = restoringThreadNum;
this.rocksDBResourceGuard = new ResourceGuard();

// ensure that we use the right merge operator, because other code relies on this
@@ -494,7 +499,7 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
LOG.info("Initializing RocksDB keyed state backend.");

if (LOG.isDebugEnabled()) {
LOG.debug("Restoring snapshot from state handles: {}.", restoreState);
LOG.debug("Restoring snapshot from state handles: {}, will use {} thread(s) to download files from DFS.", restoreState, restoringThreadNum);
}

// clear all meta data
@@ -876,7 +881,7 @@ void restoreWithoutRescaling(KeyedStateHandle rawStateHandle) throws Exception {
IncrementalKeyedStateHandle restoreStateHandle = (IncrementalKeyedStateHandle) rawStateHandle;

// read state data.
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath, stateBackend.restoringThreadNum, stateBackend.cancelStreamRegistry);

stateMetaInfoSnapshots = readMetaData(restoreStateHandle.getMetaStateHandle());
columnFamilyDescriptors = createAndRegisterColumnFamilyDescriptors(stateMetaInfoSnapshots);
@@ -1029,7 +1034,7 @@ private RestoredDBInstance restoreDBInstanceFromStateHandle(
IncrementalKeyedStateHandle restoreStateHandle,
Path temporaryRestoreInstancePath) throws Exception {

transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath);
transferAllStateDataToDirectory(restoreStateHandle, temporaryRestoreInstancePath, stateBackend.restoringThreadNum, stateBackend.cancelStreamRegistry);

// read meta data
List<StateMetaInfoSnapshot> stateMetaInfoSnapshots =
@@ -1274,74 +1279,6 @@ private List<StateMetaInfoSnapshot> readMetaData(
}
}
}

private void transferAllStateDataToDirectory(
IncrementalKeyedStateHandle restoreStateHandle,
Path dest) throws IOException {

final Map<StateHandleID, StreamStateHandle> sstFiles =
restoreStateHandle.getSharedState();
final Map<StateHandleID, StreamStateHandle> miscFiles =
restoreStateHandle.getPrivateState();

transferAllDataFromStateHandles(sstFiles, dest);
transferAllDataFromStateHandles(miscFiles, dest);
}

/**
* Copies all the files from the given stream state handles to the given path, renaming the files w.r.t. their
* {@link StateHandleID}.
*/
private void transferAllDataFromStateHandles(
Map<StateHandleID, StreamStateHandle> stateHandleMap,
Path restoreInstancePath) throws IOException {

for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
StateHandleID stateHandleID = entry.getKey();
StreamStateHandle remoteFileHandle = entry.getValue();
copyStateDataHandleData(new Path(restoreInstancePath, stateHandleID.toString()), remoteFileHandle);
}

}

/**
* Copies the file from a single state handle to the given path.
*/
private void copyStateDataHandleData(
Path restoreFilePath,
StreamStateHandle remoteFileHandle) throws IOException {

FileSystem restoreFileSystem = restoreFilePath.getFileSystem();

FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;

try {
inputStream = remoteFileHandle.openInputStream();
stateBackend.cancelStreamRegistry.registerCloseable(inputStream);

outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
stateBackend.cancelStreamRegistry.registerCloseable(outputStream);

byte[] buffer = new byte[8 * 1024];
while (true) {
int numBytes = inputStream.read(buffer);
if (numBytes == -1) {
break;
}

outputStream.write(buffer, 0, numBytes);
}
} finally {
if (stateBackend.cancelStreamRegistry.unregisterCloseable(inputStream)) {
inputStream.close();
}

if (stateBackend.cancelStreamRegistry.unregisterCloseable(outputStream)) {
outputStream.close();
}
}
}
}

// ------------------------------------------------------------------------
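The single-threaded copy helpers removed above now live in the new RocksDbStateDataTransfer class, which is statically imported near the top of this file but whose source is not part of this excerpt. Below is a minimal sketch of how a multi-threaded transferAllStateDataToDirectory could look, under the assumption that it fans the per-file copies out over a fixed-size thread pool; the helper name downloadDataForAllStateHandles is illustrative, not taken from the commit.

// A sketch only (not the committed implementation). Assumed imports:
//   java.io.IOException, java.util.*, java.util.concurrent.*,
//   org.apache.flink.core.fs.{Path, FileSystem, FSDataInputStream, FSDataOutputStream, CloseableRegistry},
//   org.apache.flink.runtime.state.{IncrementalKeyedStateHandle, StateHandleID, StreamStateHandle}.

static void transferAllStateDataToDirectory(
		IncrementalKeyedStateHandle restoreStateHandle,
		Path dest,
		int restoringThreadNum,
		CloseableRegistry closeableRegistry) throws Exception {

	final Map<StateHandleID, StreamStateHandle> sstFiles = restoreStateHandle.getSharedState();
	final Map<StateHandleID, StreamStateHandle> miscFiles = restoreStateHandle.getPrivateState();

	downloadDataForAllStateHandles(sstFiles, dest, restoringThreadNum, closeableRegistry);
	downloadDataForAllStateHandles(miscFiles, dest, restoringThreadNum, closeableRegistry);
}

/** Downloads every file referenced by the handle map, using a fixed-size thread pool. */
static void downloadDataForAllStateHandles(
		Map<StateHandleID, StreamStateHandle> stateHandleMap,
		Path restoreInstancePath,
		int restoringThreadNum,
		CloseableRegistry closeableRegistry) throws Exception {

	final ExecutorService executorService = Executors.newFixedThreadPool(restoringThreadNum);
	try {
		final List<Callable<Void>> downloads = new ArrayList<>(stateHandleMap.size());
		for (Map.Entry<StateHandleID, StreamStateHandle> entry : stateHandleMap.entrySet()) {
			final Path path = new Path(restoreInstancePath, entry.getKey().toString());
			final StreamStateHandle remoteFileHandle = entry.getValue();
			downloads.add(() -> {
				copyStateDataHandleData(path, remoteFileHandle, closeableRegistry);
				return null;
			});
		}
		// invokeAll blocks until every download has finished; get() surfaces the first failure.
		for (Future<Void> result : executorService.invokeAll(downloads)) {
			result.get();
		}
	} finally {
		executorService.shutdownNow();
	}
}

/** Per-file copy, like the removed copyStateDataHandleData above, but with the registry passed in. */
static void copyStateDataHandleData(
		Path restoreFilePath,
		StreamStateHandle remoteFileHandle,
		CloseableRegistry closeableRegistry) throws IOException {

	final FileSystem restoreFileSystem = restoreFilePath.getFileSystem();

	FSDataInputStream inputStream = null;
	FSDataOutputStream outputStream = null;

	try {
		inputStream = remoteFileHandle.openInputStream();
		closeableRegistry.registerCloseable(inputStream);

		outputStream = restoreFileSystem.create(restoreFilePath, FileSystem.WriteMode.OVERWRITE);
		closeableRegistry.registerCloseable(outputStream);

		final byte[] buffer = new byte[8 * 1024];
		int numBytes;
		while ((numBytes = inputStream.read(buffer)) != -1) {
			outputStream.write(buffer, 0, numBytes);
		}
	} finally {
		if (closeableRegistry.unregisterCloseable(inputStream)) {
			inputStream.close();
		}
		if (closeableRegistry.unregisterCloseable(outputStream)) {
			outputStream.close();
		}
	}
}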
@@ -45,4 +45,12 @@ public class RocksDBOptions {
.withDescription(String.format("This determines the factory for timer service state implementation. Options " +
"are either %s (heap-based, default) or %s for an implementation based on RocksDB .",
HEAP.name(), ROCKSDB.name()));

/**
* The number of threads used to download files from DFS in RocksDBStateBackend.
*/
public static final ConfigOption<Integer> CHECKPOINT_RESTORE_THREAD_NUM = ConfigOptions
.key("state.backend.rocksdb.checkpoint.restore.thread.num")
.defaultValue(1)
.withDescription("The number of threads used to download files from DFS in RocksDBStateBackend.");
}
@@ -40,6 +40,7 @@
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.AbstractID;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TernaryBoolean;

import org.rocksdb.ColumnFamilyOptions;
@@ -97,6 +98,8 @@ public enum PriorityQueueStateType {
/** Flag whether the native library has been loaded. */
private static boolean rocksDbInitialized = false;

private static final int UNDEFINED_NUMBER_OF_RESTORING_THREADS = -1;

// ------------------------------------------------------------------------

// -- configuration values, set in the application / configuration
Expand All @@ -120,6 +123,9 @@ public enum PriorityQueueStateType {
/** This determines if incremental checkpointing is enabled. */
private final TernaryBoolean enableIncrementalCheckpointing;

/** The number of threads used to download files from DFS during restore; default value: 1. */
private int numberOfRestoringThreads;

/** This determines the type of priority queue state. */
private final PriorityQueueStateType priorityQueueStateType;

@@ -238,6 +244,7 @@ public RocksDBStateBackend(StateBackend checkpointStreamBackend) {
public RocksDBStateBackend(StateBackend checkpointStreamBackend, TernaryBoolean enableIncrementalCheckpointing) {
this.checkpointStreamBackend = checkNotNull(checkpointStreamBackend);
this.enableIncrementalCheckpointing = enableIncrementalCheckpointing;
this.numberOfRestoringThreads = UNDEFINED_NUMBER_OF_RESTORING_THREADS;
// for now, we use still the heap-based implementation as default
this.priorityQueueStateType = PriorityQueueStateType.HEAP;
this.defaultMetricOptions = new RocksDBNativeMetricOptions();
@@ -276,6 +283,12 @@ private RocksDBStateBackend(RocksDBStateBackend original, Configuration config)
this.enableIncrementalCheckpointing = original.enableIncrementalCheckpointing.resolveUndefined(
config.getBoolean(CheckpointingOptions.INCREMENTAL_CHECKPOINTS));

if (original.numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS) {
this.numberOfRestoringThreads = config.getInteger(RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM);
} else {
this.numberOfRestoringThreads = original.numberOfRestoringThreads;
}

final String priorityQueueTypeString = config.getString(TIMER_SERVICE_FACTORY);

this.priorityQueueStateType = priorityQueueTypeString.length() > 0 ?
@@ -452,6 +465,7 @@ public <K> AbstractKeyedStateBackend<K> createKeyedStateBackend(
keyGroupRange,
env.getExecutionConfig(),
isIncrementalCheckpointsEnabled(),
getNumberOfRestoringThreads(),
localRecoveryConfig,
priorityQueueStateType,
ttlTimeProvider,
@@ -686,6 +700,20 @@ public RocksDBNativeMetricOptions getMemoryWatcherOptions() {
return options;
}

/**
* Gets the number of threads used to download files from DFS during restore.
*/
public int getNumberOfRestoringThreads() {
return numberOfRestoringThreads == UNDEFINED_NUMBER_OF_RESTORING_THREADS ?
RocksDBOptions.CHECKPOINT_RESTORE_THREAD_NUM.defaultValue() : numberOfRestoringThreads;
}

public void setNumberOfRestoringThreads(int numberOfRestoringThreads) {
Preconditions.checkArgument(numberOfRestoringThreads > 0,
"The number of threads used to download files from DFS in RocksDBStateBackend should > 0.");
this.numberOfRestoringThreads = numberOfRestoringThreads;
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------
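For illustration, a minimal usage sketch of the new setter; the checkpoint URI and thread count below are placeholders. Precedence follows the private copy constructor above: an explicit setter call wins, otherwise the value of state.backend.rocksdb.checkpoint.restore.thread.num from the configuration is used, and the getter falls back to the option's default of 1.

import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestoreThreadsJobSketch {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// The checkpoint URI is a placeholder; 'true' enables incremental checkpoints.
		RocksDBStateBackend backend = new RocksDBStateBackend("hdfs:///flink/checkpoints", true);
		backend.setNumberOfRestoringThreads(4); // download restored state files with 4 threads

		env.setStateBackend(backend);
	}
}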
@@ -696,6 +724,7 @@ public String toString() {
"checkpointStreamBackend=" + checkpointStreamBackend +
", localRocksDbDirectories=" + Arrays.toString(localRocksDbDirectories) +
", enableIncrementalCheckpointing=" + enableIncrementalCheckpointing +
", numberOfRestoringThreads=" + numberOfRestoringThreads +
'}';
}

