Skip to content

Commit

Permalink
[FLINK-13469][state] Ensure resource used by StateMapSnapshot will be…
Browse files Browse the repository at this point in the history
… released if snapshot fails

This closes apache#9301
  • Loading branch information
banmoy authored and StephanEwen committed Nov 6, 2019
1 parent 051692a commit 02c4f20
Show file tree
Hide file tree
Showing 9 changed files with 156 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,13 +88,6 @@ abstract class AbstractStateTableSnapshot<K, N, S>
*/
protected abstract StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S>> getStateMapSnapshotForKeyGroup(int keyGroup);

/**
* Optional hook to release resources for this snapshot at the end of its lifecycle.
*/
@Override
public void release() {
}

@Nonnull
@Override
public StateMetaInfoSnapshot getMetaInfoSnapshot() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
Expand Down Expand Up @@ -784,6 +785,11 @@ public void releaseSnapshot(StateMapSnapshot<K, N, S, ? extends StateMap<K, N, S
releaseSnapshot(copyOnWriteStateMapSnapshot.getSnapshotVersion());
}

@VisibleForTesting
Set<Integer> getSnapshotVersions() {
return snapshotVersions;
}

// Meta data setter / getter and toString -----------------------------------------------------

public TypeSerializer<S> getStateSerializer() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,11 @@ public class CopyOnWriteStateMapSnapshot<K, N, S>
@Nonnegative
private final int numberOfEntriesInSnapshotData;

/**
* Whether this snapshot has been released.
*/
private boolean released;

/**
* Creates a new {@link CopyOnWriteStateMapSnapshot}.
*
Expand All @@ -81,11 +86,19 @@ public class CopyOnWriteStateMapSnapshot<K, N, S>
this.snapshotData = owningStateMap.snapshotMapArrays();
this.snapshotVersion = owningStateMap.getStateMapVersion();
this.numberOfEntriesInSnapshotData = owningStateMap.size();
this.released = false;
}

@Override
public void release() {
owningStateMap.releaseSnapshot(this);
if (!released) {
owningStateMap.releaseSnapshot(this);
released = true;
}
}

public boolean isReleased() {
return released;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,4 +79,13 @@ public class CopyOnWriteStateTableSnapshot<K, N, S> extends AbstractStateTableSn

return stateMapSnapshot;
}

@Override
public void release() {
for (CopyOnWriteStateMapSnapshot snapshot : stateMapSnapshots) {
if (!snapshot.isReleased()) {
snapshot.release();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,5 +95,9 @@ static class NestedMapsStateTableSnapshot<K, N, S>

return stateMap.stateSnapshot();
}

@Override
public void release() {
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ public int getKeyGroupOffset() {
}

@VisibleForTesting
protected StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
StateMap<K, N, S> getMapForKeyGroup(int keyGroupIndex) {
final int pos = indexToOffset(keyGroupIndex);
if (pos >= 0 && pos < keyGroupedStateMaps.length) {
return keyGroupedStateMaps[pos];
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@

import org.junit.Assert;
import org.junit.Test;
import org.mockito.Matchers;
import org.mockito.Mockito;

import java.util.ArrayList;
import java.util.Arrays;
Expand Down Expand Up @@ -394,6 +396,30 @@ public void testCopyOnWriteContracts() {
Assert.assertTrue(originalState5 == stateMap.get(5, 1));
}

/**
* This tests that snapshot can be released correctly.
*/
@Test
public void testSnapshotRelease() {
final CopyOnWriteStateMap<Integer, Integer, Integer> stateMap =
Mockito.spy(new CopyOnWriteStateMap<>(IntSerializer.INSTANCE));

for (int i = 0; i < 10; i++) {
stateMap.put(i, i, i);
}

CopyOnWriteStateMapSnapshot<Integer, Integer, Integer> snapshot = stateMap.stateSnapshot();
Assert.assertFalse(snapshot.isReleased());

snapshot.release();
Assert.assertTrue(snapshot.isReleased());
Mockito.verify(stateMap, Mockito.times(1)).releaseSnapshot(Matchers.same(snapshot));

// verify that snapshot will release itself only once
snapshot.release();
Mockito.verify(stateMap, Mockito.times(1)).releaseSnapshot(Matchers.same(snapshot));
}

@SuppressWarnings("unchecked")
private static <K, N, S> Tuple3<K, N, S>[] convert(CopyOnWriteStateMap.StateMapEntry<K, N, S>[] snapshot, int mapSize) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,19 @@
package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.state.StateDescriptor;
import org.apache.flink.api.common.typeutils.base.FloatSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.core.memory.ByteArrayOutputStreamWithPos;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.DataOutputViewStreamWrapper;
import org.apache.flink.runtime.state.RegisteredKeyValueStateBackendMetaInfo;
import org.apache.flink.runtime.state.StateSnapshot;

import org.junit.Assert;
import org.junit.Test;

import java.io.IOException;
import java.util.concurrent.ThreadLocalRandom;

/**
* Test for {@link CopyOnWriteStateTable}.
Expand Down Expand Up @@ -71,4 +76,87 @@ public void testSerializerDuplicationInSnapshot() throws IOException {
new DataOutputViewStreamWrapper(
new ByteArrayOutputStreamWithPos(1024)), 0);
}

/**
* This tests that resource can be released for a successful snapshot.
*/
@Test
public void testReleaseForSuccessfulSnapshot() throws IOException {
int numberOfKeyGroups = 10;
CopyOnWriteStateTable<Integer, Integer, Float> table = createStateTableForSnapshotRelease(numberOfKeyGroups);

ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos();
DataOutputView dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStreamWithPos);

CopyOnWriteStateTableSnapshot<Integer, Integer, Float> snapshot = table.stateSnapshot();
for (int group = 0; group < numberOfKeyGroups; group++) {
snapshot.writeStateInKeyGroup(dataOutputView, group);
// resource used by one key group should be released after the snapshot is successful
Assert.assertTrue(isResourceReleasedForKeyGroup(table, group));
}
snapshot.release();
verifyResourceIsReleasedForAllKeyGroup(table, 1);
}

/**
* This tests that resource can be released for a failed snapshot.
*/
@Test
public void testReleaseForFailedSnapshot() throws IOException {
int numberOfKeyGroups = 10;
CopyOnWriteStateTable<Integer, Integer, Float> table = createStateTableForSnapshotRelease(numberOfKeyGroups);

ByteArrayOutputStreamWithPos byteArrayOutputStreamWithPos = new ByteArrayOutputStreamWithPos();
DataOutputView dataOutputView = new DataOutputViewStreamWrapper(byteArrayOutputStreamWithPos);

CopyOnWriteStateTableSnapshot<Integer, Integer, Float> snapshot = table.stateSnapshot();
// only snapshot part of key groups to simulate a failed snapshot
for (int group = 0; group < numberOfKeyGroups / 2; group++) {
snapshot.writeStateInKeyGroup(dataOutputView, group);
Assert.assertTrue(isResourceReleasedForKeyGroup(table, group));
}
for (int group = numberOfKeyGroups / 2; group < numberOfKeyGroups; group++) {
Assert.assertFalse(isResourceReleasedForKeyGroup(table, group));
}
snapshot.release();
verifyResourceIsReleasedForAllKeyGroup(table, 2);
}

private CopyOnWriteStateTable<Integer, Integer, Float> createStateTableForSnapshotRelease(int numberOfKeyGroups) {
RegisteredKeyValueStateBackendMetaInfo<Integer, Float> metaInfo =
new RegisteredKeyValueStateBackendMetaInfo<>(
StateDescriptor.Type.VALUE,
"test",
IntSerializer.INSTANCE,
FloatSerializer.INSTANCE);

MockInternalKeyContext<Integer> mockKeyContext =
new MockInternalKeyContext<>(0, numberOfKeyGroups - 1, numberOfKeyGroups);
CopyOnWriteStateTable<Integer, Integer, Float> table =
new CopyOnWriteStateTable<>(mockKeyContext, metaInfo, IntSerializer.INSTANCE);

ThreadLocalRandom random = ThreadLocalRandom.current();
for (int i = 0; i < 1000; i++) {
mockKeyContext.setCurrentKeyAndKeyGroup(i);
table.put(random.nextInt(), random.nextFloat());
}

return table;
}

private void verifyResourceIsReleasedForAllKeyGroup(
CopyOnWriteStateTable table,
int snapshotVersion) {
StateMap[] stateMaps = table.getState();
for (StateMap map : stateMaps) {
Assert.assertFalse(((CopyOnWriteStateMap) map).getSnapshotVersions().contains(snapshotVersion));
}
}

private boolean isResourceReleasedForKeyGroup(
CopyOnWriteStateTable table,
int keyGroup) {
CopyOnWriteStateMap stateMap = (CopyOnWriteStateMap) table.getMapForKeyGroup(keyGroup);
return !stateMap.getSnapshotVersions().contains(1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,23 @@
package org.apache.flink.runtime.state.heap;

import org.apache.flink.runtime.state.KeyGroupRange;
import org.apache.flink.runtime.state.KeyGroupRangeAssignment;

/**
* Mock {@link InternalKeyContext}.
*/
public class MockInternalKeyContext<K> extends InternalKeyContextImpl<K> {

MockInternalKeyContext() {
super(new KeyGroupRange(0, 0), 1);
}

@Override
public void setCurrentKey(K key) {
MockInternalKeyContext(int startKeyGroup, int endKeyGroup, int numberOfKeyGroups) {
super(new KeyGroupRange(startKeyGroup, endKeyGroup), numberOfKeyGroups);
}

public void setCurrentKeyAndKeyGroup(K key) {
super.setCurrentKey(key);
super.setCurrentKeyGroupIndex(0);
super.setCurrentKeyGroupIndex(KeyGroupRangeAssignment.assignToKeyGroup(key, getNumberOfKeyGroups()));
}
}

0 comments on commit 02c4f20

Please sign in to comment.