From 7709a3a6906d015e82c8d3a3fce30a5a90da5167 Mon Sep 17 00:00:00 2001 From: Gyula Fora Date: Tue, 21 Oct 2014 22:30:12 +0200 Subject: [PATCH] [FLINK-1188] [streaming] Updated aggregations to work also on arrays by default --- .../api/datastream/BatchedDataStream.java | 12 ++-- .../streaming/api/datastream/DataStream.java | 37 ++++++++-- .../aggregation/AggregationFunction.java | 12 +++- .../ComparableAggregationFunction.java | 29 ++++++-- .../aggregation/MaxAggregationFunction.java | 6 +- .../aggregation/MaxByAggregationFunction.java | 6 +- .../aggregation/MinAggregationFunction.java | 6 +- .../aggregation/MinByAggregationFunction.java | 19 ++++- .../aggregation/SumAggregationFunction.java | 69 +++++++++++-------- .../api/AggregationFunctionTest.java | 34 +++++---- 10 files changed, 160 insertions(+), 70 deletions(-) diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java index dbf436d8a8f54..75eadcffc2c63 100755 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/BatchedDataStream.java @@ -137,7 +137,7 @@ public BatchedDataStream groupBy(int keyPosition) { public SingleOutputStreamOperator sum(int positionToSum) { dataStream.checkFieldRange(positionToSum); return aggregate((AggregationFunction) SumAggregationFunction.getSumFunction( - positionToSum, dataStream.getClassAtPos(positionToSum))); + positionToSum, dataStream.getClassAtPos(positionToSum), dataStream.getOutputType())); } /** @@ -159,7 +159,7 @@ public BatchedDataStream groupBy(int keyPosition) { */ public SingleOutputStreamOperator min(int positionToMin) { dataStream.checkFieldRange(positionToMin); - return aggregate(new MinAggregationFunction(positionToMin)); + return aggregate(new MinAggregationFunction(positionToMin, dataStream.getOutputType())); } /** @@ -191,7 +191,8 @@ public BatchedDataStream groupBy(int keyPosition) { */ public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { dataStream.checkFieldRange(positionToMinBy); - return aggregate(new MinByAggregationFunction(positionToMinBy, first)); + return aggregate(new MinByAggregationFunction(positionToMinBy, first, + dataStream.getOutputType())); } /** @@ -213,7 +214,7 @@ public BatchedDataStream groupBy(int keyPosition) { */ public SingleOutputStreamOperator max(int positionToMax) { dataStream.checkFieldRange(positionToMax); - return aggregate(new MaxAggregationFunction(positionToMax)); + return aggregate(new MaxAggregationFunction(positionToMax, dataStream.getOutputType())); } /** @@ -244,7 +245,8 @@ public BatchedDataStream groupBy(int keyPosition) { */ public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { dataStream.checkFieldRange(positionToMaxBy); - return aggregate(new MaxByAggregationFunction(positionToMaxBy, first)); + return aggregate(new MaxByAggregationFunction(positionToMaxBy, first, + dataStream.getOutputType())); } /** diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index 36649cc958978..98058dfc89f49 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -32,6 +32,8 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.api.java.tuple.Tuple; @@ -201,6 +203,31 @@ protected Class getClassAtPos(int pos) { TypeInformation outTypeInfo = outTypeWrapper.getTypeInfo(); if (outTypeInfo.isTupleType()) { type = ((TupleTypeInfo) outTypeInfo).getTypeAt(pos).getTypeClass(); + + } else if (outTypeInfo instanceof BasicArrayTypeInfo) { + + type = ((BasicArrayTypeInfo) outTypeInfo).getComponentTypeClass(); + + } else if (outTypeInfo instanceof PrimitiveArrayTypeInfo) { + Class clazz = outTypeInfo.getTypeClass(); + if (clazz == boolean[].class) { + type = Boolean.class; + } else if (clazz == short[].class) { + type = Short.class; + } else if (clazz == int[].class) { + type = Integer.class; + } else if (clazz == long[].class) { + type = Long.class; + } else if (clazz == float[].class) { + type = Float.class; + } else if (clazz == double[].class) { + type = Double.class; + } else if (clazz == char[].class) { + type = Character.class; + } else { + throw new IndexOutOfBoundsException("Type could not be determined for array"); + } + } else if (pos == 0) { type = outTypeInfo.getTypeClass(); } else { @@ -594,7 +621,7 @@ public WindowDataStream window(long windowSize) { public SingleOutputStreamOperator sum(int positionToSum) { checkFieldRange(positionToSum); return aggregate((AggregationFunction) SumAggregationFunction.getSumFunction( - positionToSum, getClassAtPos(positionToSum))); + positionToSum, getClassAtPos(positionToSum), getOutputType())); } /** @@ -616,7 +643,7 @@ public WindowDataStream window(long windowSize) { */ public SingleOutputStreamOperator min(int positionToMin) { checkFieldRange(positionToMin); - return aggregate(new MinAggregationFunction(positionToMin)); + return aggregate(new MinAggregationFunction(positionToMin, getOutputType())); } /** @@ -648,7 +675,7 @@ public WindowDataStream window(long windowSize) { */ public SingleOutputStreamOperator minBy(int positionToMinBy, boolean first) { checkFieldRange(positionToMinBy); - return aggregate(new MinByAggregationFunction(positionToMinBy, first)); + return aggregate(new MinByAggregationFunction(positionToMinBy, first, getOutputType())); } /** @@ -670,7 +697,7 @@ public WindowDataStream window(long windowSize) { */ public SingleOutputStreamOperator max(int positionToMax) { checkFieldRange(positionToMax); - return aggregate(new MaxAggregationFunction(positionToMax)); + return aggregate(new MaxAggregationFunction(positionToMax, getOutputType())); } /** @@ -702,7 +729,7 @@ public WindowDataStream window(long windowSize) { */ public SingleOutputStreamOperator maxBy(int positionToMaxBy, boolean first) { checkFieldRange(positionToMaxBy); - return aggregate(new MaxByAggregationFunction(positionToMaxBy, first)); + return aggregate(new MaxByAggregationFunction(positionToMaxBy, first, getOutputType())); } /** diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java index 512853a78015d..825b4dbed0828 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/AggregationFunction.java @@ -18,17 +18,23 @@ package org.apache.flink.streaming.api.function.aggregation; import org.apache.flink.api.common.functions.ReduceFunction; +import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; public abstract class AggregationFunction implements ReduceFunction { private static final long serialVersionUID = 1L; - + public int position; protected Tuple returnTuple; + protected boolean isTuple; + protected boolean isArray; - public AggregationFunction(int pos) { + public AggregationFunction(int pos, TypeInformation type) { this.position = pos; + this.isTuple = type.isTupleType(); + this.isArray = type instanceof BasicArrayTypeInfo || type instanceof PrimitiveArrayTypeInfo; } - } diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java index 93444df1fb191..383c39c107d76 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/ComparableAggregationFunction.java @@ -17,37 +17,56 @@ package org.apache.flink.streaming.api.function.aggregation; +import java.lang.reflect.Array; + +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; public abstract class ComparableAggregationFunction extends AggregationFunction { private static final long serialVersionUID = 1L; - public ComparableAggregationFunction(int positionToAggregate) { - super(positionToAggregate); + public ComparableAggregationFunction(int positionToAggregate, TypeInformation type) { + super(positionToAggregate, type); } @SuppressWarnings("unchecked") @Override public T reduce(T value1, T value2) throws Exception { - if (value1 instanceof Tuple) { + if (isTuple) { Tuple t1 = (Tuple) value1; Tuple t2 = (Tuple) value2; compare(t1, t2); return (T) returnTuple; + } else if (isArray) { + return compareArray(value1, value2); } else if (value1 instanceof Comparable) { if (isExtremal((Comparable) value1, value2)) { return value1; - }else{ + } else { return value2; } } else { - throw new RuntimeException("The values " + value1 + " and "+ value2 + " cannot be compared."); + throw new RuntimeException("The values " + value1 + " and " + value2 + + " cannot be compared."); } } + @SuppressWarnings("unchecked") + public T compareArray(T array1, T array2) { + Object v1 = Array.get(array1, position); + Object v2 = Array.get(array2, position); + if (isExtremal((Comparable) v1, v2)) { + Array.set(array2, position, v1); + } else { + Array.set(array2, position, v2); + } + + return array2; + } + public void compare(Tuple tuple1, Tuple tuple2) throws InstantiationException, IllegalAccessException { diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java index dd63b2d502cc2..d013162eefbf7 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxAggregationFunction.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.api.function.aggregation; +import org.apache.flink.api.common.typeinfo.TypeInformation; + public class MaxAggregationFunction extends ComparableAggregationFunction { private static final long serialVersionUID = 1L; - public MaxAggregationFunction(int pos) { - super(pos); + public MaxAggregationFunction(int pos, TypeInformation type) { + super(pos, type); } @Override diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java index 274c8b6fa611a..4679028b4dfe4 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MaxByAggregationFunction.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.api.function.aggregation; +import org.apache.flink.api.common.typeinfo.TypeInformation; + public class MaxByAggregationFunction extends MinByAggregationFunction { private static final long serialVersionUID = 1L; - public MaxByAggregationFunction(int pos, boolean first) { - super(pos, first); + public MaxByAggregationFunction(int pos, boolean first, TypeInformation type) { + super(pos, first, type); } @Override diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java index ad903a883e354..83c20c7d0d440 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinAggregationFunction.java @@ -17,12 +17,14 @@ package org.apache.flink.streaming.api.function.aggregation; +import org.apache.flink.api.common.typeinfo.TypeInformation; + public class MinAggregationFunction extends ComparableAggregationFunction { private static final long serialVersionUID = 1L; - public MinAggregationFunction(int pos) { - super(pos); + public MinAggregationFunction(int pos, TypeInformation type) { + super(pos, type); } @Override diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java index a4a328c5cad66..31d6b372f6c41 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/MinByAggregationFunction.java @@ -17,6 +17,9 @@ package org.apache.flink.streaming.api.function.aggregation; +import java.lang.reflect.Array; + +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; public class MinByAggregationFunction extends ComparableAggregationFunction { @@ -24,8 +27,8 @@ public class MinByAggregationFunction extends ComparableAggregationFunction type) { + super(pos, type); this.first = first; } @@ -43,6 +46,18 @@ public void compare(Tuple tuple1, Tuple tuple2) throws InstantiationExceptio } } + @Override + @SuppressWarnings("unchecked") + public T compareArray(T array1, T array2) { + Object v1 = Array.get(array1, position); + Object v2 = Array.get(array2, position); + if (isExtremal((Comparable) v1, v2)) { + return array1; + } else { + return array2; + } + } + @Override public boolean isExtremal(Comparable o1, R o2) { if (first) { diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java index 0429cdbc7b8ff..cd5007296577c 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/aggregation/SumAggregationFunction.java @@ -17,20 +17,23 @@ package org.apache.flink.streaming.api.function.aggregation; +import java.lang.reflect.Array; + +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; public abstract class SumAggregationFunction extends AggregationFunction { private static final long serialVersionUID = 1L; - public SumAggregationFunction(int pos) { - super(pos); + public SumAggregationFunction(int pos, TypeInformation type) { + super(pos, type); } @SuppressWarnings("unchecked") @Override public T reduce(T value1, T value2) throws Exception { - if (value1 instanceof Tuple) { + if (isTuple) { Tuple tuple1 = (Tuple) value1; Tuple tuple2 = (Tuple) value2; @@ -39,6 +42,11 @@ public T reduce(T value1, T value2) throws Exception { position); return (T) returnTuple; + } else if (isArray) { + Object v1 = Array.get(value1, position); + Object v2 = Array.get(value2, position); + Array.set(value2, position, add(v1, v2)); + return value2; } else { return (T) add(value1, value2); } @@ -47,23 +55,24 @@ public T reduce(T value1, T value2) throws Exception { protected abstract Object add(Object value1, Object value2); @SuppressWarnings("rawtypes") - public static SumAggregationFunction getSumFunction(int pos, Class type) { - - if (type == Integer.class) { - return new IntSum(pos); - } else if (type == Long.class) { - return new LongSum(pos); - } else if (type == Short.class) { - return new ShortSum(pos); - } else if (type == Double.class) { - return new DoubleSum(pos); - } else if (type == Float.class) { - return new FloatSum(pos); - } else if (type == Byte.class) { - return new ByteSum(pos); + public static SumAggregationFunction getSumFunction(int pos, Class classAtPos, + TypeInformation typeInfo) { + + if (classAtPos == Integer.class) { + return new IntSum(pos, typeInfo); + } else if (classAtPos == Long.class) { + return new LongSum(pos, typeInfo); + } else if (classAtPos == Short.class) { + return new ShortSum(pos, typeInfo); + } else if (classAtPos == Double.class) { + return new DoubleSum(pos, typeInfo); + } else if (classAtPos == Float.class) { + return new FloatSum(pos, typeInfo); + } else if (classAtPos == Byte.class) { + return new ByteSum(pos, typeInfo); } else { throw new RuntimeException("DataStream cannot be summed because the class " - + type.getSimpleName() + " does not support the + operator."); + + classAtPos.getSimpleName() + " does not support the + operator."); } } @@ -71,8 +80,8 @@ public static SumAggregationFunction getSumFunction(int pos, Class type) private static class IntSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public IntSum(int pos) { - super(pos); + public IntSum(int pos, TypeInformation type) { + super(pos, type); } @Override @@ -84,8 +93,8 @@ protected Object add(Object value1, Object value2) { private static class LongSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public LongSum(int pos) { - super(pos); + public LongSum(int pos, TypeInformation type) { + super(pos, type); } @Override @@ -98,8 +107,8 @@ private static class DoubleSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public DoubleSum(int pos) { - super(pos); + public DoubleSum(int pos, TypeInformation type) { + super(pos, type); } @Override @@ -111,8 +120,8 @@ protected Object add(Object value1, Object value2) { private static class ShortSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public ShortSum(int pos) { - super(pos); + public ShortSum(int pos, TypeInformation type) { + super(pos, type); } @Override @@ -124,8 +133,8 @@ protected Object add(Object value1, Object value2) { private static class FloatSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public FloatSum(int pos) { - super(pos); + public FloatSum(int pos, TypeInformation type) { + super(pos, type); } @Override @@ -137,8 +146,8 @@ protected Object add(Object value1, Object value2) { private static class ByteSum extends SumAggregationFunction { private static final long serialVersionUID = 1L; - public ByteSum(int pos) { - super(pos); + public ByteSum(int pos, TypeInformation type) { + super(pos, type); } @Override diff --git a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java index 07f11850b2c3b..70e6118a22106 100644 --- a/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java +++ b/flink-addons/flink-streaming/flink-streaming-core/src/test/java/org/apache/flink/streaming/api/AggregationFunctionTest.java @@ -23,7 +23,9 @@ import java.util.ArrayList; import java.util.List; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.function.aggregation.MaxAggregationFunction; import org.apache.flink.streaming.api.function.aggregation.MaxByAggregationFunction; @@ -85,18 +87,21 @@ public void groupSumIntegerTest() { expectedGroupMaxList.add(new Tuple2(i % 3, i)); } + TypeInformation type1 = TypeExtractor.getForObject(new Tuple2(0, 0)); + TypeInformation type2 = TypeExtractor.getForObject(2); + @SuppressWarnings("unchecked") SumAggregationFunction> sumFunction = SumAggregationFunction - .getSumFunction(1, Integer.class); + .getSumFunction(1, Integer.class, type1); @SuppressWarnings("unchecked") SumAggregationFunction sumFunction0 = SumAggregationFunction.getSumFunction(0, - Integer.class); + Integer.class, type2); MinAggregationFunction> minFunction = new MinAggregationFunction>( - 1); - MinAggregationFunction minFunction0 = new MinAggregationFunction(0); + 1, type1); + MinAggregationFunction minFunction0 = new MinAggregationFunction(0, type2); MaxAggregationFunction> maxFunction = new MaxAggregationFunction>( - 1); - MaxAggregationFunction maxFunction0 = new MaxAggregationFunction(0); + 1, type1); + MaxAggregationFunction maxFunction0 = new MaxAggregationFunction(0, type2); List> sumList = MockInvokable.createAndExecute( new StreamReduceInvokable>(sumFunction), getInputList()); @@ -156,14 +161,14 @@ public void groupSumIntegerTest() { } MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>( - 0, true); + 0, true, type1); MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>( - 0, false); + 0, false, type1); MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>( - 0, true); + 0, true, type1); MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>( - 0, false); + 0, false, type1); List> maxByFirstExpected = new ArrayList>(); maxByFirstExpected.add(new Tuple2(0, 0)); @@ -226,16 +231,17 @@ public void groupSumIntegerTest() { @Test public void minMaxByTest() { + TypeInformation type1 = TypeExtractor.getForObject(new Tuple2(0, 0)); MaxByAggregationFunction> maxByFunctionFirst = new MaxByAggregationFunction>( - 0, true); + 0, true, type1); MaxByAggregationFunction> maxByFunctionLast = new MaxByAggregationFunction>( - 0, false); + 0, false, type1); MinByAggregationFunction> minByFunctionFirst = new MinByAggregationFunction>( - 0, true); + 0, true, type1); MinByAggregationFunction> minByFunctionLast = new MinByAggregationFunction>( - 0, false); + 0, false, type1); List> maxByFirstExpected = new ArrayList>(); maxByFirstExpected.add(new Tuple2(0, 0));