Skip to content

Commit

Permalink
[FLINK-9031] [optimizer] Fix DataSet Union operator translation bug.
Browse files Browse the repository at this point in the history
- Adds a pass over the pre-optimized plan that fixes the output strategy of union nodes to FORWARD.

This closes apache#5742
  • Loading branch information
fhueske authored and StephanEwen committed Mar 29, 2018
1 parent b0fbb89 commit 87ff6eb
Show file tree
Hide file tree
Showing 6 changed files with 244 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.flink.optimizer.plan.SinkPlanNode;
import org.apache.flink.optimizer.postpass.OptimizerPostPass;
import org.apache.flink.optimizer.traversals.RangePartitionRewriter;
import org.apache.flink.optimizer.traversals.UnionParallelismAndForwardEnforcer;
import org.apache.flink.util.InstantiationUtil;

import org.slf4j.Logger;
Expand Down Expand Up @@ -476,6 +477,11 @@ else if (graphCreator.getSinks().size() > 1) {
// guaranteed memory, for further cost estimations. We assume an equal distribution of memory among consumer tasks
rootNode.accept(new IdAndEstimatesVisitor(this.statistics));

// We need to enforce that union nodes always forward their output to their successor.
// Any partitioning must be either pushed before or done after the union, but not on the union's output.
UnionParallelismAndForwardEnforcer unionEnforcer = new UnionParallelismAndForwardEnforcer();
rootNode.accept(unionEnforcer);

// We are dealing with operator DAGs, rather than operator trees.
// That requires us to deviate at some points from the classical DB optimizer algorithms.
// This step builds auxiliary structures to help track branches and joins in the DAG
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,15 @@ public BinaryUnionNode(Union<?> union){
super(union);
}

@Override
public void addOutgoingConnection(DagConnection connection) {
// ensure that union nodes have not more than one outgoing connection.
if (this.getOutgoingConnections() != null && this.getOutgoingConnections().size() > 0) {
throw new CompilerException("BinaryUnionNode may only have a single outgoing connection.");
}
super.addOutgoingConnection(connection);
}

@Override
public String getOperatorName() {
return "Union";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -635,6 +635,12 @@ private int translateChannel(Channel input, int inputIndex, JobVertex targetVert
in.setShipStrategy(ShipStrategyType.BROADCAST, in.getDataExchangeMode());
}
}

// The outgoing connection of an NAryUnion must be a forward connection.
if (input.getShipStrategy() != ShipStrategyType.FORWARD && !isBroadcast) {
throw new CompilerException("Optimized plan contains Union with non-forward outgoing ship strategy.");
}

}
else if (inputPlanNode instanceof BulkPartialSolutionPlanNode) {
if (this.vertices.get(inputPlanNode) == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,7 +244,11 @@ else if (c instanceof DeltaIterationBase.SolutionSetPlaceHolder) {
if (n.getParallelism() < 1) {
// set the parallelism
int par = c.getParallelism();
if (par > 0) {
if (n instanceof BinaryUnionNode) {
// Keep parallelism of union undefined for now.
// It will be determined based on the parallelism of its successor.
par = -1;
} else if (par > 0) {
if (this.forceParallelism && par != this.defaultParallelism) {
par = this.defaultParallelism;
Optimizer.LOG.warn("The parallelism of nested dataflows (such as step functions in iterations) is " +
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.optimizer.traversals;

import org.apache.flink.optimizer.dag.BinaryUnionNode;
import org.apache.flink.optimizer.dag.DagConnection;
import org.apache.flink.optimizer.dag.IterationNode;
import org.apache.flink.optimizer.dag.OptimizerNode;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.util.Visitor;

/**
* Enforces that all union nodes have the same parallelism as their successor (there must be only one!)
* and that the union node and its successor are connected by a forward ship strategy.
*/
public class UnionParallelismAndForwardEnforcer implements Visitor<OptimizerNode> {

@Override
public boolean preVisit(OptimizerNode node) {

// if the current node is a union
if (node instanceof BinaryUnionNode) {
int parallelism = -1;
// set ship strategy of all outgoing connections to FORWARD.
for (DagConnection conn : node.getOutgoingConnections()) {
parallelism = conn.getTarget().getParallelism();
conn.setShipStrategy(ShipStrategyType.FORWARD);
}
// adjust parallelism to be same as successor
node.setParallelism(parallelism);
}

// traverse the whole plan
return true;
}

@Override
public void postVisit(OptimizerNode node) {
// if required, recurse into the step function
if (node instanceof IterationNode) {
((IterationNode) node).acceptForStepFunction(this);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@
import org.apache.flink.optimizer.plan.SingleInputPlanNode;
import org.apache.flink.optimizer.plan.SourcePlanNode;
import org.apache.flink.optimizer.plantranslate.JobGraphGenerator;
import org.apache.flink.optimizer.testfunctions.IdentityGroupReducer;
import org.apache.flink.optimizer.testfunctions.IdentityMapper;
import org.apache.flink.optimizer.util.CompilerTestBase;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
Expand Down Expand Up @@ -438,4 +440,160 @@ public void testConsecutiveUnionsWithBroadcast() throws Exception {
}
}

/**
* Tests that a the outgoing connection of a Union node is FORWARD.
* See FLINK-9031 for a bug report.
*
* The issue is quite hard to reproduce as the plan choice seems to depend on the enumeration
* order due to lack of plan costs. This test is a smaller variant of the job that was reported
* to fail.
*
* /-\ /- PreFilter1 -\-/- Union - PostFilter1 - Reducer1 -\
* Src -< >- Union -< X >- Union - Out
* \-/ \- PreFilter2 -/-\- Union - PostFilter2 - Reducer2 -/
*/
@Test
public void testUnionForwardOutput() throws Exception {

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);

DataSet<Tuple2<Long, Long>> src1 = env.fromElements(new Tuple2<>(0L, 0L));

DataSet<Tuple2<Long, Long>> u1 = src1.union(src1)
.map(new IdentityMapper<>());

DataSet<Tuple2<Long, Long>> s1 = u1
.filter(x -> true).name("preFilter1");
DataSet<Tuple2<Long, Long>> s2 = u1
.filter(x -> true).name("preFilter2");

DataSet<Tuple2<Long, Long>> reduced1 = s1
.union(s2)
.filter(x -> true).name("postFilter1")
.groupBy(0)
.reduceGroup(new IdentityGroupReducer<>()).name("reducer1");
DataSet<Tuple2<Long, Long>> reduced2 = s1
.union(s2)
.filter(x -> true).name("postFilter2")
.groupBy(1)
.reduceGroup(new IdentityGroupReducer<>()).name("reducer2");

reduced1
.union(reduced2)
.output(new DiscardingOutputFormat<>());

// -----------------------------------------------------------------------------------------
// Verify optimized plan
// -----------------------------------------------------------------------------------------

OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());

OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);

SingleInputPlanNode unionOut1 = resolver.getNode("postFilter1");
SingleInputPlanNode unionOut2 = resolver.getNode("postFilter2");

assertEquals(ShipStrategyType.FORWARD, unionOut1.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, unionOut2.getInput().getShipStrategy());
}

/**
* Test the input and output shipping strategies for union operators with input and output
* operators with different parallelisms.
*
* Src1 - Map(fullP) -\-/- Union - Map(fullP) - Out
* X
* Src2 - Map(halfP) -/-\- Union - Map(halfP) - Out
*
* The union operator must always have the same parallelism as its successor and connect to it
* with a FORWARD strategy.
* In this program, the input connections for union should be FORWARD for parallelism-preserving
* connections and PARTITION_RANDOM for parallelism-changing connections.
*
*/
@Test
public void testUnionInputOutputDifferentDOP() throws Exception {

int fullDop = DEFAULT_PARALLELISM;
int halfDop = DEFAULT_PARALLELISM / 2;

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(DEFAULT_PARALLELISM);

DataSet<Tuple2<Long, Long>> in1 = env.fromElements(new Tuple2<>(0L, 0L))
.map(new IdentityMapper<>()).setParallelism(fullDop).name("inDopFull");
DataSet<Tuple2<Long, Long>> in2 = env.fromElements(new Tuple2<>(0L, 0L))
.map(new IdentityMapper<>()).setParallelism(halfDop).name("inDopHalf");

DataSet<Tuple2<Long, Long>> union = in1.union(in2);

DataSet<Tuple2<Long, Long>> dopFullMap = union
.map(new IdentityMapper<>()).setParallelism(fullDop).name("outDopFull");
DataSet<Tuple2<Long, Long>> dopHalfMap = union
.map(new IdentityMapper<>()).setParallelism(halfDop).name("outDopHalf");

dopFullMap.output(new DiscardingOutputFormat<>());
dopHalfMap.output(new DiscardingOutputFormat<>());

// -----------------------------------------------------------------------------------------
// Verify optimized plan
// -----------------------------------------------------------------------------------------

OptimizedPlan optimizedPlan = compileNoStats(env.createProgramPlan());

OptimizerPlanNodeResolver resolver = getOptimizerPlanNodeResolver(optimizedPlan);

SingleInputPlanNode inDopFull = resolver.getNode("inDopFull");
SingleInputPlanNode inDopHalf = resolver.getNode("inDopHalf");
SingleInputPlanNode outDopFull = resolver.getNode("outDopFull");
SingleInputPlanNode outDopHalf = resolver.getNode("outDopHalf");
NAryUnionPlanNode unionDopFull = (NAryUnionPlanNode) outDopFull.getInput().getSource();
NAryUnionPlanNode unionDopHalf = (NAryUnionPlanNode) outDopHalf.getInput().getSource();

// check in map nodes
assertEquals(2, inDopFull.getOutgoingChannels().size());
assertEquals(2, inDopHalf.getOutgoingChannels().size());
assertEquals(fullDop, inDopFull.getParallelism());
assertEquals(halfDop, inDopHalf.getParallelism());

// check union nodes
assertEquals(fullDop, unionDopFull.getParallelism());
assertEquals(halfDop, unionDopHalf.getParallelism());

// check out map nodes
assertEquals(fullDop, outDopFull.getParallelism());
assertEquals(halfDop, outDopHalf.getParallelism());

// check Union -> outMap ship strategies
assertEquals(ShipStrategyType.FORWARD, outDopHalf.getInput().getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, outDopFull.getInput().getShipStrategy());

// check inMap -> Union ship strategies
Channel fullFull;
Channel fullHalf;
Channel halfFull;
Channel halfHalf;

if (inDopFull.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
fullFull = inDopFull.getOutgoingChannels().get(0);
fullHalf = inDopFull.getOutgoingChannels().get(1);
} else {
fullFull = inDopFull.getOutgoingChannels().get(1);
fullHalf = inDopFull.getOutgoingChannels().get(0);
}
if (inDopHalf.getOutgoingChannels().get(0).getTarget() == unionDopFull) {
halfFull = inDopHalf.getOutgoingChannels().get(0);
halfHalf = inDopHalf.getOutgoingChannels().get(1);
} else {
halfFull = inDopHalf.getOutgoingChannels().get(1);
halfHalf = inDopHalf.getOutgoingChannels().get(0);
}

assertEquals(ShipStrategyType.FORWARD, fullFull.getShipStrategy());
assertEquals(ShipStrategyType.FORWARD, halfHalf.getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_RANDOM, fullHalf.getShipStrategy());
assertEquals(ShipStrategyType.PARTITION_RANDOM, halfFull.getShipStrategy());
}

}

0 comments on commit 87ff6eb

Please sign in to comment.