Skip to content

Commit

Permalink
[FLINK-8699][checkpointing] Create deep copy of state meta data to av…
Browse files Browse the repository at this point in the history
…oid concurrency problem with checkpoints
  • Loading branch information
StefanRRichter committed Feb 25, 2018
1 parent 08d0881 commit 0f27116
Showing 1 changed file with 9 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1818,6 +1818,7 @@ static class RocksDBFullSnapshotOperation<K>

private Snapshot snapshot;
private ReadOptions readOptions;
private List<Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> kvStateInformationCopy;
private List<Tuple2<RocksIterator, Integer>> kvStateIterators;

private CheckpointStreamWithResultProvider checkpointStreamWithResultProvider;
Expand All @@ -1841,7 +1842,7 @@ static class RocksDBFullSnapshotOperation<K>
*/
public void takeDBSnapShot() {
Preconditions.checkArgument(snapshot == null, "Only one ongoing snapshot allowed!");
this.kvStateIterators = new ArrayList<>(stateBackend.kvStateInformation.size());
this.kvStateInformationCopy = new ArrayList<>(stateBackend.kvStateInformation.values());
this.snapshot = stateBackend.db.getSnapshot();
}

Expand Down Expand Up @@ -1928,20 +1929,22 @@ public void releaseSnapshotResources() {
private void writeKVStateMetaData() throws IOException {

List<RegisteredKeyedBackendStateMetaInfo.Snapshot<?, ?>> metaInfoSnapshots =
new ArrayList<>(stateBackend.kvStateInformation.size());
new ArrayList<>(kvStateInformationCopy.size());

this.kvStateIterators = new ArrayList<>(kvStateInformationCopy.size());

int kvStateId = 0;
for (Map.Entry<String, Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>>> column :
stateBackend.kvStateInformation.entrySet()) {
for (Tuple2<ColumnFamilyHandle, RegisteredKeyedBackendStateMetaInfo<?, ?>> column :
kvStateInformationCopy) {

metaInfoSnapshots.add(column.getValue().f1.snapshot());
metaInfoSnapshots.add(column.f1.snapshot());

//retrieve iterator for this k/v states
readOptions = new ReadOptions();
readOptions.setSnapshot(snapshot);

kvStateIterators.add(
new Tuple2<>(stateBackend.db.newIterator(column.getValue().f0, readOptions), kvStateId));
new Tuple2<>(stateBackend.db.newIterator(column.f0, readOptions), kvStateId));

++kvStateId;
}
Expand Down

0 comments on commit 0f27116

Please sign in to comment.