[FLINK-7204] [core] CombineHint.NONE
Add a new CombineHint option that excludes the creation of a combiner
for a reduce function.

Gelly now excludes the combiner when simplifying graphs, a step used in
most algorithm unit and integration tests.

This closes apache#4350
greghogan committed Jul 19, 2017
1 parent 9beccec commit ce10e57
Showing 7 changed files with 86 additions and 25 deletions.
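
Before the diffs, a minimal sketch of the user-facing API this commit extends. The job below is illustrative only (the input path, tuple types, and sum function are not part of the commit); it shows where the new hint plugs in:

	import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
	import org.apache.flink.api.java.DataSet;
	import org.apache.flink.api.java.ExecutionEnvironment;
	import org.apache.flink.api.java.tuple.Tuple2;

	public class CombineHintNoneExample {
		public static void main(String[] args) throws Exception {
			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

			// Hypothetical input: (key, value) pairs whose keys are almost all distinct.
			DataSet<Tuple2<String, Long>> input = env
				.readCsvFile("/path/to/input.csv") // illustrative path
				.types(String.class, Long.class);

			input
				.groupBy(0)
				.reduce((a, b) -> Tuple2.of(a.f0, a.f1 + b.f1))
				// New in this commit: skip the combine phase. With near-unique keys
				// a combiner shuffles roughly the same volume of data while paying
				// for an extra sort or hash pass.
				.setCombineHint(CombineHint.NONE)
				.print();
		}
	}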
@@ -76,7 +76,13 @@ public enum CombineHint {
 	 * Use a hash-based strategy. This should be faster in most cases, especially if the number
 	 * of different keys is small compared to the number of input elements (e.g. 1/10).
 	 */
-	HASH
+	HASH,
+
+	/**
+	 * Disable the use of a combiner. This can be faster when the number of different keys is
+	 * close to the number of input elements, since the combiner then achieves little or no reduction.
+	 */
+	NONE
 }
 
 	private CombineHint hint;
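
For orientation, a rough model of the trade-off behind these hints, assuming n input elements, p parallel instances, and k distinct keys (numbers below are illustrative): a combiner can shrink the shuffle from n records to at most k * p. With n = 10,000,000, p = 10, and k = 10,000, that is a cut from ten million records to at most 100,000, so SORT or HASH pays off handsomely. With nearly unique keys (k close to n), as with deduplicated edge lists, the combiner forwards essentially all n records unchanged and only adds an extra sort or hash pass; that is the case NONE targets.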
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.directed;
 
 import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -44,6 +45,7 @@ public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 				.setParallelism(parallelism)
 				.name("Remove duplicate edges");
 
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.asm.simple.undirected;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -74,6 +75,7 @@ public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 				.setParallelism(parallelism)
 				.name("Remove self-loops")
 			.distinct(0, 1)
+				.setCombineHint(CombineHint.NONE)
 				.setParallelism(parallelism)
 				.name("Remove duplicate edges");
 
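
Both Simplify variants reach the reduce in question through distinct(0, 1), where after self-loop removal most (source, target) pairs occur only once or twice, so a combine pass achieves almost no reduction. A hedged sketch of invoking the directed variant (the graph value types and the no-arg constructor are assumptions, not shown in this diff):

	import org.apache.flink.graph.Graph;
	import org.apache.flink.graph.asm.simple.directed.Simplify;
	import org.apache.flink.types.NullValue;

	// 'graph' is assumed to be an existing Graph<Long, NullValue, NullValue>.
	// Simplify now sets CombineHint.NONE internally, so its duplicate-edge
	// reduce skips the combine phase.
	Graph<Long, NullValue, NullValue> simple =
		graph.run(new Simplify<Long, NullValue, NullValue>());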
@@ -16,19 +16,18 @@
  * limitations under the License.
  */
 
-
 package org.apache.flink.optimizer.dag;
 
-import java.util.Collections;
-import java.util.List;
-
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase;
 import org.apache.flink.optimizer.DataStatistics;
 import org.apache.flink.optimizer.operators.AllReduceProperties;
 import org.apache.flink.optimizer.operators.OperatorDescriptorSingle;
 import org.apache.flink.optimizer.operators.ReduceProperties;
 import org.apache.flink.runtime.operators.DriverStrategy;
+
+import java.util.Collections;
+import java.util.List;
 
 /**
  * The Optimizer representation of a <i>Reduce</i> operator.
  */
@@ -63,6 +62,9 @@ public ReduceNode(ReduceOperatorBase<?, ?> operator) {
 			case HASH:
 				combinerStrategy = DriverStrategy.HASHED_PARTIAL_REDUCE;
 				break;
+			case NONE:
+				combinerStrategy = DriverStrategy.NONE;
+				break;
 			default:
 				throw new RuntimeException("Unknown CombineHint");
 		}
@@ -105,7 +105,7 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 				in.setLocalStrategy(LocalStrategy.COMBININGSORT, in.getLocalStrategyKeys(),
 						in.getLocalStrategySortOrder());
 			}
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", in,
+			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 					DriverStrategy.SORTED_GROUP_REDUCE, this.keyList);
 		} else {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
@@ -65,20 +65,18 @@ public DriverStrategy getStrategy() {
 
 	@Override
 	public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
+		Channel toReducer = in;
+
 		if (in.getShipStrategy() == ShipStrategyType.FORWARD ||
-			(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty()))
-		{
-			if(in.getSource().getOptimizerNode() instanceof PartitionNode) {
+				(node.getBroadcastConnections() != null && !node.getBroadcastConnections().isEmpty())) {
+			if (in.getSource().getOptimizerNode() instanceof PartitionNode) {
 				LOG.warn("Cannot automatically inject combiner for ReduceFunction. Please add an explicit combiner with combineGroup() in front of the partition operator.");
 			}
 			return new SingleInputPlanNode(node, "Reduce ("+node.getOperator().getName()+")", in,
 					DriverStrategy.SORTED_REDUCE, this.keyList);
-		}
-		else {
+		} else if (combinerStrategy != DriverStrategy.NONE) {
 			// non forward case. all local properties are killed anyways, so we can safely plug in a combiner
 			Channel toCombiner = new Channel(in.getSource());
 			toCombiner.setShipStrategy(ShipStrategyType.FORWARD, DataExchangeMode.PIPELINED);
 
 			// create an input node for combine with same parallelism as input node
 			ReduceNode combinerNode = ((ReduceNode) node).getCombinerUtilityNode();
 			combinerNode.setParallelism(in.getSource().getParallelism());
@@ -89,15 +87,16 @@ public SingleInputPlanNode instantiate(Channel in, SingleInputNode node) {
 
 			combiner.setCosts(new Costs(0, 0));
 			combiner.initProperties(toCombiner.getGlobalProperties(), toCombiner.getLocalProperties());
-			Channel toReducer = new Channel(combiner);
+
+			toReducer = new Channel(combiner);
 			toReducer.setShipStrategy(in.getShipStrategy(), in.getShipStrategyKeys(),
 					in.getShipStrategySortOrder(), in.getDataExchangeMode());
 			toReducer.setLocalStrategy(LocalStrategy.SORT, in.getLocalStrategyKeys(), in.getLocalStrategySortOrder());
-
-			return new SingleInputPlanNode(node, "Reduce("+node.getOperator().getName()+")", toReducer,
-					DriverStrategy.SORTED_REDUCE, this.keyList);
 		}
+
+		return new SingleInputPlanNode(node, "Reduce (" + node.getOperator().getName() + ")", toReducer,
+				DriverStrategy.SORTED_REDUCE, this.keyList);
 	}
 
 	@Override
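
The net effect when the hint is NONE: the new else-if guard skips combiner injection and toReducer keeps pointing at the original input channel, so records flow from the shuffle straight into the sorted reduce. A sketch of the two physical plan shapes (schematic, not actual optimizer output):

	SORT/HASH hint:  source -> partial reduce (combiner) -> ship -> sort -> SORTED_REDUCE
	NONE hint:       source -> ship -> sort -> SORTED_REDUCE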
@@ -19,23 +19,25 @@
 package org.apache.flink.optimizer.java;
 
 import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.KeySelector;
-import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.optimizer.util.CompilerTestBase;
-import org.junit.Test;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.optimizer.plan.OptimizedPlan;
 import org.apache.flink.optimizer.plan.SingleInputPlanNode;
 import org.apache.flink.optimizer.plan.SinkPlanNode;
 import org.apache.flink.optimizer.plan.SourcePlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
 import org.apache.flink.runtime.operators.DriverStrategy;
 
-import static org.junit.Assert.*;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
 
 @SuppressWarnings("serial")
 public class ReduceCompilationTest extends CompilerTestBase implements java.io.Serializable {
@@ -327,4 +329,52 @@ public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
 			fail(e.getClass().getSimpleName() + " in test: " + e.getMessage());
 		}
 	}
+
+	/**
+	 * Test program compilation when the Reduce's combiner has been excluded
+	 * by setting {@code CombineHint.NONE}.
+	 */
+	@Test
+	public void testGroupedReduceWithoutCombiner() {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		env.setParallelism(8);
+
+		DataSet<Tuple2<String, Double>> data = env.readCsvFile("file:///will/never/be/read").types(String.class, Double.class)
+			.name("source").setParallelism(6);
+
+		data
+			.groupBy(0)
+			.reduce(new RichReduceFunction<Tuple2<String, Double>>() {
+				@Override
+				public Tuple2<String, Double> reduce(Tuple2<String, Double> value1, Tuple2<String, Double> value2) {
+					return null;
+				}
+			}).setCombineHint(CombineHint.NONE).name("reducer")
+			.output(new DiscardingOutputFormat<Tuple2<String, Double>>()).name("sink");
+
+		Plan p = env.createProgramPlan();
+		OptimizedPlan op = compileNoStats(p);
+
+		OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(op);
+
+		// get the original nodes
+		SourcePlanNode sourceNode = resolver.getNode("source");
+		SingleInputPlanNode reduceNode = resolver.getNode("reducer");
+		SinkPlanNode sinkNode = resolver.getNode("sink");
+
+		// check wiring
+		assertEquals(sourceNode, reduceNode.getInput().getSource());
+
+		// check the strategies
+		assertEquals(DriverStrategy.SORTED_REDUCE, reduceNode.getDriverStrategy());
+
+		// check the keys
+		assertEquals(new FieldList(0), reduceNode.getKeys(0));
+		assertEquals(new FieldList(0), reduceNode.getInput().getLocalStrategyKeys());
+
+		// check parallelism
+		assertEquals(6, sourceNode.getParallelism());
+		assertEquals(8, reduceNode.getParallelism());
+		assertEquals(8, sinkNode.getParallelism());
+	}
 }
