Skip to content

Commit

Permalink
[FLINK-17625][table-runtime-blink] Fix ArrayIndexOutOfBoundsException…
Browse files Browse the repository at this point in the history
… in AppendOnlyTopNFunction

This closes apache#12303
  • Loading branch information
lsyldliu committed Jun 9, 2020
1 parent 87d6a76 commit 926523e
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 31 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -211,16 +211,22 @@ private void processElementWithoutRowNumber(RowData input, Collector<RowData> ou
if (buffer.getCurrentTopNum() > rankEnd) {
Map.Entry<RowData, Collection<RowData>> lastEntry = buffer.lastEntry();
RowData lastKey = lastEntry.getKey();
List<RowData> lastList = (List<RowData>) lastEntry.getValue();
Collection<RowData> lastList = lastEntry.getValue();
RowData lastElement = buffer.lastElement();
int size = lastList.size();
// remove last one
RowData lastElement = lastList.remove(lastList.size() - 1);
if (lastList.isEmpty()) {
if (size <= 1) {
buffer.removeAll(lastKey);
dataState.remove(lastKey);
} else {
dataState.put(lastKey, lastList);
buffer.removeLast();
// last element has been removed from lastList, we have to copy a new collection
// for lastList to avoid mutating state values, see CopyOnWriteStateMap,
// otherwise, the result might be corrupt.
// don't need to perform a deep copy, because RowData elements will not be updated
dataState.put(lastKey, new ArrayList<>(lastList));
}
if (input.equals(lastElement)) {
if (size == 0 || input.equals(lastElement)) {
return;
} else {
// lastElement shouldn't be null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
import java.io.Serializable;
import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
Expand Down Expand Up @@ -94,12 +94,12 @@ public Collection<RowData> get(RowData sortKey) {
}

public void remove(RowData sortKey, RowData value) {
Collection<RowData> list = treeMap.get(sortKey);
if (list != null) {
if (list.remove(value)) {
Collection<RowData> collection = treeMap.get(sortKey);
if (collection != null) {
if (collection.remove(value)) {
currentTopNum -= 1;
}
if (list.size() == 0) {
if (collection.size() == 0) {
treeMap.remove(sortKey);
}
}
Expand All @@ -111,9 +111,9 @@ public void remove(RowData sortKey, RowData value) {
* @param sortKey key to remove
*/
void removeAll(RowData sortKey) {
Collection<RowData> list = treeMap.get(sortKey);
if (list != null) {
currentTopNum -= list.size();
Collection<RowData> collection = treeMap.get(sortKey);
if (collection != null) {
currentTopNum -= collection.size();
treeMap.remove(sortKey);
}
}
Expand All @@ -127,20 +127,47 @@ RowData removeLast() {
Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
RowData lastElement = null;
if (last != null) {
Collection<RowData> list = last.getValue();
lastElement = getLastElement(list);
if (lastElement != null) {
if (list.remove(lastElement)) {
currentTopNum -= 1;
}
if (list.size() == 0) {
treeMap.remove(last.getKey());
Collection<RowData> collection = last.getValue();
if (collection != null) {
if (collection instanceof List) {
// optimization for List
List<RowData> list = (List<RowData>) collection;
if (!list.isEmpty()) {
lastElement = list.remove(list.size() - 1);
currentTopNum -= 1;
if (list.isEmpty()) {
treeMap.remove(last.getKey());
}
}
} else {
lastElement = getLastElement(collection);
if (lastElement != null) {
if (collection.remove(lastElement)) {
currentTopNum -= 1;
}
if (collection.size() == 0) {
treeMap.remove(last.getKey());
}
}
}
}
}
return lastElement;
}

/**
* Returns the last record of the last Entry in the buffer.
*/
RowData lastElement() {
Map.Entry<RowData, Collection<RowData>> last = treeMap.lastEntry();
RowData lastElement = null;
if (last != null) {
Collection<RowData> collection = last.getValue();
lastElement = getLastElement(collection);
}
return lastElement;
}

/**
* Gets record which rank is given value.
*
Expand All @@ -150,28 +177,32 @@ RowData removeLast() {
RowData getElement(int rank) {
int curRank = 0;
for (Map.Entry<RowData, Collection<RowData>> entry : treeMap.entrySet()) {
Collection<RowData> list = entry.getValue();

if (curRank + list.size() >= rank) {
for (RowData elem : list) {
Collection<RowData> collection = entry.getValue();
if (curRank + collection.size() >= rank) {
for (RowData elem : collection) {
curRank += 1;
if (curRank == rank) {
return elem;
}
}
} else {
curRank += list.size();
curRank += collection.size();
}
}
return null;
}

private RowData getLastElement(Collection<RowData> list) {
private RowData getLastElement(Collection<RowData> collection) {
RowData element = null;
if (list != null && !list.isEmpty()) {
Iterator<RowData> iter = list.iterator();
while (iter.hasNext()) {
element = iter.next();
if (collection != null && !collection.isEmpty()) {
if (collection instanceof List) {
// optimize for List
List<RowData> list = (List<RowData>) collection;
return list.get(list.size() - 1);
} else {
for (RowData data : collection) {
element = data;
}
}
}
return element;
Expand Down

0 comments on commit 926523e

Please sign in to comment.