[FLINK-3198][dataSet] Renames and documents better the use of the getDataSet() in Grouping.

This closes apache#1548
kl0u authored and fhueske committed Jan 26, 2016
1 parent 5914e9a commit 902d420
Showing 10 changed files with 51 additions and 42 deletions.
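For context, the sketch below shows how caller code that accesses the DataSet underlying a Grouping is affected by the rename. It is a minimal, hypothetical example written against the Flink Java DataSet API of this period; the class name, variable names, and sample data are illustrative and are not part of this commit.

// Hypothetical caller code illustrating the rename; not part of this commit.
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupingAccessorExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));

        UnsortedGrouping<Tuple2<String, Integer>> grouped = input.groupBy(0);

        // Before this commit the accessor was called getDataSet(); after the rename it is
        // getInputDataSet(), which makes explicit that it returns the DataSet *before* grouping.
        DataSet<Tuple2<String, Integer>> preGrouping = grouped.getInputDataSet();

        // preGrouping is simply 'input' again; any grouping information is gone.
        preGrouping.print();
    }
}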
@@ -94,16 +94,16 @@ public AggregateOperator(DataSet<IN> input, Aggregations function, int field, St
* @param field
*/
public AggregateOperator(Grouping<IN> input, Aggregations function, int field, String aggregateLocationName) {
super(Preconditions.checkNotNull(input).getDataSet(), input.getDataSet().getType());
super(Preconditions.checkNotNull(input).getInputDataSet(), input.getInputDataSet().getType());
Preconditions.checkNotNull(function);

this.aggregateLocationName = aggregateLocationName;

if (!input.getDataSet().getType().isTupleType()) {
if (!input.getInputDataSet().getType().isTupleType()) {
throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
}

TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getDataSet().getType();
TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getInputDataSet().getType();

if (field < 0 || field >= inType.getArity()) {
throw new IllegalArgumentException("Aggregation field position is out of range.");
@@ -72,7 +72,7 @@ public GroupCombineOperator(DataSet<IN> input, TypeInformation<OUT> resultType,
* @param function The user-defined GroupReduce function.
*/
public GroupCombineOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupCombineFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getDataSet() : null, resultType);
super(input != null ? input.getInputDataSet() : null, resultType);

this.function = function;
this.grouper = input;
@@ -82,7 +82,7 @@ public GroupReduceOperator(DataSet<IN> input, TypeInformation<OUT> resultType, G
* @param function The user-defined GroupReduce function.
*/
public GroupReduceOperator(Grouping<IN> input, TypeInformation<OUT> resultType, GroupReduceFunction<IN, OUT> function, String defaultName) {
super(input != null ? input.getDataSet() : null, resultType);
super(input != null ? input.getInputDataSet() : null, resultType);

this.function = function;
this.grouper = input;
@@ -37,7 +37,7 @@
*/
public abstract class Grouping<T> {

protected final DataSet<T> dataSet;
protected final DataSet<T> inputDataSet;

protected final Keys<T> keys;

@@ -53,13 +53,23 @@ public Grouping(DataSet<T> set, Keys<T> keys) {
throw new InvalidProgramException("The grouping keys must not be empty.");
}

this.dataSet = set;
this.inputDataSet = set;
this.keys = keys;
}


public DataSet<T> getDataSet() {
return this.dataSet;
/**
* Returns the input DataSet of a grouping operation, that is, the DataSet before the grouping was applied.
* This means that if it is called directly on the result of a grouping operation, it cancels the effect of
* that grouping. As an example, in the following snippet:
* <pre><code>
* DataSet<X> notGrouped = input.groupBy().getInputDataSet();
* DataSet<X> allReduced = notGrouped.reduce()
* </code></pre>
* the <code>groupBy()</code> is as if it never happened, as the <code>notGrouped</code> DataSet corresponds
* to the input of the <code>groupBy()</code> (because of the <code>getInputDataSet()</code>).
*/
public DataSet<T> getInputDataSet() {
return this.inputDataSet;
}

public Keys<T> getKeys() {
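To make the documented behavior concrete, here is a minimal, hypothetical sketch (not part of the commit) contrasting a grouped reduce with a reduce applied to the DataSet returned by getInputDataSet(); the class name, sample data, and sum function are assumptions for illustration only.

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class GroupingPitfallExample {

    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        DataSet<Tuple2<String, Integer>> input = env.fromElements(
                new Tuple2<>("a", 1), new Tuple2<>("a", 2), new Tuple2<>("b", 3));

        ReduceFunction<Tuple2<String, Integer>> sum = new ReduceFunction<Tuple2<String, Integer>>() {
            @Override
            public Tuple2<String, Integer> reduce(Tuple2<String, Integer> v1, Tuple2<String, Integer> v2) {
                return new Tuple2<>(v1.f0, v1.f1 + v2.f1);
            }
        };

        // Grouped reduce: one result per key -> (a,3) and (b,3).
        DataSet<Tuple2<String, Integer>> perKey = input.groupBy(0).reduce(sum);

        // getInputDataSet() returns the DataSet *before* the grouping, so the groupBy()
        // is effectively cancelled and reduce() runs over all elements, yielding a single result.
        DataSet<Tuple2<String, Integer>> allReduced = input.groupBy(0).getInputDataSet().reduce(sum);

        perKey.print();
        allReduced.print();
    }
}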
@@ -63,7 +63,7 @@ public ReduceOperator(DataSet<IN> input, ReduceFunction<IN> function, String def


public ReduceOperator(Grouping<IN> input, ReduceFunction<IN> function, String defaultName) {
super(input.getDataSet(), input.getDataSet().getType());
super(input.getInputDataSet(), input.getInputDataSet().getType());

this.function = function;
this.grouper = input;
@@ -57,12 +57,12 @@ public class SortedGrouping<T> extends Grouping<T> {
public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) {
super(set, keys);

if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) {
throw new InvalidProgramException("Selected sort key is not a sortable type");
}

// use int-based expression key to properly resolve nested tuples for grouping
ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType());
ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType());

this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
this.groupSortOrders = new Order[groupSortKeyPositions.length];
@@ -75,12 +75,12 @@ public SortedGrouping(DataSet<T> set, Keys<T> keys, int field, Order order) {
public SortedGrouping(DataSet<T> set, Keys<T> keys, String field, Order order) {
super(set, keys);

if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) {
throw new InvalidProgramException("Selected sort key is not a sortable type");
}

// resolve String-field to int using the expression keys
ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType());
ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType());

this.groupSortKeyPositions = ek.computeLogicalKeyPositions();
this.groupSortOrders = new Order[groupSortKeyPositions.length];
@@ -168,8 +168,8 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
throw new NullPointerException("GroupReduce function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
this.getDataSet().getType(), Utils.getCallLocationName(), true);
return new GroupReduceOperator<>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
inputDataSet.getType(), Utils.getCallLocationName(), true);
return new GroupReduceOperator<>(this, resultType, inputDataSet.clean(reducer), Utils.getCallLocationName());
}

/**
@@ -189,9 +189,9 @@ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> co
throw new NullPointerException("GroupCombine function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner,
this.getDataSet().getType(), Utils.getCallLocationName(), true);
this.getInputDataSet().getType(), Utils.getCallLocationName(), true);

return new GroupCombineOperator<>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
return new GroupCombineOperator<>(this, resultType, inputDataSet.clean(combiner), Utils.getCallLocationName());
}


@@ -228,11 +228,11 @@ public SortedGrouping<T> sortGroup(int field, Order order) {
if (groupSortSelectorFunctionKey != null) {
throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported");
}
if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) {
throw new InvalidProgramException("Selected sort key is not a sortable type");
}

ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType());
ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType());

addSortGroupInternal(ek, order);
return this;
@@ -254,11 +254,11 @@ public SortedGrouping<T> sortGroup(String field, Order order) {
if (groupSortSelectorFunctionKey != null) {
throw new InvalidProgramException("Chaining sortGroup with KeySelector sorting is not supported");
}
if (!Keys.ExpressionKeys.isSortKey(field, dataSet.getType())) {
if (!Keys.ExpressionKeys.isSortKey(field, inputDataSet.getType())) {
throw new InvalidProgramException("Selected sort key is not a sortable type");
}

ExpressionKeys<T> ek = new ExpressionKeys<>(field, dataSet.getType());
ExpressionKeys<T> ek = new ExpressionKeys<>(field, inputDataSet.getType());

addSortGroupInternal(ek, order);
return this;
@@ -278,5 +278,4 @@ private void addSortGroupInternal(ExpressionKeys<T> ek, Order order) {
this.groupSortOrders[pos] = order; // use the same order
}
}

}
@@ -136,7 +136,7 @@ public ReduceOperator<T> reduce(ReduceFunction<T> reducer) {
if (reducer == null) {
throw new NullPointerException("Reduce function must not be null.");
}
return new ReduceOperator<T>(this, dataSet.clean(reducer), Utils.getCallLocationName());
return new ReduceOperator<T>(this, inputDataSet.clean(reducer), Utils.getCallLocationName());
}

/**
@@ -157,9 +157,9 @@ public <R> GroupReduceOperator<T, R> reduceGroup(GroupReduceFunction<T, R> reduc
throw new NullPointerException("GroupReduce function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getGroupReduceReturnTypes(reducer,
this.getDataSet().getType(), Utils.getCallLocationName(), true);
this.getInputDataSet().getType(), Utils.getCallLocationName(), true);

return new GroupReduceOperator<T, R>(this, resultType, dataSet.clean(reducer), Utils.getCallLocationName());
return new GroupReduceOperator<T, R>(this, resultType, inputDataSet.clean(reducer), Utils.getCallLocationName());
}

/**
@@ -179,9 +179,9 @@ public <R> GroupCombineOperator<T, R> combineGroup(GroupCombineFunction<T, R> co
throw new NullPointerException("GroupCombine function must not be null.");
}
TypeInformation<R> resultType = TypeExtractor.getGroupCombineReturnTypes(combiner,
this.getDataSet().getType(), Utils.getCallLocationName(), true);
this.getInputDataSet().getType(), Utils.getCallLocationName(), true);

return new GroupCombineOperator<T, R>(this, resultType, dataSet.clean(combiner), Utils.getCallLocationName());
return new GroupCombineOperator<T, R>(this, resultType, inputDataSet.clean(combiner), Utils.getCallLocationName());
}

/**
@@ -210,12 +210,12 @@ public GroupReduceOperator<T, T> first(int n) {
public ReduceOperator<T> minBy(int... fields) {

// Check for using a tuple
if(!this.dataSet.getType().isTupleType()) {
if(!this.inputDataSet.getType().isTupleType()) {
throw new InvalidProgramException("Method minBy(int) only works on tuples.");
}

return new ReduceOperator<T>(this, new SelectByMinFunction(
(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
(TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName());
}

/**
@@ -231,12 +231,12 @@ public ReduceOperator<T> minBy(int... fields) {
public ReduceOperator<T> maxBy(int... fields) {

// Check for using a tuple
if(!this.dataSet.getType().isTupleType()) {
if(!this.inputDataSet.getType().isTupleType()) {
throw new InvalidProgramException("Method maxBy(int) only works on tuples.");
}

return new ReduceOperator<T>(this, new SelectByMaxFunction(
(TupleTypeInfo) this.dataSet.getType(), fields), Utils.getCallLocationName());
(TupleTypeInfo) this.inputDataSet.getType(), fields), Utils.getCallLocationName());
}
// --------------------------------------------------------------------------------------------
// Group Operations
@@ -259,7 +259,7 @@ public SortedGrouping<T> sortGroup(int field, Order order) {
throw new InvalidProgramException("KeySelector grouping keys and field index group-sorting keys cannot be used together.");
}

SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order);
SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, this.keys, field, order);
sg.customPartitioner = getCustomPartitioner();
return sg;
}
@@ -280,7 +280,7 @@ public SortedGrouping<T> sortGroup(String field, Order order) {
throw new InvalidProgramException("KeySelector grouping keys and field expression group-sorting keys cannot be used together.");
}

SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, field, order);
SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, this.keys, field, order);
sg.customPartitioner = getCustomPartitioner();
return sg;
}
@@ -301,8 +301,8 @@ public <K> SortedGrouping<T> sortGroup(KeySelector<T, K> keySelector, Order orde
throw new InvalidProgramException("KeySelector group-sorting keys can only be used with KeySelector grouping keys.");
}

TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.dataSet.getType());
SortedGrouping<T> sg = new SortedGrouping<T>(this.dataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.dataSet.getType(), keyType), order);
TypeInformation<K> keyType = TypeExtractor.getKeySelectorTypes(keySelector, this.inputDataSet.getType());
SortedGrouping<T> sg = new SortedGrouping<T>(this.inputDataSet, this.keys, new Keys.SelectorFunctionKeys<T, K>(keySelector, this.inputDataSet.getType(), keyType), order);
sg.customPartitioner = getCustomPartitioner();
return sg;
}
@@ -98,7 +98,7 @@ public void testUnion2() {
final int NUM_INPUTS = 4;

// construct the plan it will be multiple flat maps, all unioned
// and the "unioned" dataSet will be grouped
// and the "unioned" inputDataSet will be grouped
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<String> source = env.readTextFile(IN_FILE);
@@ -95,15 +95,15 @@ public ScalaAggregateOperator(org.apache.flink.api.java.DataSet<IN> input, Aggre
* @param field
*/
public ScalaAggregateOperator(Grouping<IN> input, Aggregations function, int field) {
super(Preconditions.checkNotNull(input).getDataSet(), input.getDataSet().getType());
super(Preconditions.checkNotNull(input).getInputDataSet(), input.getInputDataSet().getType());

Preconditions.checkNotNull(function);

if (!input.getDataSet().getType().isTupleType()) {
if (!input.getInputDataSet().getType().isTupleType()) {
throw new InvalidProgramException("Aggregating on field positions is only possible on tuple data types.");
}

TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getDataSet().getType();
TupleTypeInfoBase<?> inType = (TupleTypeInfoBase<?>) input.getInputDataSet().getType();

if (field < 0 || field >= inType.getArity()) {
throw new IllegalArgumentException("Aggregation field position is out of range.");
@@ -45,7 +45,7 @@ class BatchScalaAPICompletenessTest extends ScalaAPICompletenessTestBase {
"org.apache.flink.api.java.DataSet.getType",
"org.apache.flink.api.java.operators.Operator.getResultType",
"org.apache.flink.api.java.operators.Operator.getName",
"org.apache.flink.api.java.operators.Grouping.getDataSet",
"org.apache.flink.api.java.operators.Grouping.getInputDataSet",
"org.apache.flink.api.java.operators.Grouping.getKeys",
"org.apache.flink.api.java.operators.SingleInputOperator.getInput",
"org.apache.flink.api.java.operators.SingleInputOperator.getInputType",
