Skip to content

Commit

Permalink
[FLINK-1357] [compiler] Add union between static and dynamic path
Browse files Browse the repository at this point in the history
  • Loading branch information
StephanEwen committed Jan 6, 2015
1 parent d2f0c40 commit 0190dd2
Show file tree
Hide file tree
Showing 10 changed files with 412 additions and 105 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@
import org.apache.flink.compiler.postpass.OptimizerPostPass;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
import org.apache.flink.runtime.operators.util.LocalStrategy;
import org.apache.flink.util.InstantiationUtil;
Expand Down Expand Up @@ -580,9 +581,7 @@ private OptimizedPlan compile(Plan program, OptimizerPostPass postPasser) throws

// finalize the plan
OptimizedPlan plan = new PlanFinalizer().createFinalPlan(bestPlanSinks, program.getJobName(), program);

// swap the binary unions for n-ary unions. this changes no strategies or memory consumers whatsoever, so
// we can do this after the plan finalization

plan.accept(new BinaryUnionReplacer());

// post pass the plan. this is the phase where the serialization and comparator code is set
Expand Down Expand Up @@ -1029,7 +1028,6 @@ public boolean preVisit(OptimizerNode node) {
}
}


@Override
public void postVisit(OptimizerNode visitable) {}
}
Expand Down Expand Up @@ -1057,8 +1055,11 @@ public void postVisit(OptimizerNode node) {
}

/**
* Utility class that traverses a plan to collect all nodes and add them to the OptimizedPlan.
* Besides collecting all nodes, this traversal assigns the memory to the nodes.
* Finalization of the plan:
* - The graph of nodes is double-linked (links from child to parent are inserted)
* - If unions join static and dynamic paths, the cache is marked as a memory consumer
* - Relative memory fractions are assigned to all nodes.
* - All nodes are collected into a set.
*/
private static final class PlanFinalizer implements Visitor<PlanNode> {

Expand Down Expand Up @@ -1119,9 +1120,7 @@ private OptimizedPlan createFinalPlan(List<SinkPlanNode> sinks, String jobName,
c.setRelativeTempMemory(relativeMem);
if (LOG.isDebugEnabled()) {
LOG.debug("Assigned " + relativeMem + " of total memory to each instance of the temp " +
"table" +
" " +
"for " + c + ".");
"table for " + c + ".");
}
}
}
Expand All @@ -1143,6 +1142,12 @@ public boolean preVisit(PlanNode visitable) {
else if (visitable instanceof SourcePlanNode) {
this.sources.add((SourcePlanNode) visitable);
}
else if (visitable instanceof BinaryUnionPlanNode) {
BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
if (unionNode.unionsStaticAndDynamicPath()) {
unionNode.setDriverStrategy(DriverStrategy.UNION_WITH_CACHED);
}
}
else if (visitable instanceof BulkPartialSolutionPlanNode) {
// tell the partial solution about the iteration node that contains it
final BulkPartialSolutionPlanNode pspn = (BulkPartialSolutionPlanNode) visitable;
Expand Down Expand Up @@ -1229,7 +1234,6 @@ else if (visitable instanceof SolutionSetPlanNode) {
@Override
public void postVisit(PlanNode visitable) {}
}


/**
* A visitor that traverses the graph and collects cascading binary unions into a single n-ary
Expand All @@ -1256,24 +1260,50 @@ public boolean preVisit(PlanNode visitable) {
public void postVisit(PlanNode visitable) {

if (visitable instanceof BinaryUnionPlanNode) {

final BinaryUnionPlanNode unionNode = (BinaryUnionPlanNode) visitable;
final Channel in1 = unionNode.getInput1();
final Channel in2 = unionNode.getInput2();

PlanNode newUnionNode;
if (!unionNode.unionsStaticAndDynamicPath()) {

// both on static path, or both on dynamic path. we can collapse them
NAryUnionPlanNode newUnionNode;

List<Channel> inputs = new ArrayList<Channel>();
collect(in1, inputs);
collect(in2, inputs);
List<Channel> inputs = new ArrayList<Channel>();
collect(in1, inputs);
collect(in2, inputs);

newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs, unionNode.getGlobalProperties());
newUnionNode = new NAryUnionPlanNode(unionNode.getOptimizerNode(), inputs,
unionNode.getGlobalProperties(), unionNode.getCumulativeCosts());

newUnionNode.setDegreeOfParallelism(unionNode.getDegreeOfParallelism());

for (Channel c : inputs) {
c.setTarget(newUnionNode);
}
for (Channel c : inputs) {
c.setTarget(newUnionNode);
}

for(Channel channel : unionNode.getOutgoingChannels()){
channel.swapUnionNodes(newUnionNode);
for (Channel channel : unionNode.getOutgoingChannels()) {
channel.swapUnionNodes(newUnionNode);
newUnionNode.addOutgoingChannel(channel);
}
}
else {
// union between the static and the dynamic path. we need to handle this for now
// through a special union operator

// make sure that the first input is the cached (static) and the second input is the dynamic
if (in1.isOnDynamicPath()) {
BinaryUnionPlanNode newUnionNode = new BinaryUnionPlanNode(unionNode);

in1.setTarget(newUnionNode);
in2.setTarget(newUnionNode);

for (Channel channel : unionNode.getOutgoingChannels()) {
channel.swapUnionNodes(newUnionNode);
newUnionNode.addOutgoingChannel(channel);
}
}
}
}
}
Expand All @@ -1290,7 +1320,7 @@ private void collect(Channel in, List<Channel> inputs) {

inputs.addAll(((NAryUnionPlanNode) in.getSource()).getListOfInputs());
} else {
// is not a union node, so we take the channel directly
// is not a collapsed union node, so we take the channel directly
inputs.add(in);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,8 @@
* limitations under the License.
*/


package org.apache.flink.compiler.plan;


import org.apache.flink.compiler.dag.BinaryUnionNode;
import org.apache.flink.runtime.operators.DriverStrategy;

Expand All @@ -35,7 +33,28 @@ public BinaryUnionPlanNode(BinaryUnionNode template, Channel in1, Channel in2) {
super(template, "Union", in1, in2, DriverStrategy.UNION);
}

public BinaryUnionPlanNode(BinaryUnionPlanNode toSwapFrom) {
super(toSwapFrom.getOptimizerNode(), "Union-With-Cached", toSwapFrom.getInput2(), toSwapFrom.getInput1(),
DriverStrategy.UNION_WITH_CACHED);

this.globalProps = toSwapFrom.globalProps;
this.localProps = toSwapFrom.localProps;
this.nodeCosts = toSwapFrom.nodeCosts;
this.cumulativeCosts = toSwapFrom.cumulativeCosts;

setDegreeOfParallelism(toSwapFrom.getDegreeOfParallelism());
}

public BinaryUnionNode getOptimizerNode() {
return (BinaryUnionNode) this.template;
}

public boolean unionsStaticAndDynamicPath() {
return getInput1().isOnDynamicPath() != getInput2().isOnDynamicPath();
}

@Override
public int getMemoryConsumerWeight() {
return 0;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.util.Iterator;
import java.util.List;

import org.apache.flink.compiler.costs.Costs;
import org.apache.flink.compiler.dag.BinaryUnionNode;
import org.apache.flink.compiler.dataproperties.GlobalProperties;
import org.apache.flink.compiler.dataproperties.LocalProperties;
Expand All @@ -40,12 +41,16 @@ public class NAryUnionPlanNode extends PlanNode {
/**
* @param template
*/
public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps) {
public NAryUnionPlanNode(BinaryUnionNode template, List<Channel> inputs, GlobalProperties gProps,
Costs cumulativeCosts)
{
super(template, "Union", DriverStrategy.NONE);

this.inputs = inputs;
this.globalProps = gProps;
this.localProps = new LocalProperties();
this.nodeCosts = new Costs();
this.cumulativeCosts = cumulativeCosts;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.compiler;

import static org.junit.Assert.*;

import org.apache.flink.api.common.Plan;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.IterativeDataSet;
import org.apache.flink.compiler.plan.BinaryUnionPlanNode;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
import org.apache.flink.compiler.plan.Channel;
import org.apache.flink.compiler.plan.NAryUnionPlanNode;
import org.apache.flink.compiler.plan.OptimizedPlan;
import org.apache.flink.compiler.plan.SingleInputPlanNode;
import org.apache.flink.compiler.plantranslate.NepheleJobGraphGenerator;
import org.junit.Test;

@SuppressWarnings("serial")
public class UnionBetweenDynamicAndStaticPathTest extends CompilerTestBase {

@Test
public void testUnionStaticFirst() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Long> input1 = env.generateSequence(1, 10);
DataSet<Long> input2 = env.generateSequence(1, 10);

IterativeDataSet<Long> iteration = input1.iterate(10);

DataSet<Long> result = iteration.closeWith(
input2.union(input2).union(iteration.union(iteration)));

result.print();
result.print();

Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

assertEquals(2, op.getDataSinks().size());

BulkIterationPlanNode iterPlan = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction();
BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource();
NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) mixedUnion.getInput1().getSource();
NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) mixedUnion.getInput2().getSource();

assertTrue(mixedUnion.unionsStaticAndDynamicPath());
assertFalse(mixedUnion.getInput1().isOnDynamicPath());
assertTrue(mixedUnion.getInput2().isOnDynamicPath());
assertTrue(mixedUnion.getInput1().getTempMode().isCached());

for (Channel c : staticUnion.getInputs()) {
assertFalse(c.isOnDynamicPath());
}
for (Channel c : dynamicUnion.getInputs()) {
assertTrue(c.isOnDynamicPath());
}

assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0);
assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0);

new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}

@Test
public void testUnionStaticSecond() {
try {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

DataSet<Long> input1 = env.generateSequence(1, 10);
DataSet<Long> input2 = env.generateSequence(1, 10);

IterativeDataSet<Long> iteration = input1.iterate(10);

DataSet<Long> iterResult = iteration
.closeWith(iteration.union(iteration).union(input2.union(input2)));

iterResult.print();
iterResult.print();


Plan p = env.createProgramPlan();
OptimizedPlan op = compileNoStats(p);

assertEquals(2, op.getDataSinks().size());

BulkIterationPlanNode iterPlan = (BulkIterationPlanNode) op.getDataSinks().iterator().next().getInput().getSource();

SingleInputPlanNode noopNode = (SingleInputPlanNode) iterPlan.getRootOfStepFunction();
BinaryUnionPlanNode mixedUnion = (BinaryUnionPlanNode) noopNode.getInput().getSource();
NAryUnionPlanNode staticUnion = (NAryUnionPlanNode) mixedUnion.getInput1().getSource();
NAryUnionPlanNode dynamicUnion = (NAryUnionPlanNode) mixedUnion.getInput2().getSource();

assertTrue(mixedUnion.unionsStaticAndDynamicPath());
assertFalse(mixedUnion.getInput1().isOnDynamicPath());
assertTrue(mixedUnion.getInput2().isOnDynamicPath());
assertTrue(mixedUnion.getInput1().getTempMode().isCached());

assertEquals(0.5, iterPlan.getRelativeMemoryPerSubTask(), 0.0);
assertEquals(0.5, mixedUnion.getInput1().getRelativeTempMemory(), 0.0);
assertEquals(0.0, mixedUnion.getInput2().getRelativeTempMemory(), 0.0);

for (Channel c : staticUnion.getInputs()) {
assertFalse(c.isOnDynamicPath());
}
for (Channel c : dynamicUnion.getInputs()) {
assertTrue(c.isOnDynamicPath());
}

new NepheleJobGraphGenerator().compileJobGraph(op);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
}
}
Loading

0 comments on commit 0190dd2

Please sign in to comment.