Skip to content

Commit

Permalink
[FLINK-19748] [test] Adjust raw keyed state test to only write some k…
Browse files Browse the repository at this point in the history
…ey groups

This closes apache#13772..
  • Loading branch information
tzulitai committed Nov 7, 2020
1 parent c775358 commit 3367c23
Showing 1 changed file with 13 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
import org.apache.flink.streaming.util.KeyedOneInputStreamOperatorTestHarness;
import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
import org.apache.flink.util.Preconditions;

import org.hamcrest.Description;
import org.hamcrest.Matcher;
Expand Down Expand Up @@ -443,10 +444,11 @@ public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception {
final int maxParallelism = 10;
final int numSubtasks = 1;
final int subtaskIndex = 0;
final KeyGroupRange keyGroupRange = KeyGroupRange.of(0, maxParallelism - 1);
final List<Integer> keyGroupsToWrite = Arrays.asList(2, 3, 8);

final byte[] testSnapshotData = "TEST".getBytes();
final CustomRawKeyedStateTestOperator testOperator = new CustomRawKeyedStateTestOperator(testSnapshotData);
final CustomRawKeyedStateTestOperator testOperator =
new CustomRawKeyedStateTestOperator(testSnapshotData, keyGroupsToWrite);

// snapshot and then restore
OperatorSubtaskState snapshot;
Expand Down Expand Up @@ -474,7 +476,7 @@ public void testCustomRawKeyedStateSnapshotAndRestore() throws Exception {
testHarness.open();
}

assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupRange));
assertThat(testOperator.restoredRawKeyedState, hasRestoredKeyGroupsWith(testSnapshotData, keyGroupsToWrite));
}

/**
Expand Down Expand Up @@ -578,11 +580,13 @@ private static class CustomRawKeyedStateTestOperator
private static final long serialVersionUID = 1L;

private final byte[] snapshotBytes;
private final List<Integer> keyGroupsToWrite;

private Map<Integer, byte[]> restoredRawKeyedState;

CustomRawKeyedStateTestOperator(byte[] snapshotBytes) {
CustomRawKeyedStateTestOperator(byte[] snapshotBytes, List<Integer> keyGroupsToWrite) {
this.snapshotBytes = Arrays.copyOf(snapshotBytes, snapshotBytes.length);
this.keyGroupsToWrite = Preconditions.checkNotNull(keyGroupsToWrite);
}

@Override
Expand All @@ -599,7 +603,7 @@ protected boolean isUsingCustomRawKeyedState() {
public void snapshotState(StateSnapshotContext context) throws Exception {
super.snapshotState(context);
KeyedStateCheckpointOutputStream rawKeyedStateStream = context.getRawKeyedOperatorStateOutput();
for (int keyGroupId : rawKeyedStateStream.getKeyGroupList()) {
for (int keyGroupId : keyGroupsToWrite) {
rawKeyedStateStream.startNewKeyGroup(keyGroupId);
rawKeyedStateStream.write(snapshotBytes);
}
Expand Down Expand Up @@ -628,15 +632,15 @@ private static int getKeyInKeyGroupRange(KeyGroupRange range, int maxParallelism
return result;
}

private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(byte[] testSnapshotData, KeyGroupRange range) {
private static Matcher<Map<Integer, byte[]>> hasRestoredKeyGroupsWith(byte[] testSnapshotData, List<Integer> writtenKeyGroups) {
return new TypeSafeMatcher<Map<Integer, byte[]>>() {
@Override
protected boolean matchesSafely(Map<Integer, byte[]> restored) {
if (restored.size() != range.getNumberOfKeyGroups()) {
if (restored.size() != writtenKeyGroups.size()) {
return false;
}

for (int writtenKeyGroupId : range) {
for (int writtenKeyGroupId : writtenKeyGroups) {
if (!Arrays.equals(restored.get(writtenKeyGroupId), testSnapshotData)) {
return false;
}
Expand All @@ -647,7 +651,7 @@ protected boolean matchesSafely(Map<Integer, byte[]> restored) {

@Override
public void describeTo(Description description) {
description.appendText("Key groups: " + range + " with snapshot data " + Arrays.toString(testSnapshotData));
description.appendText("Key groups: " + writtenKeyGroups + " with snapshot data " + Arrays.toString(testSnapshotData));
}
};
}
Expand Down

0 comments on commit 3367c23

Please sign in to comment.