Skip to content

Commit

Permalink
[FLINK-9804][state] Fix KeyedStateBackend.getKeys() for RocksDBMapState.
Browse files Browse the repository at this point in the history
This closes apache#6306.
  • Loading branch information
sihuazhou committed Jul 12, 2018
1 parent f1ac0f2 commit def2aed
Show file tree
Hide file tree
Showing 2 changed files with 85 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3840,6 +3840,69 @@ public String fold(String acc, Integer value) throws Exception {
}
}

@Test
public void testMapStateGetKeys() throws Exception {
final int namespace1ElementsNum = 1000;
final int namespace2ElementsNum = 1000;
String fieldName = "get-keys-test";
AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);
try {
final String ns1 = "ns1";
MapState<String, Integer> keyedState1 = backend.getPartitionedState(
ns1,
StringSerializer.INSTANCE,
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
);

for (int key = 0; key < namespace1ElementsNum; key++) {
backend.setCurrentKey(key);
keyedState1.put("he", key * 2);
keyedState1.put("ho", key * 2);
}

final String ns2 = "ns2";
MapState<String, Integer> keyedState2 = backend.getPartitionedState(
ns2,
StringSerializer.INSTANCE,
new MapStateDescriptor<>(fieldName, StringSerializer.INSTANCE, IntSerializer.INSTANCE)
);

for (int key = namespace1ElementsNum; key < namespace1ElementsNum + namespace2ElementsNum; key++) {
backend.setCurrentKey(key);
keyedState2.put("he", key * 2);
keyedState2.put("ho", key * 2);
}

// valid for namespace1
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns1).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();

for (int expectedKey = 0; expectedKey < namespace1ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt());
}

assertFalse(actualIterator.hasNext());
}

// valid for namespace2
try (Stream<Integer> keysStream = backend.getKeys(fieldName, ns2).sorted()) {
PrimitiveIterator.OfInt actualIterator = keysStream.mapToInt(value -> value.intValue()).iterator();

for (int expectedKey = namespace1ElementsNum; expectedKey < namespace1ElementsNum + namespace2ElementsNum; expectedKey++) {
assertTrue(actualIterator.hasNext());
assertEquals(expectedKey, actualIterator.nextInt());
}

assertFalse(actualIterator.hasNext());
}
}
finally {
IOUtils.closeQuietly(backend);
backend.dispose();
}
}

@Test
public void testCheckConcurrencyProblemWhenPerformingCheckpointAsync() throws Exception {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1641,6 +1641,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
private final byte[] namespaceBytes;
private final boolean ambiguousKeyPossible;
private K nextKey;
private K previousKey;

RocksIteratorForKeysWrapper(
RocksIteratorWrapper iterator,
Expand All @@ -1655,6 +1656,7 @@ static class RocksIteratorForKeysWrapper<K> implements Iterator<K>, AutoCloseabl
this.keyGroupPrefixBytes = Preconditions.checkNotNull(keyGroupPrefixBytes);
this.namespaceBytes = Preconditions.checkNotNull(namespaceBytes);
this.nextKey = null;
this.previousKey = null;
this.ambiguousKeyPossible = ambiguousKeyPossible;
}

Expand All @@ -1664,15 +1666,22 @@ public boolean hasNext() {
while (nextKey == null && iterator.isValid()) {

byte[] key = iterator.key();
if (isMatchingNameSpace(key)) {
ByteArrayInputStreamWithPos inputStream =
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);
DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);
K value = RocksDBKeySerializationUtils.readKey(
keySerializer,
inputStream,
dataInput,
ambiguousKeyPossible);

ByteArrayInputStreamWithPos inputStream =
new ByteArrayInputStreamWithPos(key, keyGroupPrefixBytes, key.length - keyGroupPrefixBytes);

DataInputViewStreamWrapper dataInput = new DataInputViewStreamWrapper(inputStream);

K value = RocksDBKeySerializationUtils.readKey(
keySerializer,
inputStream,
dataInput,
ambiguousKeyPossible);

int namespaceByteStartPos = inputStream.getPosition();

if (isMatchingNameSpace(key, namespaceByteStartPos) && !Objects.equals(previousKey, value)) {
previousKey = value;
nextKey = value;
}
iterator.next();
Expand All @@ -1694,12 +1703,12 @@ public K next() {
return tmpKey;
}

private boolean isMatchingNameSpace(@Nonnull byte[] key) {
private boolean isMatchingNameSpace(@Nonnull byte[] key, int beginPos) {
final int namespaceBytesLength = namespaceBytes.length;
final int basicLength = namespaceBytesLength + keyGroupPrefixBytes;
final int basicLength = namespaceBytesLength + beginPos;
if (key.length >= basicLength) {
for (int i = 1; i <= namespaceBytesLength; ++i) {
if (key[key.length - i] != namespaceBytes[namespaceBytesLength - i]) {
for (int i = 0; i < namespaceBytesLength; ++i) {
if (key[beginPos + i] != namespaceBytes[i]) {
return false;
}
}
Expand Down

0 comments on commit def2aed

Please sign in to comment.