Skip to content

Commit

Permalink
[hotfix][statebackend] Reduce and simplify code for column creation i…
Browse files Browse the repository at this point in the history
…n RocksDB backend

This closes apache#7830.
  • Loading branch information
azagrebin authored and StefanRRichter committed Mar 6, 2019
1 parent 753bc10 commit 953a5ff
Show file tree
Hide file tree
Showing 9 changed files with 78 additions and 140 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,6 @@ <K, N, SV, S extends State, IS extends S> IS createState(
RocksDBKeyedStateBackend<K> backend) throws Exception;
}

/** String that identifies the operator that owns this backend. */
private final String operatorIdentifier;

/** Factory function to create column family options from state name. */
private final Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory;

Expand Down Expand Up @@ -158,9 +155,6 @@ <K, N, SV, S extends State, IS extends S> IS createState(
/** Number of bytes required to prefix the key groups. */
private final int keyGroupPrefixBytes;

/** Thread number used to transfer state files while restoring/snapshotting. */
private final int numberOfTransferingThreads;

/**
* We are not using the default column family for Flink state ops, but we still need to remember this handle so that
* we can close it properly when the backend is closed. Note that the one returned by {@link RocksDB#open(String)}
Expand Down Expand Up @@ -201,7 +195,6 @@ <K, N, SV, S extends State, IS extends S> IS createState(
private final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;

public RocksDBKeyedStateBackend(
String operatorIdentifier,
ClassLoader userCodeClassLoader,
File instanceBasePath,
DBOptions dbOptions,
Expand All @@ -211,7 +204,6 @@ public RocksDBKeyedStateBackend(
int numberOfKeyGroups,
KeyGroupRange keyGroupRange,
ExecutionConfig executionConfig,
int numberOfTransferingThreads,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
LinkedHashMap<String, RocksDbKvStateInfo> kvStateInformation,
Expand All @@ -233,10 +225,6 @@ public RocksDBKeyedStateBackend(

this.ttlCompactFiltersManager = ttlCompactFiltersManager;

this.operatorIdentifier = Preconditions.checkNotNull(operatorIdentifier);

this.numberOfTransferingThreads = numberOfTransferingThreads;

// ensure that we use the right merge operator, because other code relies on this
this.columnFamilyOptionsFactory = Preconditions.checkNotNull(columnFamilyOptionsFactory);

Expand Down Expand Up @@ -500,8 +488,8 @@ private <N, S extends State, SV, SEV> Tuple2<ColumnFamilyHandle, RegisteredKeyVa
stateSerializer,
StateSnapshotTransformFactory.noTransform());

newRocksStateInfo = RocksDBOperationUtils.createStateInfo(newMetaInfo, ttlCompactFiltersManager,
ttlTimeProvider, db, columnFamilyOptionsFactory);
newRocksStateInfo = RocksDBOperationUtils.createStateInfo(
newMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager);
RocksDBOperationUtils.registerKvStateInformation(this.kvStateInformation, this.nativeMetricMonitor,
stateDesc.getName(), newRocksStateInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
RocksDB db = null;
AbstractRocksDBRestoreOperation restoreOperation = null;
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager =
new RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter);
new RocksDbTtlCompactFiltersManager(enableTtlCompactionFilter, ttlTimeProvider);

ResourceGuard rocksDBResourceGuard = new ResourceGuard();
SnapshotStrategy<K> snapshotStrategy;
Expand Down Expand Up @@ -324,7 +324,6 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
}
}
return new RocksDBKeyedStateBackend<>(
this.operatorIdentifier,
this.userCodeClassLoader,
this.instanceBasePath,
this.dbOptions,
Expand All @@ -334,7 +333,6 @@ public RocksDBKeyedStateBackend<K> build() throws BackendBuildingException {
this.numberOfKeyGroups,
this.keyGroupRange,
this.executionConfig,
this.numberOfTransferingThreads,
this.ttlTimeProvider,
db,
kvStateInformation,
Expand Down Expand Up @@ -374,8 +372,7 @@ private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
ttlTimeProvider);
ttlCompactFiltersManager);
}
KeyedStateHandle firstStateHandle = restoreStateHandles.iterator().next();
if (firstStateHandle instanceof IncrementalKeyedStateHandle) {
Expand All @@ -395,8 +392,7 @@ private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
ttlTimeProvider);
ttlCompactFiltersManager);
} else {
return new RocksDBFullRestoreOperation<>(
keyGroupRange,
Expand All @@ -413,8 +409,7 @@ private AbstractRocksDBRestoreOperation<K> getRocksDBRestoreOperation(
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
ttlTimeProvider);
ttlCompactFiltersManager);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.contrib.streaming.state.ttl.RocksDbTtlCompactFiltersManager;
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
Expand All @@ -32,6 +31,8 @@
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import javax.annotation.Nullable;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -79,38 +80,11 @@ public static RocksDB openDB(
return dbRef;
}

/**
 * Builds a {@link ColumnFamilyDescriptor} for the given state name, guarding against a
 * clash with RocksDB's reserved default column family name.
 *
 * @param stateName name of the state; becomes the column family name.
 * @param columnOptions options to attach to the descriptor.
 * @return descriptor for a column family named after the state.
 */
public static ColumnFamilyDescriptor createColumnFamilyDescriptor(String stateName, ColumnFamilyOptions columnOptions) {
	final byte[] encodedName = stateName.getBytes(ConfigConstants.DEFAULT_CHARSET);
	final boolean clashesWithDefault = Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, encodedName);
	Preconditions.checkState(!clashesWithDefault,
		"The chosen state name 'default' collides with the name of the default column family!");
	return new ColumnFamilyDescriptor(encodedName, columnOptions);
}

/**
 * Creates the column family described by {@code columnDescriptor} in the given DB.
 *
 * <p>On failure the descriptor's options are closed quietly (they would otherwise leak
 * native resources once the descriptor is abandoned) and a {@link FlinkRuntimeException}
 * wrapping the original {@link RocksDBException} is thrown.
 */
public static ColumnFamilyHandle createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
	final ColumnFamilyHandle handle;
	try {
		handle = db.createColumnFamily(columnDescriptor);
	} catch (RocksDBException e) {
		IOUtils.closeQuietly(columnDescriptor.getOptions());
		throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", e);
	}
	return handle;
}

/**
 * Convenience overload: derives column family options for {@code stateName} from the
 * factory, builds the descriptor, and creates the column family in {@code db}.
 */
public static ColumnFamilyHandle createColumnFamily(
	String stateName,
	Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
	RocksDB db) {
	ColumnFamilyDescriptor descriptor = createColumnFamilyDescriptor(
		stateName, createColumnFamilyOptions(columnFamilyOptionsFactory, stateName));
	return createColumnFamily(descriptor, db);
}

/**
 * Wraps a fresh native iterator over the DB's default column family in a
 * {@link RocksIteratorWrapper}. The caller owns (and must close) the returned iterator.
 */
public static RocksIteratorWrapper getRocksIterator(RocksDB db) {
return new RocksIteratorWrapper(db.newIterator());
}

public static RocksIteratorWrapper getRocksIterator(
RocksDB db,
ColumnFamilyHandle columnFamilyHandle) {
public static RocksIteratorWrapper getRocksIterator(RocksDB db, ColumnFamilyHandle columnFamilyHandle) {
return new RocksIteratorWrapper(db.newIterator(columnFamilyHandle));
}

Expand All @@ -119,35 +93,67 @@ public static void registerKvStateInformation(
RocksDBNativeMetricMonitor nativeMetricMonitor,
String columnFamilyName,
RocksDBKeyedStateBackend.RocksDbKvStateInfo registeredColumn) {
kvStateInformation.put(columnFamilyName, registeredColumn);

kvStateInformation.put(columnFamilyName, registeredColumn);
if (nativeMetricMonitor != null) {
nativeMetricMonitor.registerColumnFamily(columnFamilyName, registeredColumn.columnFamilyHandle);
}
}

/**
* Creates a state info from a new meta info to use with a k/v state.
*
* <p>Creates the column family for the state.
* Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}.
*/
public static RocksDBKeyedStateBackend.RocksDbKvStateInfo createStateInfo(
RegisteredStateMetaInfoBase metaInfoBase,
RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
TtlTimeProvider ttlTimeProvider,
RocksDB db,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory) {
ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(ttlTimeProvider, metaInfoBase, options);
ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(metaInfoBase.getName(), options);
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {

ColumnFamilyDescriptor columnFamilyDescriptor = createColumnFamilyDescriptor(
metaInfoBase, columnFamilyOptionsFactory, ttlCompactFiltersManager);
return new RocksDBKeyedStateBackend.RocksDbKvStateInfo(createColumnFamily(columnFamilyDescriptor, db), metaInfoBase);
}

public static ColumnFamilyOptions createColumnFamilyOptions(
/**
* Creates a column descriptor for the state column family.
*
* <p>Sets TTL compaction filter if {@code ttlCompactFiltersManager} is not {@code null}.
*/
public static ColumnFamilyDescriptor createColumnFamilyDescriptor(
RegisteredStateMetaInfoBase metaInfoBase,
Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory,
String stateName) {
@Nullable RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {

ColumnFamilyOptions options = createColumnFamilyOptions(columnFamilyOptionsFactory, metaInfoBase.getName());
if (ttlCompactFiltersManager != null) {
ttlCompactFiltersManager.setAndRegisterCompactFilterIfStateTtl(metaInfoBase, options);
}
byte[] nameBytes = metaInfoBase.getName().getBytes(ConfigConstants.DEFAULT_CHARSET);
Preconditions.checkState(!Arrays.equals(RocksDB.DEFAULT_COLUMN_FAMILY, nameBytes),
"The chosen state name 'default' collides with the name of the default column family!");

return new ColumnFamilyDescriptor(nameBytes, options);
}

/**
 * Obtains column family options for {@code stateName} from the user-provided factory and
 * forces the backend's merge operator onto them.
 *
 * @return the factory-produced options with the merge operator set to {@code MERGE_OPERATOR_NAME}.
 */
public static ColumnFamilyOptions createColumnFamilyOptions(
	Function<String, ColumnFamilyOptions> columnFamilyOptionsFactory, String stateName) {

	ColumnFamilyOptions options = columnFamilyOptionsFactory.apply(stateName);
	// ensure that we use the right merge operator, because other code relies on this
	return options.setMergeOperatorName(MERGE_OPERATOR_NAME);
}

/**
 * Creates the column family for {@code columnDescriptor} in {@code db}.
 *
 * <p>If RocksDB rejects the creation, the descriptor's options are released quietly so the
 * native handle does not leak, and the failure is rethrown as a {@link FlinkRuntimeException}.
 */
private static ColumnFamilyHandle createColumnFamily(ColumnFamilyDescriptor columnDescriptor, RocksDB db) {
	try {
		return db.createColumnFamily(columnDescriptor);
	} catch (RocksDBException rocksDbFailure) {
		// free the options eagerly; nobody else will close them once creation has failed
		IOUtils.closeQuietly(columnDescriptor.getOptions());
		throw new FlinkRuntimeException("Error creating ColumnFamilyHandle.", rocksDbFailure);
	}
}

public static void addColumnFamilyOptionsToCloseLater(
List<ColumnFamilyOptions> columnFamilyOptions, ColumnFamilyHandle columnFamilyHandle) {
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,12 +145,9 @@ private <T> RocksDBKeyedStateBackend.RocksDbKvStateInfo tryRegisterPriorityQueue
if (stateInfo == null) {
// Currently this class is for timer service and TTL feature is not applicable here,
// so no need to register compact filter when creating column family
final ColumnFamilyHandle columnFamilyHandle =
RocksDBOperationUtils.createColumnFamily(stateName, columnFamilyOptionsFactory, this.db);
RegisteredPriorityQueueStateBackendMetaInfo<T> metaInfo =
new RegisteredPriorityQueueStateBackendMetaInfo<>(stateName, byteOrderedElementSerializer);

stateInfo = new RocksDBKeyedStateBackend.RocksDbKvStateInfo(columnFamilyHandle, metaInfo);
stateInfo = RocksDBOperationUtils.createStateInfo(metaInfo, db, columnFamilyOptionsFactory, null);
RocksDBOperationUtils.registerKvStateInformation(kvStateInformation, nativeMetricMonitor, stateName, stateInfo);
} else {
// TODO we implement the simple way of supporting the current functionality, mimicking keyed state
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import org.apache.flink.runtime.state.RegisteredStateMetaInfoBase;
import org.apache.flink.runtime.state.StateSerializerProvider;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.StateMigrationException;

Expand All @@ -42,7 +41,6 @@
import org.rocksdb.ColumnFamilyOptions;
import org.rocksdb.DBOptions;
import org.rocksdb.RocksDB;
import org.rocksdb.RocksDBException;

import javax.annotation.Nonnull;

Expand Down Expand Up @@ -87,7 +85,6 @@ public abstract class AbstractRocksDBRestoreOperation<K> implements RocksDBResto
// - Full restore
// - data ingestion after db open: #getOrRegisterStateColumnFamilyHandle before creating column family
protected final RocksDbTtlCompactFiltersManager ttlCompactFiltersManager;
protected final TtlTimeProvider ttlTimeProvider;

protected RocksDB db;
protected ColumnFamilyHandle defaultColumnFamilyHandle;
Expand All @@ -109,8 +106,7 @@ protected AbstractRocksDBRestoreOperation(
RocksDBNativeMetricOptions nativeMetricOptions,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> stateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
TtlTimeProvider ttlTimeProvider) {
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
this.keyGroupRange = keyGroupRange;
this.keyGroupPrefixBytes = keyGroupPrefixBytes;
this.numberOfTransferringThreads = numberOfTransferringThreads;
Expand All @@ -127,12 +123,11 @@ protected AbstractRocksDBRestoreOperation(
this.metricGroup = metricGroup;
this.restoreStateHandles = stateHandles;
this.ttlCompactFiltersManager = ttlCompactFiltersManager;
this.ttlTimeProvider = ttlTimeProvider;
this.columnFamilyHandles = new ArrayList<>(1);
this.columnFamilyDescriptors = Collections.emptyList();
}

public void openDB() throws IOException {
void openDB() throws IOException {
db = RocksDBOperationUtils.openDB(
dbPath,
columnFamilyDescriptors,
Expand All @@ -150,9 +145,9 @@ public RocksDB getDb() {
return this.db;
}

protected RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
ColumnFamilyHandle columnFamilyHandle,
StateMetaInfoSnapshot stateMetaInfoSnapshot) throws RocksDBException {
StateMetaInfoSnapshot stateMetaInfoSnapshot) {

RocksDbKvStateInfo registeredStateMetaInfoEntry =
kvStateInformation.get(stateMetaInfoSnapshot.getName());
Expand All @@ -163,8 +158,8 @@ protected RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
RegisteredStateMetaInfoBase stateMetaInfo =
RegisteredStateMetaInfoBase.fromMetaInfoSnapshot(stateMetaInfoSnapshot);
if (columnFamilyHandle == null) {
registeredStateMetaInfoEntry = RocksDBOperationUtils.createStateInfo(stateMetaInfo,
ttlCompactFiltersManager, ttlTimeProvider, db, columnFamilyOptionsFactory);
registeredStateMetaInfoEntry = RocksDBOperationUtils.createStateInfo(
stateMetaInfo, db, columnFamilyOptionsFactory, ttlCompactFiltersManager);
} else {
registeredStateMetaInfoEntry = new RocksDbKvStateInfo(columnFamilyHandle, stateMetaInfo);
}
Expand All @@ -181,7 +176,7 @@ protected RocksDbKvStateInfo getOrRegisterStateColumnFamilyHandle(
return registeredStateMetaInfoEntry;
}

protected KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
throws IOException, StateMigrationException {
// isSerializerPresenceRequired flag is set to false, since for the RocksDB state backend,
// deserialization of state happens lazily during runtime; we depend on the fact
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import org.apache.flink.runtime.state.StreamCompressionDecorator;
import org.apache.flink.runtime.state.UncompressedStreamCompressionDecorator;
import org.apache.flink.runtime.state.metainfo.StateMetaInfoSnapshot;
import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
import org.apache.flink.util.IOUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.StateMigrationException;
Expand Down Expand Up @@ -103,8 +102,7 @@ public RocksDBFullRestoreOperation(
RocksDBNativeMetricOptions nativeMetricOptions,
MetricGroup metricGroup,
@Nonnull Collection<KeyedStateHandle> restoreStateHandles,
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager,
TtlTimeProvider ttlTimeProvider) {
@Nonnull RocksDbTtlCompactFiltersManager ttlCompactFiltersManager) {
super(
keyGroupRange,
keyGroupPrefixBytes,
Expand All @@ -120,8 +118,7 @@ public RocksDBFullRestoreOperation(
nativeMetricOptions,
metricGroup,
restoreStateHandles,
ttlCompactFiltersManager,
ttlTimeProvider);
ttlCompactFiltersManager);
}

/**
Expand Down Expand Up @@ -169,7 +166,7 @@ private void restoreKeyGroupsInStateHandle()
/**
* Restore the KV-state / ColumnFamily meta data for all key-groups referenced by the current state handle.
*/
private void restoreKVStateMetaData() throws IOException, StateMigrationException, RocksDBException {
private void restoreKVStateMetaData() throws IOException, StateMigrationException {
KeyedBackendSerializationProxy<K> serializationProxy = readMetaData(currentStateHandleInView);

this.keygroupStreamCompressionDecorator = serializationProxy.isUsingKeyGroupCompression() ?
Expand Down
Loading

0 comments on commit 953a5ff

Please sign in to comment.