Skip to content

Commit

Permalink
[FLINK-10267][state] Fix arbitrary iterator access on RocksDBMapIterator
Browse files Browse the repository at this point in the history
This closes apache#6638.
  • Loading branch information
Myasuka authored and StefanRRichter committed Sep 11, 2018
1 parent 1a94c20 commit 16dc597
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;

Expand Down Expand Up @@ -2916,6 +2917,52 @@ public void testMapState() throws Exception {
backend.dispose();
}

/**
 * Verifies that the iterator of {@link MapState} supports arbitrary interleavings of
 * {@code hasNext()}, {@code next()} and {@code remove()}; see [FLINK-10267] for details.
 *
 * <p>After every {@code next()} one of three access patterns is picked uniformly at random:
 * <ul>
 *   <li>remove twice (the second call must fail with {@link IllegalStateException}),</li>
 *   <li>call {@code hasNext()} and then remove,</li>
 *   <li>do nothing.</li>
 * </ul>
 * Regardless of the pattern, the iterator must still return every inserted key exactly once and in order.
 */
@Test
public void testMapStateIteratorArbitraryAccess() throws Exception {
	MapStateDescriptor<Integer, Long> kvId = new MapStateDescriptor<>("id", Integer.class, Long.class);

	AbstractKeyedStateBackend<Integer> backend = createKeyedBackend(IntSerializer.INSTANCE);

	try {
		MapState<Integer, Long> state = backend.getPartitionedState(VoidNamespace.INSTANCE, VoidNamespaceSerializer.INSTANCE, kvId);
		backend.setCurrentKey(1);
		int stateSize = 4096;
		for (int i = 0; i < stateSize; i++) {
			state.put(i, i * 2L);
		}
		Iterator<Map.Entry<Integer, Long>> iterator = state.iterator();
		int iteratorCount = 0;
		while (iterator.hasNext()) {
			Map.Entry<Integer, Long> entry = iterator.next();
			assertEquals(iteratorCount, (int) entry.getKey());
			// nextInt(3) is uniform over {0, 1, 2}. The former "nextInt() % 3" also produced -2 and -1
			// (the remainder keeps the sign of the dividend), which matched no case and skewed the
			// distribution away from the "hasNext -> remove" pattern.
			switch (ThreadLocalRandom.current().nextInt(3)) {
				case 0: // remove twice
					iterator.remove();
					try {
						iterator.remove();
						fail("A second remove() without an intervening next() must throw IllegalStateException.");
					} catch (IllegalStateException expected) {
						// ignore expected exception
					}
					break;
				case 1: // hasNext -> remove
					iterator.hasNext();
					iterator.remove();
					break;
				case 2: // nothing to do
					break;
			}
			iteratorCount++;
		}
		assertEquals(stateSize, iteratorCount);
	} finally {
		backend.dispose();
	}
}

/**
* Verify that {@link ValueStateDescriptor} allows {@code null} as default.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,7 @@ private abstract class RocksDBMapIterator<T> implements Iterator<T> {
* have the same prefix, hence we can stop iterating once coming across an
* entry with a different prefix.
*/
@Nonnull
private final byte[] keyPrefixBytes;

/**
Expand All @@ -508,6 +509,9 @@ private abstract class RocksDBMapIterator<T> implements Iterator<T> {

/** An in-memory cache for the entries in the rocksdb. */
private ArrayList<RocksDBMapEntry> cacheEntries = new ArrayList<>();

/** The entry pointing to the current position which is last returned by calling {@link #nextEntry()}. */
private RocksDBMapEntry currentEntry;
private int cacheIndex = 0;

private final TypeSerializer<UK> keySerializer;
Expand Down Expand Up @@ -537,12 +541,11 @@ public boolean hasNext() {

/**
 * Removes an entry of the underlying map state by delegating to the entry's own {@code remove()}.
 *
 * <p>NOTE(review): this span looks like a diff rendering without +/- markers, so the
 * {@code cacheIndex}/{@code lastEntry} lines and the {@code currentEntry} lines are presumably the
 * removed and added sides of the same change shown together — confirm against the actual file
 * before relying on the control flow as printed here.
 *
 * @throws IllegalStateException with the message below when the guard condition holds; in the
 *     {@code currentEntry} variant that is when {@code currentEntry} is {@code null} or already
 *     marked {@code deleted}
 */
@Override
public void remove() {
if (cacheIndex == 0 || cacheIndex > cacheEntries.size()) {
if (currentEntry == null || currentEntry.deleted) {
throw new IllegalStateException("The remove operation must be called after a valid next operation.");
}

RocksDBMapEntry lastEntry = cacheEntries.get(cacheIndex - 1);
lastEntry.remove();
currentEntry.remove();
}

final RocksDBMapEntry nextEntry() {
Expand All @@ -556,10 +559,10 @@ final RocksDBMapEntry nextEntry() {
return null;
}

RocksDBMapEntry entry = cacheEntries.get(cacheIndex);
this.currentEntry = cacheEntries.get(cacheIndex);
cacheIndex++;

return entry;
return currentEntry;
}

private void loadCache() {
Expand All @@ -577,23 +580,22 @@ private void loadCache() {
try (RocksIteratorWrapper iterator = RocksDBKeyedStateBackend.getRocksIterator(db, columnFamily)) {

/*
* The iteration starts from the prefix bytes at the first loading. The cache then is
* reloaded when the next entry to return is the last one in the cache. At that time,
* we will start the iterating from the last returned entry.
*/
RocksDBMapEntry lastEntry = cacheEntries.size() == 0 ? null : cacheEntries.get(cacheEntries.size() - 1);
byte[] startBytes = (lastEntry == null ? keyPrefixBytes : lastEntry.rawKeyBytes);
* The iteration starts from the prefix bytes at the first loading. Once #nextEntry() has been
* called, currentEntry points to the last returned entry, so whenever the cache needs to be
* reloaded we start iterating from currentEntry.
*/
byte[] startBytes = (currentEntry == null ? keyPrefixBytes : currentEntry.rawKeyBytes);

cacheEntries.clear();
cacheIndex = 0;

iterator.seek(startBytes);

/*
* If the last returned entry is not deleted, it will be the first entry in the
* iterating. Skip it to avoid redundant access in such cases.
* If the entry pointing to the current position is not removed, it will be the first entry in the
* new iterating. Skip it to avoid redundant access in such cases.
*/
if (lastEntry != null && !lastEntry.deleted) {
if (currentEntry != null && !currentEntry.deleted) {
iterator.next();
}

Expand Down

0 comments on commit 16dc597

Please sign in to comment.