From ce10e57bc163babd59005fa250c26e7604f23cf5 Mon Sep 17 00:00:00 2001
From: Greg Hogan
Date: Sun, 16 Jul 2017 07:24:59 -0400
Subject: [PATCH] [FLINK-7204] [core] CombineHint.NONE

Add a new option to CombineHint which excludes the creation of a
combiner for a reduce function. Gelly now excludes the combiner when
simplifying graphs, an operation used in most algorithm unit and
integration tests.

This closes #4350
---
 .../operators/base/ReduceOperatorBase.java    |  8 ++-
 .../graph/asm/simple/directed/Simplify.java   |  2 +
 .../graph/asm/simple/undirected/Simplify.java |  2 +
 .../flink/optimizer/dag/ReduceNode.java       | 10 +--
 .../GroupReduceWithCombineProperties.java     |  2 +-
 .../optimizer/operators/ReduceProperties.java | 25 ++++----
 .../optimizer/java/ReduceCompilationTest.java | 62 +++++++++++++++++--
 7 files changed, 86 insertions(+), 25 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
index 7828748e43cca..f97e4d62fa206 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/base/ReduceOperatorBase.java
@@ -76,7 +76,13 @@ public enum CombineHint {
 		 * Use a hash-based strategy. This should be faster in most cases, especially if the number
 		 * of different keys is small compared to the number of input elements (eg. 1/10).
 		 */
-		HASH
+		HASH,
+
+		/**
+		 * Disable the use of a combiner. This can be faster when the number of different keys is
+		 * close to the number of input elements, since the combiner then provides little reduction.
+		 */
+		NONE
 	}
 
 	private CombineHint hint;
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 1bab9c6dcbb8f..511840aa97fd3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.directed;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -44,6 +45,7 @@ public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			.setParallelism(parallelism)
 			.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 			.setParallelism(parallelism)
 			.name("Remove duplicate edges");
 
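Usage note (not part of the diff): the new hint is set the same way as the existing SORT and HASH hints, via setCombineHint() on the operator returned by reduce() or distinct(), exactly as the Gelly hunk above does. A minimal, self-contained sketch; the class name and toy dataset are illustrative only:

import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintNoneExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		// Hypothetical input: duplicate records are rare, so a combiner
		// would pay its sorting/hashing cost for almost no reduction.
		DataSet<Tuple2<Long, Long>> edges = env.fromElements(
			Tuple2.of(1L, 2L), Tuple2.of(2L, 3L), Tuple2.of(1L, 2L));

		// CombineHint.NONE skips the combiner entirely; records are shipped
		// to the reducers and deduplicated there in a single pass.
		edges
			.distinct(0, 1)
				.setCombineHint(CombineHint.NONE)
			.print();
	}
}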
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index 6f1e282360aae..21db233270eb6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.undirected;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -74,6 +75,7 @@ public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			.setParallelism(parallelism)
 			.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 			.setParallelism(parallelism)
 			.name("Remove duplicate edges");
 
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
index e83352eaba3e3..1a1f3eba11bf1 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/dag/ReduceNode.java
@@ -16,12 +16,8 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllReduceProperties;
@@ -29,6 +25,9 @@
 import org.apache.flink.optimizer.operators.ReduceProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
+import java.util.Collections;
+import java.util.List;
+
 /**
  * The Optimizer representation of a Reduce operator.
  */
@@ -63,6 +62,9 @@ public ReduceNode(ReduceOperatorBase<?, ?> operator) {
 			case HASH:
 				combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE;
 				break;
+			case NONE:
+				combinerStrategy = DriverStrategy.NONE;
+				break;
 			default:
 				throw new RuntimeException("Unknown CombineHint");
 		}
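Review note (not part of the diff): after this hunk, ReduceNode maps every CombineHint to a combiner DriverStrategy. A self-contained sketch of the resulting dispatch, using stand-in enums so it compiles on its own; only the HASH and NONE cases are visible in the hunk, so the OPTIMIZER_CHOOSES and SORT mappings below are assumptions based on the sort-based combiner being the existing default:

public class CombinerStrategySketch {
	// Stand-ins for the Flink enums so this sketch is self-contained.
	enum CombineHint { OPTIMIZER_CHOOSES, SORT, HASH, NONE }
	enum DriverStrategy { SORTED_PARTIAL_REDUCE, HASHED_PARTIAL_REDUCE, NONE }

	// Mirrors the switch in ReduceNode's constructor after this patch.
	static DriverStrategy combinerStrategyFor(CombineHint hint) {
		switch (hint) {
			case OPTIMIZER_CHOOSES:
			case SORT:
				return DriverStrategy.SORTED_PARTIAL_REDUCE;
			case HASH:
				return DriverStrategy.HASHED_PARTIAL_REDUCE;
			case NONE:
				// New case: no combiner is instantiated at all.
				return DriverStrategy.NONE;
			default:
				throw new RuntimeException("Unknown CombineHint");
		}
	}

	public static void main(String[] args) {
		for (CombineHint hint : CombineHint.values()) {
			System.out.println(hint + " -> " + combinerStrategyFor(hint));
		}
	}
}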
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
index 888b670b55f30..accd11bc0bc27 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/GroupReduceWithCombineProperties.java
@@ -105,7 +105,7 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
 									in.getLocalStrategySortOrder());
 			}
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 											DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		} else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
index d8e5a6c0d99df..eab31d30f64ea 100644
--- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
+++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/operators/ReduceProperties.java
@@ -65,20 +65,18 @@ public DriverStrategy getStrategy() {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		Channel toReducer = in;
+
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-			(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
-		{
-			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) {
+			if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
 				LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
 			}
-			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
-											DriverStrategy.SORTED_REDUCE, this.keyList);
-		}
-		else {
+		} else if (combinerStrategy != DriverStrategy.NONE) {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
-
+
 			// create an input node for combine with same parallelism as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setParallelism(in.getSource().getParallelism());
@@ -89,15 +87,16 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-
-			Channel toReducer = new Channel(combiner);
+
+			toReducer = new Channel(combiner);
 			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
 										in.getShipStrategySortOrder(), in.getDataExchangeMode());
 			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(),
 										in.getLocalStrategySortOrder());
-
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
-											DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
+
+		return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer,
+			DriverStrategy.SORTED_REDUCE, this.keyList);
 	}
 
 	@Override
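Review note (not part of the diff): the LOG.warn kept above directs users to place an explicit combineGroup() in front of a partition operator, since the optimizer will not inject a combiner behind an explicit partitioning step. A sketch of that workaround; the dataset, key field, and sum logic are illustrative:

import org.apache.flink.api.common.functions.GroupCombineFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class ExplicitCombinerExample {
	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Integer>> counts = env.fromElements(
			Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

		counts
			// Explicit local combine: pre-aggregate per key before the
			// explicit partitioning, since no combiner is injected there.
			.groupBy(0)
			.combineGroup(new GroupCombineFunction<Tuple2<String, Integer>, Tuple2<String, Integer>>() {
				@Override
				public void combine(Iterable<Tuple2<String, Integer>> values, Collector<Tuple2<String, Integer>> out) {
					String key = null;
					int sum = 0;
					for (Tuple2<String, Integer> v : values) {
						key = v.f0;
						sum += v.f1;
					}
					out.collect(Tuple2.of(key, sum));
				}
			})
			.partitionByHash(0)
			.groupBy(0)
			.reduce(new ReduceFunction<Tuple2<String, Integer>>() {
				@Override
				public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
					return Tuple2.of(a.f0, a.f1 + b.f1);
				}
			})
			.print();
	}
}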
diff --git a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
index f513155802c37..d2c640f8a5902 100644
--- a/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
+++ b/flink-optimizer/src/test/java/org/apache/flink/optimizer/java/ReduceCompilationTest.java
@@ -19,23 +19,25 @@
 package org.apache.flink.optimizer.java;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
@@ -327,4 +329,52 @@ public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<Strin
 		assertEquals(8, reduceNode.getParallelism());
 		assertEquals(8, sinkNode.getParallelism());
 	}
+
+	/**
+	 * Test program compilation when the Reduce's combiner has been excluded
+	 * through the use of CombineHint.NONE.
+	 */
+	@Test
+	public void testGroupedReduceWithoutCombiner() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(8);
+
+		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+			.name("source").setParallelism(6);
+
+		data
+			.groupBy(0)
+			.reduce(new RichReduceFunction<Tuple2<String, Double>>() {
+				@Override
+				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
+					return null;
+				}
+			}).setCombineHint(CombineHint.NONE).name("reducer")
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+		// get the original nodes
+		SourcePlanNode sourceNode = resolver.getNode("source");
+		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+		SinkPlanNode sinkNode = resolver.getNode("sink");
+
+		// check wiring
+		assertEquals(sourceNode, reduceNode.getInput().getSource());
+
+		// check the strategies
+		assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+
+		// check the keys
+		assertEquals(new FieldList(0), reduceNode.getKeys(0));
+		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+		// check parallelism
+		assertEquals(6, sourceNode.getParallelism());
+		assertEquals(8, reduceNode.getParallelism());
+		assertEquals(8, sinkNode.getParallelism());
+	}
 }
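Usage note (not part of the diff): the three explicit hints side by side on the same grouped reduce. Which one is fastest depends on how much local reduction the data allows; the dataset and sum function are illustrative:

import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class CombineHintComparison {

	// Shared reduce logic: sum the second field per key.
	private static final ReduceFunction<Tuple2<String, Integer>> SUM =
		new ReduceFunction<Tuple2<String, Integer>>() {
			@Override
			public Tuple2<String, Integer> reduce(Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
				return Tuple2.of(a.f0, a.f1 + b.f1);
			}
		};

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Tuple2<String, Integer>> data = env.fromElements(
			Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

		// SORT: sort-based combiner; the robust default.
		data.groupBy(0).reduce(SUM).setCombineHint(CombineHint.SORT).print();

		// HASH: hash-based combiner; good when keys repeat often.
		data.groupBy(0).reduce(SUM).setCombineHint(CombineHint.HASH).print();

		// NONE: no combiner; good when keys are mostly unique, as in the
		// Gelly graph-simplification case that motivated this patch.
		data.groupBy(0).reduce(SUM).setCombineHint(CombineHint.NONE).print();
	}
}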