Skip to content

Commit

Permalink
[FLINK-13216][FLINK-13153][table-planner-blink] Fix Max_Retract and M…
Browse files Browse the repository at this point in the history
…in_Retract may produce incorrect result

The cause is that acc.max/min was not reset to null when a new max/min had to be recomputed from an empty MapView, so it could still hold a stale value.
That stale value was then emitted downstream instead of null, which corrupted the final result.

This bug made several test cases unstable, including:
- AggregateITCase.testNestedGroupByAgg
- SplitAggregateITCase.testMinMaxWithRetraction

This closes apache#9120
  • Loading branch information
wuchong committed Jul 16, 2019
1 parent a7bdbb9 commit 1bede7e
Show file tree
Hide file tree
Showing 3 changed files with 50 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ private void updateMax(MaxWithRetractAccumulator<T> acc) throws Exception {
// when both of them are expired.
if (!hasMax) {
acc.mapSize = 0L;
// we should also override max value, because it may have an old value.
acc.max = null;
}
}

public void merge(MaxWithRetractAccumulator<T> acc, Iterable<MaxWithRetractAccumulator<T>> its) throws Exception {
boolean needUpdateMax = false;
for (MaxWithRetractAccumulator<T> a : its) {
// set max element
if (acc.mapSize == 0 || (a.max != null && acc.max.compareTo(a.max) < 0)) {
if (acc.mapSize == 0 || (a.mapSize > 0 && a.max != null && acc.max.compareTo(a.max) < 0)) {
acc.max = a.max;
}
// merge the count for each key
Expand Down Expand Up @@ -195,7 +197,7 @@ public void resetAccumulator(MaxWithRetractAccumulator<T> acc) {

@Override
public T getValue(MaxWithRetractAccumulator<T> acc) {
if (acc.mapSize != 0) {
if (acc.mapSize > 0) {
return acc.max;
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,14 +135,16 @@ private void updateMin(MinWithRetractAccumulator<T> acc) throws Exception {
// when both of them are expired.
if (!hasMin) {
acc.mapSize = 0L;
// we should also override min value, because it may have an old value.
acc.min = null;
}
}

public void merge(MinWithRetractAccumulator<T> acc, Iterable<MinWithRetractAccumulator<T>> its) throws Exception {
boolean needUpdateMin = false;
for (MinWithRetractAccumulator<T> a : its) {
// set min element
if (acc.mapSize == 0 || (a.min != null && acc.min.compareTo(a.min) > 0)) {
if (acc.mapSize == 0 || (a.mapSize > 0 && a.min != null && acc.min.compareTo(a.min) > 0)) {
acc.min = a.min;
}
// merge the count for each key
Expand Down Expand Up @@ -195,7 +197,7 @@ public void resetAccumulator(MinWithRetractAccumulator<T> acc) {

@Override
public T getValue(MinWithRetractAccumulator<T> acc) {
if (acc.mapSize != 0) {
if (acc.mapSize > 0) {
return acc.min;
} else {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import static org.junit.Assert.assertEquals;
Expand Down Expand Up @@ -142,6 +143,47 @@ public void testAggregateWithMerge() throws NoSuchMethodException, InvocationTar
}
}

/**
 * Checks that merging an accumulator whose accumulated values were all retracted again
 * does not change the result of the accumulator it is merged into.
 *
 * <p>For every input value set the test:
 * <ol>
 *   <li>builds one accumulator by accumulating all values and one by retracting all values
 *       from a freshly created accumulator,</li>
 *   <li>merges both into a new accumulator {@code acc},</li>
 *   <li>merges {@code acc} into an accumulator holding only a leading subset of the values,</li>
 *   <li>validates that the subset accumulator still yields the value it yielded before the merge.</li>
 * </ol>
 *
 * <p>The test returns early for aggregate functions that lack a {@code merge} or a
 * {@code retract} method.
 */
@Test
public void testMergeReservedAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
	AggregateFunction<T, ACC> aggregator = getAggregator();
	boolean hasMerge = UserDefinedFunctionUtils.ifMethodExistInFunction("merge", aggregator);
	boolean hasRetract = UserDefinedFunctionUtils.ifMethodExistInFunction("retract", aggregator);
	if (!hasMerge || !hasRetract) {
		// this test only verifies AggregateFunctions which have both merge() and retract() methods
		return;
	}

	Method mergeFunc = aggregator.getClass().getMethod("merge", getAccClass(), Iterable.class);

	// fetch the input sets once, so the iteration bound always matches the list being iterated
	for (List<T> inputValues : getInputValueSets()) {
		// accumulator containing every input value
		List<ACC> accumulators = new ArrayList<>();
		accumulators.add(accumulateValues(inputValues));

		// accumulator from which every input value has been retracted
		List<ACC> retractedAccumulators = new ArrayList<>();
		ACC retractedAcc = aggregator.createAccumulator();
		retractValues(retractedAcc, inputValues);
		retractedAccumulators.add(retractedAcc);

		// accumulator containing at most the first two elements; the bound is clamped so that
		// input sets with fewer than two elements do not raise IndexOutOfBoundsException
		int subsetSize = Math.min(2, inputValues.size());
		ACC accWithSubset = accumulateValues(inputValues.subList(0, subsetSize));
		T expectedValue = aggregator.getValue(accWithSubset);

		// merge the accumulated and the retracted accumulator into a fresh one,
		// then merge that result into the subset accumulator
		ACC acc = aggregator.createAccumulator();
		mergeFunc.invoke(aggregator, acc, accumulators);
		mergeFunc.invoke(aggregator, acc, retractedAccumulators);
		mergeFunc.invoke(aggregator, accWithSubset, Collections.singleton(acc));

		// the subset accumulator must still produce its pre-merge value
		T result = aggregator.getValue(accWithSubset);
		validateResult(expectedValue, result);
	}
}

@Test
public void testResetAccumulator() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
AggregateFunction<T, ACC> aggregator = getAggregator();
Expand Down

0 comments on commit 1bede7e

Please sign in to comment.