[FLINK-1622][java-api][scala-api] add a GroupCombine operator
The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.

* make Combine and FlatCombine generic by adding an output type

* add documentation

* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver

* add Java tests
* add Scala test

This closes apache#466
mxm committed Mar 18, 2015
1 parent 4a49a73 commit e93e0cb
Showing 43 changed files with 2,187 additions and 164 deletions.
115 changes: 107 additions & 8 deletions docs/dataset_transformations.md
@@ -260,7 +260,7 @@ class WC(val word: String, val count: Int) {
}

val words: DataSet[WC] = // [...]
val wordCounts = words.groupBy { _.word } reduce {
(w1, w2) => new WC(w1.word, w1.count + w2.count)
}
~~~
@@ -298,7 +298,7 @@ val reducedTuples = tuples.groupBy(0, 1).reduce { ... }

#### Reduce on DataSet grouped by Case Class Fields

When using Case Classes you can also specify the grouping key using the names of the fields:

~~~scala
case class MyClass(val a: String, b: Int, c: Double)
@@ -334,7 +334,7 @@ public class DistinctReduce

Set<String> uniqStrings = new HashSet<String>();
Integer key = null;

// add all strings of the group to the set
for (Tuple2<Integer, String> t : in) {
key = t.f0;
@@ -524,6 +524,99 @@ class MyCombinableGroupReducer
</div>
</div>

### GroupCombine on a Grouped DataSet

The GroupCombine transformation is the generalized form of the combine step in
the Combinable GroupReduceFunction. It is generalized in the sense that it
allows combining of input type `I` to an arbitrary output type `O`. In contrast,
the combine step in the GroupReduce only allows combining from input type `I` to
output type `I`. This is because the reduce step in the GroupReduceFunction
expects input type `I`.
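
For reference, a minimal sketch of the generalized interface this paragraph
describes, as used by the examples below (modifiers and Javadoc omitted; the
`extends Function, Serializable` and `throws Exception` parts are assumptions
matching Flink's other function interfaces):

~~~java
import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.util.Collector;

public interface FlatCombineFunction<IN, OUT> extends Function, Serializable {

    // combines a group of IN elements into zero or more OUT elements
    void combine(Iterable<IN> values, Collector<OUT> out) throws Exception;
}
~~~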

In some applications, it is desirable to combine a DataSet into an intermediate
format before performing additional transformations (e.g. to reduce data
size). This can be achieved with a GroupCombine transformation at very little
cost.

**Note:** The GroupCombine on a Grouped DataSet is performed in memory with a
greedy strategy which may not process all data at once but in multiple
steps. It is also performed on the individual partitions without a data
exchange like in a GroupReduce transformation. This may lead to partial
results, i.e. the same key may be emitted more than once (for example, once
per partition), so a final GroupReduce is still needed to produce exact results.

The following example demonstrates the use of a CombineGroup transformation for
an alternative WordCount implementation. In the implementation, the words are
first pre-combined into partial `(word, count)` pairs per partition before a
GroupReduce transformation computes the final counts.

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">

~~~java
DataSet<String> input = [..] // The words received as input
UnsortedGrouping<String> groupedInput = input.groupBy(0); // group identical words

DataSet<Tuple2<String, Integer>> combinedWords = groupedInput.combineGroup(new FlatCombineFunction<String, Tuple2<String, Integer>>() {

    public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) { // combine
        String key = null;
        int count = 0;
        for (String word : words) {
            key = word;
            count++;
        }
        // emit one partial count per in-memory group of identical words
        out.collect(new Tuple2<String, Integer>(key, count));
    }
});

UnsortedGrouping<Tuple2<String, Integer>> groupedCombinedWords = combinedWords.groupBy(0); // group by words again

DataSet<Tuple2<String, Integer>> output = groupedCombinedWords.reduceGroup(new GroupReduceFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() { // group reduce with full data exchange

    public void reduce(Iterable<Tuple2<String, Integer>> partialCounts, Collector<Tuple2<String, Integer>> out) {
        String key = null;
        int sum = 0;
        for (Tuple2<String, Integer> partial : partialCounts) {
            key = partial.f0;
            sum += partial.f1;
        }
        // emit the final count per word
        out.collect(new Tuple2<String, Integer>(key, sum));
    }
});
~~~

</div>
<div data-lang="scala" markdown="1">

~~~scala
val input: DataSet[String] = [..] // The words received as input
val groupedInput: GroupedDataSet[String] = input.groupBy(0)

val combinedWords: DataSet[(String, Int)] = groupedInput.combineGroup {
  (words, out: Collector[(String, Int)]) =>
    var key: String = null
    var count = 0
    for (word <- words) {
      key = word
      count += 1
    }
    // emit one partial count per in-memory group of identical words
    out.collect((key, count))
}

val groupedCombinedWords: GroupedDataSet[(String, Int)] = combinedWords.groupBy(0)

val output: DataSet[(String, Int)] = groupedCombinedWords.reduceGroup {
  (words, out: Collector[(String, Int)]) =>
    var key: String = null
    var sum = 0
    for ((word, partialCount) <- words) {
      key = word
      sum += partialCount
    }
    // emit the final count per word
    out.collect((key, sum))
}
~~~

</div>
</div>

The alternative WordCount implementation above demonstrates how the GroupCombine
pre-combines words before performing the GroupReduce transformation. The
example is just a proof of concept. Note how the combine step changes the type
of the DataSet, which would normally require an additional Map transformation
before the GroupReduce can be executed.
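
For comparison, a hypothetical sketch of the Map-based alternative this
paragraph alludes to (same word-count types as above): without GroupCombine,
each word must first be mapped to a `(word, 1)` tuple, because the combine
step of a GroupReduceFunction must emit its own input type.

~~~java
// the extra Map transformation that GroupCombine makes unnecessary
DataSet<Tuple2<String, Integer>> mapped = input
    .map(new MapFunction<String, Tuple2<String, Integer>>() {
        public Tuple2<String, Integer> map(String word) {
            return new Tuple2<String, Integer>(word, 1);
        }
    });
// a combinable GroupReduce can now pre-combine, since its input and
// output types are both Tuple2<String, Integer>
~~~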

### Aggregate on Grouped Tuple DataSet

There are some common aggregation operations that are frequently used. The Aggregate transformation provides the following built-in aggregation functions:
@@ -558,7 +651,7 @@ val output = input.groupBy(1).aggregate(SUM, 0).and(MIN, 2)
</div>
</div>

To apply multiple aggregations on a DataSet it is necessary to use the `.and()` function after the first aggregate, that means `.aggregate(SUM, 0).and(MIN, 2)` produces the sum of field 0 and the minimum of field 2 of the original DataSet.
In contrast to that `.aggregate(SUM, 0).aggregate(MIN, 2)` will apply an aggregation on an aggregation. In the given example it would produce the minimum of field 2 after calculating the sum of field 0 grouped by field 1.
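
To make the difference concrete, a short sketch (assuming a
`Tuple3<Integer, String, Double>` input, as in the surrounding examples):

~~~java
DataSet<Tuple3<Integer, String, Double>> input = // [...]

// one pass: per group of field 1, the sum of field 0 and the minimum of field 2
DataSet<Tuple3<Integer, String, Double>> combined =
    input.groupBy(1).aggregate(Aggregations.SUM, 0).and(Aggregations.MIN, 2);

// aggregation on an aggregation: the minimum of field 2 is computed on the
// result of the preceding SUM aggregation
DataSet<Tuple3<Integer, String, Double>> chained =
    input.groupBy(1).aggregate(Aggregations.SUM, 0).aggregate(Aggregations.MIN, 2);
~~~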

**Note:** The set of aggregation functions will be extended in the future.
@@ -632,6 +725,12 @@ group-reduce function is not combinable. Therefore, this can be a very compute intensive operation.
See the paragraph on "Combinable Group-Reduce Functions" above to learn how to implement a
combinable group-reduce function.

### GroupCombine on a full DataSet

The GroupCombine on a full DataSet works similarly to the GroupCombine on a
grouped DataSet. The data is partitioned on all nodes and then combined in a
greedy fashion (i.e. only data fitting into memory is combined at once).
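
A minimal sketch (assuming the `combineGroup` method this commit adds to the
ungrouped Java DataSet): partial word counts are pre-combined per partition
without any grouping or data exchange, so the same word may be emitted by
several partitions.

~~~java
DataSet<String> input = // [...]

DataSet<Tuple2<String, Integer>> partialCounts = input.combineGroup(
    new FlatCombineFunction<String, Tuple2<String, Integer>>() {

        public void combine(Iterable<String> words, Collector<Tuple2<String, Integer>> out) {
            // count word occurrences within the in-memory data of this partition
            Map<String, Integer> counts = new HashMap<String, Integer>();
            for (String word : words) {
                Integer c = counts.get(word);
                counts.put(word, c == null ? 1 : c + 1);
            }
            // emit one partial count per distinct word seen locally
            for (Map.Entry<String, Integer> e : counts.entrySet()) {
                out.collect(new Tuple2<String, Integer>(e.getKey(), e.getValue()));
            }
        }
    });
~~~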

### Aggregate on full Tuple DataSet

There are some common aggregation operations that are frequently used. The Aggregate transformation
@@ -898,7 +997,7 @@ to manually pick a strategy, in case you want to enforce a specific way of executing the join.
DataSet<SomeType> input1 = // [...]
DataSet<AnotherType> input2 = // [...]

DataSet<Tuple2<SomeType, AnotherType>> result =
input1.join(input2, BROADCAST_HASH_FIRST)
.where("id").equalTo("key");
~~~
@@ -1199,7 +1298,7 @@ val out = in.rebalance().map { ... }

### Hash-Partition

Hash-partitions a DataSet on a given key.
Keys can be specified as key expressions or field position keys (see [Reduce examples](#reduce-on-grouped-dataset) for how to specify keys).

<div class="codetabs" markdown="1">
@@ -1235,7 +1334,7 @@ Partitions can be sorted on multiple fields by chaining `sortPartition()` calls.

~~~java
DataSet<Tuple2<String, Integer>> in = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)
@@ -1248,7 +1347,7 @@ DataSet<Tuple2<String, String>> out = in.sortPartition(1, Order.ASCENDING)

~~~scala
val in: DataSet[(String, Int)] = // [...]
// Locally sort partitions in ascending order on the second String field and
// in descending order on the first String field.
// Apply a MapPartition transformation on the sorted partitions.
val out = in.sortPartition(1, Order.ASCENDING)
@@ -18,6 +18,26 @@

package org.apache.flink.compiler;

import org.apache.flink.compiler.costs.CostEstimator;
import org.apache.flink.compiler.costs.DefaultCostEstimator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.GroupCombineOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
@@ -28,35 +48,18 @@
import java.util.Map;
import java.util.Set;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.operators.base.SortPartitionOperatorBase;
import org.apache.flink.compiler.dag.SortPartitionNode;
import org.apache.flink.api.common.InvalidProgramException;
import org.apache.flink.api.common.Plan;
import org.apache.flink.api.common.operators.GenericDataSinkBase;
import org.apache.flink.api.common.operators.GenericDataSourceBase;
import org.apache.flink.api.common.operators.Operator;
import org.apache.flink.api.common.operators.Union;
import org.apache.flink.api.common.operators.base.BulkIterationBase;
import org.apache.flink.api.common.operators.base.CoGroupOperatorBase;
import org.apache.flink.api.common.operators.base.CrossOperatorBase;
import org.apache.flink.api.common.operators.base.DeltaIterationBase;
import org.apache.flink.api.common.operators.base.FilterOperatorBase;
import org.apache.flink.api.common.operators.base.FlatMapOperatorBase;
import org.apache.flink.api.common.operators.base.GroupReduceOperatorBase;
import org.apache.flink.api.common.operators.base.JoinOperatorBase;
import org.apache.flink.api.common.operators.base.MapOperatorBase;
import org.apache.flink.api.common.operators.base.MapPartitionOperatorBase;
import org.apache.flink.api.common.operators.base.PartitionOperatorBase;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
import org.apache.flink.api.common.operators.base.BulkIterationBase.PartialSolutionPlaceHolder;
import org.apache.flink.api.common.operators.base.DeltaIterationBase.SolutionSetPlaceHolder;
import org.apache.flink.api.common.operators.base.DeltaIterationBase.WorksetPlaceHolder;
import org.apache.flink.compiler.costs.CostEstimator;
import org.apache.flink.compiler.costs.DefaultCostEstimator;

import org.apache.flink.compiler.dag.BinaryUnionNode;
import org.apache.flink.compiler.dag.BulkIterationNode;
import org.apache.flink.compiler.dag.BulkPartialSolutionNode;
@@ -68,10 +71,11 @@
import org.apache.flink.compiler.dag.FilterNode;
import org.apache.flink.compiler.dag.FlatMapNode;
import org.apache.flink.compiler.dag.GroupReduceNode;
import org.apache.flink.compiler.dag.GroupCombineNode;
import org.apache.flink.compiler.dag.IterationNode;
import org.apache.flink.compiler.dag.JoinNode;
import org.apache.flink.compiler.dag.MapNode;
import org.apache.flink.compiler.dag.MapPartitionNode;
import org.apache.flink.compiler.dag.JoinNode;
import org.apache.flink.compiler.dag.OptimizerNode;
import org.apache.flink.compiler.dag.PactConnection;
import org.apache.flink.compiler.dag.PartitionNode;
@@ -97,11 +101,14 @@
import org.apache.flink.compiler.plan.WorksetIterationPlanNode;
import org.apache.flink.compiler.plan.WorksetPlanNode;
import org.apache.flink.compiler.postpass.OptimizerPostPass;

import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;

import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;

import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Visitor;

@@ -686,6 +693,9 @@ else if (c instanceof ReduceOperatorBase) {
else if (c instanceof GroupReduceOperatorBase) {
n = new GroupReduceNode((GroupReduceOperatorBase<?, ?, ?>) c);
}
else if (c instanceof GroupCombineOperatorBase) {
n = new GroupCombineNode((GroupCombineOperatorBase<?, ?, ?>) c);
}
else if (c instanceof JoinOperatorBase) {
n = new JoinNode((JoinOperatorBase<?, ?, ?, ?>) c);
}
@@ -186,7 +186,10 @@ public void costOperator(PlanNode n) {

case SORTED_GROUP_COMBINE:
// partial grouping is always local and main memory resident. we should add a relative cpu cost at some point

case ALL_GROUP_COMBINE:

case UNION:
// pipelined local union is for free

Expand Down