Skip to content

Commit

Permalink
[FLINK-11280] [rocksdb] Lazily create RocksDBSerializedCompositeKeyBu…
Browse files Browse the repository at this point in the history
…ilder only after restore

Prior to this commit, the composite key builder was created in the
constructor of the RocksDBKeyedStateBackend. The creation of the builder
requires providing a key serializer.

This is problematic, because the key serializer may be reconfigured
during the restore phase, therefore invalidating the key serializer used
by the composite key builder.

This commit resolves this by lazily creating the composite key builder
only after the restore phase, which would be the point-in-time when we
are certain the key serializer will no longer be changed and is final.
  • Loading branch information
tzulitai committed Jan 8, 2019
1 parent d77151c commit 7a1147b
Showing 1 changed file with 16 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,14 @@ <K, N, SV, S extends State, IS extends S> IS createState(
/** The native metrics monitor. */
private RocksDBNativeMetricMonitor nativeMetricMonitor;

/** Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.*/
private final RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;
/**
* Helper to build the byte arrays of composite keys to address data in RocksDB. Shared across all states.
*
* <p>We create the builder after the restore phase in the {@link #restore(Object)} method. The timing of
* the creation is important, because only after the restore we are certain that the key serializer
* is final after potential reconfigurations during the restore.
*/
private RocksDBSerializedCompositeKeyBuilder<K> sharedRocksKeyBuilder;

public RocksDBKeyedStateBackend(
String operatorIdentifier,
Expand Down Expand Up @@ -297,7 +303,6 @@ public RocksDBKeyedStateBackend(
this.kvStateInformation = new LinkedHashMap<>();

this.writeOptions = new WriteOptions().setDisableWAL(true);
this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(keySerializer, keyGroupPrefixBytes, 32);

this.metricOptions = metricOptions;
this.metricGroup = metricGroup;
Expand Down Expand Up @@ -535,6 +540,14 @@ public void restore(Collection<KeyedStateHandle> restoreState) throws Exception
}
}

// it is important that we only create the key builder after the restore, and not before;
// restore operations may reconfigure the key serializer, so accessing the key serializer
// only now we can be certain that the key serializer used in the builder is final.
this.sharedRocksKeyBuilder = new RocksDBSerializedCompositeKeyBuilder<>(
getKeySerializer(),
keyGroupPrefixBytes,
32);

initializeSnapshotStrategy(incrementalRestoreOperation);
} catch (Exception ex) {
dispose();
Expand Down

0 comments on commit 7a1147b

Please sign in to comment.