Skip to content

Commit

Permalink
Correctly handle cases where iteration operators do not consume all i…
Browse files Browse the repository at this point in the history
…nput data.

Allow cases where in-iteration DOP is lowered.
  • Loading branch information
StephanEwen committed Feb 5, 2014
1 parent d0672a3 commit 4cdf70f
Show file tree
Hide file tree
Showing 27 changed files with 277 additions and 194 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,7 @@ else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
// 4) It makes estimates about the data volume of the data sources and
// propagates those estimates through the plan

GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(this.statistics, maxMachinesJob, defaultParallelism, true);
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(maxMachinesJob, defaultParallelism);
pactPlan.accept(graphCreator);

// if we have a plan with multiple data sinks, add logical optimizer nodes that have two data-sinks as children
Expand All @@ -669,11 +669,9 @@ else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
} else if (graphCreator.sinks.size() > 1) {
Iterator<DataSinkNode> iter = graphCreator.sinks.iterator();
rootNode = iter.next();
int id = graphCreator.getId();

while (iter.hasNext()) {
rootNode = new SinkJoiner(rootNode, iter.next());
rootNode.setId(id++);
}
} else {
throw new CompilerException("Bug: The optimizer plan representation has no sinks.");
Expand All @@ -682,7 +680,7 @@ else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
// now that we have all nodes created and recorded which ones consume memory, tell the nodes their minimal
// guaranteed memory, for further cost estimations. we assume an equal distribution of memory among consumer tasks

rootNode.accept(new MemoryDistributer(
rootNode.accept(new IdAndMemoryAndEstimatesVisitor(this.statistics,
graphCreator.getMemoryConsumerCount() == 0 ? 0 : memoryPerInstance / graphCreator.getMemoryConsumerCount()));

// Now that the previous step is done, the next step is to traverse the graph again for the two
Expand Down Expand Up @@ -745,7 +743,7 @@ else if (defaultParallelism > maxMachinesJob * this.maxIntraNodeParallelism) {
* from the plan can be traversed.
*/
public static List<DataSinkNode> createPreOptimizedPlan(Plan pactPlan) {
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(null, -1, 1, false);
GraphCreatingVisitor graphCreator = new GraphCreatingVisitor(-1, 1);
pactPlan.accept(graphCreator);
return graphCreator.sinks;
}
Expand Down Expand Up @@ -773,38 +771,27 @@ private static final class GraphCreatingVisitor implements Visitor<Operator> {

private final List<DataSinkNode> sinks; // all data sink nodes in the optimizer plan

private final DataStatistics statistics; // used to access basic file statistics

private final int maxMachines; // the maximum number of machines to use

private final int defaultParallelism; // the default degree of parallelism

private int id; // the incrementing id for the nodes.

private int numMemoryConsumers;

private final boolean computeEstimates; // flag indicating whether to compute additional info

private final GraphCreatingVisitor parent; // reference to enclosing creator, in case of a recursive translation

private final boolean forceDOP;


GraphCreatingVisitor(DataStatistics statistics, int maxMachines, int defaultParallelism, boolean computeEstimates) {
this(null, false, statistics, maxMachines, defaultParallelism, computeEstimates);
GraphCreatingVisitor(int maxMachines, int defaultParallelism) {
this(null, false, maxMachines, defaultParallelism);
}

GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP,
DataStatistics statistics, int maxMachines, int defaultParallelism, boolean computeEstimates)
{
GraphCreatingVisitor(GraphCreatingVisitor parent, boolean forceDOP, int maxMachines, int defaultParallelism) {
this.con2node = new HashMap<Operator, OptimizerNode>();
this.sources = new ArrayList<DataSourceNode>(4);
this.sinks = new ArrayList<DataSinkNode>(2);
this.statistics = statistics;
this.maxMachines = maxMachines;
this.defaultParallelism = defaultParallelism;
this.id = 1;
this.computeEstimates = computeEstimates;
this.parent = parent;
this.forceDOP = forceDOP;
}
Expand Down Expand Up @@ -860,10 +847,6 @@ else if (c instanceof PartialSolutionPlaceHolder) {
BulkPartialSolutionNode p = new BulkPartialSolutionNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());

// we need to manually set the estimates to the estimates from the initial partial solution
// we need to do this now, such that all successor nodes can compute their estimates properly
p.copyEstimates(containingIterationNode);
n = p;
}
else if (c instanceof WorksetPlaceHolder) {
Expand All @@ -876,10 +859,6 @@ else if (c instanceof WorksetPlaceHolder) {
WorksetNode p = new WorksetNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());

// we need to manually set the estimates to the estimates from the initial partial solution
// we need to do this now, such that all successor nodes can compute their estimates properly
p.copyEstimates(containingIterationNode);
n = p;
}
else if (c instanceof SolutionSetPlaceHolder) {
Expand All @@ -892,10 +871,6 @@ else if (c instanceof SolutionSetPlaceHolder) {
SolutionSetNode p = new SolutionSetNode(holder, containingIterationNode);
p.setDegreeOfParallelism(containingIterationNode.getDegreeOfParallelism());
p.setSubtasksPerInstance(containingIterationNode.getSubtasksPerInstance());

// we need to manually set the estimates to the estimates from the initial partial solution
// we need to do this now, such that all successor nodes can compute their estimates properly
p.copyEstimates(containingIterationNode);
n = p;
}
else {
Expand Down Expand Up @@ -944,22 +919,8 @@ else if (c instanceof SolutionSetPlaceHolder) {
public void postVisit(Operator c) {
OptimizerNode n = this.con2node.get(c);

// check if we have been here before
if (n.getId() > 0) {
return;
}
n.setId(this.id);

// first connect to the predecessors
n.setInputs(this.con2node);

// read id again as it might have been incremented for newly created union nodes
this.id = n.getId() + 1;

// now compute the output estimates
if (this.computeEstimates) {
n.computeOutputEstimates(this.statistics);
}

// if the node represents a bulk iteration, we recursively translate the data flow now
if (n instanceof BulkIterationNode) {
Expand All @@ -968,21 +929,18 @@ public void postVisit(Operator c) {

// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
this.statistics, this.maxMachines, iterNode.getDegreeOfParallelism(), this.computeEstimates);
this.maxMachines, iterNode.getDegreeOfParallelism());
iter.getNextPartialSolution().accept(recursiveCreator);

OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
BulkPartialSolutionNode partialSolution =
(BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
if (partialSolution == null) {
throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
throw new CompilerException("Invalid Bulk iteration: The result of the iterative step functions result does not depend on the partial solution.");
}

// add an outgoing connection to the root of the step function
PactConnection rootConn = new PactConnection(rootOfStepFunction);
rootOfStepFunction.addOutgoingConnection(rootConn);

iterNode.setNextPartialSolution(rootOfStepFunction, rootConn);
// add an outgoing connection to the root of the step function
iterNode.setNextPartialSolution(rootOfStepFunction);
iterNode.setPartialSolution(partialSolution);

// account for the nested memory consumers
Expand All @@ -997,7 +955,7 @@ else if (n instanceof WorksetIterationNode) {

// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
this.statistics, this.maxMachines, iterNode.getDegreeOfParallelism(), this.computeEstimates);
this.maxMachines, iterNode.getDegreeOfParallelism());
// descend from the solution set delta. check that it depends on both the workset
// and the solution set. If it does depend on both, this descend should create both nodes
iter.getSolutionSetDelta().accept(recursiveCreator);
Expand Down Expand Up @@ -1065,10 +1023,6 @@ else if (successor.getClass() == CoGroupNode.class) {
solutionSetDeltaNode.accept(pathIdentifier);
}
}

int getId() {
return this.id;
}

int getMemoryConsumerCount() {
return this.numMemoryConsumers;
Expand All @@ -1081,7 +1035,7 @@ private static final class StaticDynamicPathIdentifier implements Visitor<Optimi

private final int costWeight;

StaticDynamicPathIdentifier(int costWeight) {
private StaticDynamicPathIdentifier(int costWeight) {
this.costWeight = costWeight;
}

Expand All @@ -1100,35 +1054,54 @@ public void postVisit(OptimizerNode visitable) {
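* Visitor that assigns the node ids, sets the minimal guaranteed memory per task based on the amount of
* available memory, the number of memory consumers, and on the task's degree of parallelism, lets the
* incoming connections initialize their maximum path depths, and computes the output estimates of each node.
*/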
private static final class MemoryDistributer implements Visitor<OptimizerNode> {
private static final class IdAndMemoryAndEstimatesVisitor implements Visitor<OptimizerNode> {

private final DataStatistics statistics;

private final long memoryPerTaskPerInstance;

MemoryDistributer(long memoryPerTaskPerInstance) {
private int id = 1;

private IdAndMemoryAndEstimatesVisitor(DataStatistics statistics, long memoryPerTaskPerInstance) {
this.statistics = statistics;
this.memoryPerTaskPerInstance = memoryPerTaskPerInstance;
}


@Override
public boolean preVisit(OptimizerNode visitable) {
// if required, recurse into the step function
if (visitable instanceof IterationNode) {
((IterationNode) visitable).acceptForStepFunction(this);
if (visitable.getId() != -1) {
// been here before
return false;
}

if (visitable.getMinimalMemoryPerSubTask() == -1) {
final long mem = visitable.isMemoryConsumer() ?
// assign minimum memory share, for lower bound estimates
final long mem = visitable.isMemoryConsumer() ?
this.memoryPerTaskPerInstance / visitable.getSubtasksPerInstance() : 0;
visitable.setMinimalMemoryPerSubTask(mem);
return true;
} else {
return false;
}
visitable.setMinimalMemoryPerSubTask(mem);

return true;
}


@Override
public void postVisit(OptimizerNode visitable) {}
public void postVisit(OptimizerNode visitable) {
// the node ids
visitable.initId(this.id++);

// connections need to figure out their maximum path depths
for (PactConnection conn : visitable.getIncomingConnections()) {
conn.initMaxDepth();
}

// the estimates
visitable.computeOutputEstimates(this.statistics);

// if required, recurse into the step function
if (visitable instanceof IterationNode) {
((IterationNode) visitable).acceptForStepFunction(this);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,24 +18,22 @@
import java.util.Map;

import eu.stratosphere.api.common.operators.Operator;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.costs.CostEstimator;
import eu.stratosphere.compiler.plan.PlanNode;
import eu.stratosphere.util.Visitor;

/**
* The optimizer's internal representation of the partial solution that is input to a bulk iteration.
*/
public abstract class AbstractPartialSolutionNode extends OptimizerNode
{
public abstract class AbstractPartialSolutionNode extends OptimizerNode {

protected AbstractPartialSolutionNode(Operator contract) {
super(contract);
}

// --------------------------------------------------------------------------------------------

public void copyEstimates(OptimizerNode node) {
protected void copyEstimates(OptimizerNode node) {
this.estimatedCardinality = node.estimatedCardinality;
this.estimatedNumRecords = node.estimatedNumRecords;
this.estimatedOutputSize = node.estimatedOutputSize;
Expand Down Expand Up @@ -67,11 +65,6 @@ public List<PactConnection> getIncomingConnections() {
@Override
public void setInputs(Map<Operator, OptimizerNode> contractToNode) {}

@Override
public void computeOutputEstimates(DataStatistics statistics) {
// do nothing. we obtain the estimates another way from the enclosing iteration
}

@Override
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
// no children, so nothing to compute
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@
import eu.stratosphere.pact.runtime.shipping.ShipStrategyType;

/**
* The Optimizer representation of a <i>Union</i>. A Union is automatically
* inserted before any node which has more than one incoming connection per
* input.
* The Optimizer representation of a binary <i>Union</i>.
*/
public class BinaryUnionNode extends TwoInputNode {

Expand All @@ -47,8 +45,8 @@ public class BinaryUnionNode extends TwoInputNode {
public BinaryUnionNode(OptimizerNode pred1, OptimizerNode pred2) {
super(new UnionPlaceholderContract());

this.input1 = new PactConnection(pred1, this, pred1.getMaxDepth() + 1);
this.input2 = new PactConnection(pred2, this, pred2.getMaxDepth() + 1);
this.input1 = new PactConnection(pred1, this);
this.input2 = new PactConnection(pred2, this);

pred1.addOutgoingConnection(this.input1);
pred2.addOutgoingConnection(this.input2);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,9 +108,29 @@ public OptimizerNode getNextPartialSolution() {
*
* @param nextPartialSolution The nextPartialSolution to set.
*/
public void setNextPartialSolution(OptimizerNode nextPartialSolution, PactConnection rootingConnection) {
public void setNextPartialSolution(OptimizerNode nextPartialSolution) {

// check if the root of the step function has the same DOP as the iteration
if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
nextPartialSolution.getSubtasksPerInstance() != getSubtasksPerInstance() )
{
// add a no-op to the root to express the re-partitioning
NoOpNode noop = new NoOpNode();
noop.setDegreeOfParallelism(getDegreeOfParallelism());
noop.setSubtasksPerInstance(getSubtasksPerInstance());

PactConnection noOpConn = new PactConnection(nextPartialSolution, noop);
noop.setIncomingConnection(noOpConn);
nextPartialSolution.addOutgoingConnection(noOpConn);

nextPartialSolution = noop;
}

PactConnection rootConn = new PactConnection(nextPartialSolution);
nextPartialSolution.addOutgoingConnection(rootConn);

this.nextPartialSolution = nextPartialSolution;
this.rootConnection = rootingConnection;
this.rootConnection = rootConn;
}

public int getCostWeight() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import java.util.Collections;

import eu.stratosphere.api.common.operators.BulkIteration.PartialSolutionPlaceHolder;
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
import eu.stratosphere.compiler.plan.BulkPartialSolutionPlanNode;
Expand All @@ -24,8 +25,8 @@
/**
* The optimizer's internal representation of the partial solution that is input to a bulk iteration.
*/
public class BulkPartialSolutionNode extends AbstractPartialSolutionNode
{
public class BulkPartialSolutionNode extends AbstractPartialSolutionNode {
private final BulkIterationNode iterationNode;


Expand Down Expand Up @@ -56,6 +57,11 @@ public BulkIterationNode getIterationNode() {
return this.iterationNode;
}

/**
 * Sets this node's estimates by copying them (via {@code copyEstimates}) from the node
 * returned by the enclosing iteration node's {@code getPredecessorNode()}.
 *
 * @param statistics Not used by this implementation.
 */
@Override
public void computeOutputEstimates(DataStatistics statistics) {
copyEstimates(this.iterationNode.getPredecessorNode());
}

// --------------------------------------------------------------------------------------------

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ public void setInputs(Map<Operator, OptimizerNode> contractToNode) {
final PactConnection conn;
if (children.size() == 1) {
pred = contractToNode.get(children.get(0));
conn = new PactConnection(pred, this, pred.getMaxDepth() + 1);
conn = new PactConnection(pred, this);
} else {
pred = createdUnionCascade(children, contractToNode, null);
conn = new PactConnection(pred, this, pred.getMaxDepth() + 1);
conn = new PactConnection(pred, this);
conn.setShipStrategy(ShipStrategyType.FORWARD);
}
// create the connection and add it
Expand Down
Loading

0 comments on commit 4cdf70f

Please sign in to comment.