Skip to content

Commit

Permalink
[refactor] Remove AbstractRocksDBRestoreOperation
Browse files Browse the repository at this point in the history
So far both the RocksFullSnapshotRestoreOperation and
RocksIncrementalRestoreOperation extended from
AbstractRocksDBRestoreOperation in order to share some functions.
However, it required unnecessary parameters to be passed just to
satisfy the requirements of the base class. Moreover, a shared base
class makes it harder to evolve the subclasses independently.

This commit changes the common code to be shared via composition
instead of inheritance.
  • Loading branch information
dawidwys committed Feb 24, 2021
1 parent 7f3aa39 commit 3ed5c1a
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.contrib.streaming.state.restore.AbstractRocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBFullRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBIncrementalRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBNoneRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreOperation;
import org.apache.flink.contrib.streaming.state.restore.RocksDBRestoreResult;
import org.apache.flink.contrib.streaming.state.snapshot.RocksDBSnapshotStrategyBase;
import org.apache.flink.contrib.streaming.state.snapshot.RocksFullSnapshotStrategy;
Expand Down Expand Up @@ -250,7 +250,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation =
new LinkedHashMap<>();
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
RocksDBRestoreOperation restoreOperation = null;
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
new RocksDbTtlCompactFiltersManager(ttlTimeProvider);

Expand Down Expand Up @@ -393,28 +393,20 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
writeBatchSize);
}

private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
private RocksDBRestoreOperation getRocksDBRestoreOperation(
int keyGroupPrefixBytes,
CloseableRegistry cancelStreamRegistry,
LinkedHashMap<String, RocksDBKeyedStateBackend.RocksDbKvStateInfo> kvStateInformation,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
DBOptions dbOptions = optionsContainer.getDbOptions();
if (restoreStateHandles.isEmpty()) {
return new RocksDBNoneRestoreOperation<>(
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferingThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
optionsContainer.getWriteBufferManagerCapacity());
}
Expand Down Expand Up @@ -442,13 +434,9 @@ private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
} else {
return new RocksDBFullRestoreOperation<>(
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferingThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricOptions;
import org.apache.flink.contrib.streaming.state.RocksDBWriteBatchWrapper;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyedStateHandle;
Expand Down Expand Up @@ -51,23 +50,19 @@
import java.util.function.Function;
import java.util.stream.Collectors;

import static org.apache.flink.util.Preconditions.checkArgument;

/** Encapsulates the process of restoring a RocksDB instance from a full snapshot. */
public class RocksDBFullRestoreOperation<K> extends AbstractRocksDBRestoreOperation<K> {
public class RocksDBFullRestoreOperation<K> implements RocksDBRestoreOperation {
private final FullSnapshotRestoreOperation<K> savepointRestoreOperation;
/** Write batch size used in {@link RocksDBWriteBatchWrapper}. */
private final long writeBatchSize;

private final RocksDBHandle rocksHandle;

public RocksDBFullRestoreOperation(
KeyGroupRange keyGroupRange,
int keyGroupPrefixBytes,
int numberOfTransferringThreads,
CloseableRegistry cancelStreamRegistry,
ClassLoader userCodeClassLoader,
Map<String, RocksDbKvStateInfo> kvStateInformation,
StateSerializerProvider<K> keySerializerProvider,
File instanceBasePath,
File instanceRocksDBPath,
DBOptions dbOptions,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
Expand All @@ -77,25 +72,17 @@ public RocksDBFullRestoreOperation(
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
@Nonnegative long writeBatchSize,
Long writeBufferManagerCapacity) {
super(
keyGroupRange,
keyGroupPrefixBytes,
numberOfTransferringThreads,
cancelStreamRegistry,
userCodeClassLoader,
kvStateInformation,
keySerializerProvider,
instanceBasePath,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
writeBufferManagerCapacity);
checkArgument(writeBatchSize >= 0, "Write batch size have to be no negative.");
this.writeBatchSize = writeBatchSize;
this.rocksHandle =
new RocksDBHandle(
kvStateInformation,
instanceRocksDBPath,
dbOptions,
columnFamilyOptionsFactory,
nativeMetricOptions,
metricGroup,
ttlCompactFiltersManager,
writeBufferManagerCapacity);
this.savepointRestoreOperation =
new FullSnapshotRestoreOperation<>(
keyGroupRange,
Expand All @@ -108,15 +95,20 @@ public RocksDBFullRestoreOperation(
@Override
public RocksDBRestoreResult restore()
throws IOException, StateMigrationException, RocksDBException {
openDB();
rocksHandle.openDB();
try (ThrowingIterator<SavepointRestoreResult> restore =
savepointRestoreOperation.restore()) {
while (restore.hasNext()) {
applyRestoreResult(restore.next());
}
}
return new RocksDBRestoreResult(
this.db, defaultColumnFamilyHandle, nativeMetricMonitor, -1, null, null);
this.rocksHandle.getDb(),
this.rocksHandle.getDefaultColumnFamilyHandle(),
this.rocksHandle.getNativeMetricMonitor(),
-1,
null,
null);
}

private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
Expand All @@ -128,7 +120,7 @@ private void applyRestoreResult(SavepointRestoreResult savepointRestoreResult)
.map(
stateMetaInfoSnapshot -> {
RocksDbKvStateInfo registeredStateCFHandle =
getOrRegisterStateColumnFamilyHandle(
this.rocksHandle.getOrRegisterStateColumnFamilyHandle(
null, stateMetaInfoSnapshot);
return registeredStateCFHandle.columnFamilyHandle;
})
Expand All @@ -147,7 +139,7 @@ private void restoreKVStateData(
throws IOException, RocksDBException, StateMigrationException {
// for all key-groups in the current state handle...
try (RocksDBWriteBatchWrapper writeBatchWrapper =
new RocksDBWriteBatchWrapper(db, writeBatchSize)) {
new RocksDBWriteBatchWrapper(this.rocksHandle.getDb(), writeBatchSize)) {
while (keyGroups.hasNext()) {
KeyGroup keyGroup = keyGroups.next();
try (ThrowingIterator<KeyGroupEntry> groupEntries = keyGroup.getKeyGroupEntries()) {
Expand All @@ -160,4 +152,9 @@ private void restoreKVStateData(
}
}
}

@Override
public void close() throws Exception {
this.rocksHandle.close();
}
}
Loading

0 comments on commit 3ed5c1a

Please sign in to comment.