Skip to content

Commit

Permalink
Termination Criterion for Bulk Iterations
Browse files Browse the repository at this point in the history
  • Loading branch information
markus-h authored and StephanEwen committed Mar 5, 2014
1 parent 2006980 commit 2f1050f
Show file tree
Hide file tree
Showing 14 changed files with 743 additions and 42 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -935,17 +935,40 @@ public void postVisit(Operator c) {
// first, recursively build the data flow for the step function
final GraphCreatingVisitor recursiveCreator = new GraphCreatingVisitor(this, true,
this.maxMachines, iterNode.getDegreeOfParallelism());

BulkPartialSolutionNode partialSolution = null;

iter.getNextPartialSolution().accept(recursiveCreator);

partialSolution = (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
OptimizerNode rootOfStepFunction = recursiveCreator.con2node.get(iter.getNextPartialSolution());
BulkPartialSolutionNode partialSolution =
(BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());
if (partialSolution == null) {
throw new CompilerException("Invalid Bulk iteration: The result of the iterative step functions result does not depend on the partial solution.");
throw new CompilerException("Error: The step functions result does not depend on the partial solution.");
}

// add an outgoing connection to the root of the step function
iterNode.setNextPartialSolution(rootOfStepFunction);

OptimizerNode terminationCriterion = null;
if(iter.getTerminationCriterion() != null) {
terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());

// no intermediate node
if(terminationCriterion == null) {
iter.getTerminationCriterion().accept(recursiveCreator);
terminationCriterion = recursiveCreator.con2node.get(iter.getTerminationCriterion());
partialSolution = (BulkPartialSolutionNode) recursiveCreator.con2node.get(iter.getPartialSolution());

if (partialSolution == null) {
throw new CompilerException("Error: The termination criterion result does not depend on the partial solution.");
}
}
}


// add an outgoing connection to the root of the step function
//PactConnection rootConn = new PactConnection(rootOfStepFunction);
//rootOfStepFunction.addOutgoingConnection(rootConn);

iterNode.setNextPartialSolution(rootOfStepFunction, terminationCriterion);
iterNode.setPartialSolution(partialSolution);

// account for the nested memory consumers
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import eu.stratosphere.compiler.DataStatistics;
import eu.stratosphere.compiler.PactCompiler.InterestingPropertyVisitor;
import eu.stratosphere.compiler.costs.CostEstimator;
import eu.stratosphere.compiler.dag.WorksetIterationNode.SingleRootJoiner;
import eu.stratosphere.compiler.dataproperties.GlobalProperties;
import eu.stratosphere.compiler.dataproperties.InterestingProperties;
import eu.stratosphere.compiler.dataproperties.LocalProperties;
Expand All @@ -44,11 +45,19 @@ public class BulkIterationNode extends SingleInputNode implements IterationNode

private BulkPartialSolutionNode partialSolution;

private OptimizerNode terminationCriterion;

private OptimizerNode nextPartialSolution;

//private OptimizerNode nextTerminationCriterion;

private PactConnection rootConnection;

private final int costWeight;
private PactConnection terminationCriterionRootConnection;


private OptimizerNode singleRoot;

// --------------------------------------------------------------------------------------------

Expand Down Expand Up @@ -103,14 +112,13 @@ public void setPartialSolution(BulkPartialSolutionNode partialSolution) {
public OptimizerNode getNextPartialSolution() {
return nextPartialSolution;
}


/**
* Sets the nextPartialSolution for this BulkIterationNode.
*
* @param nextPartialSolution The nextPartialSolution to set.
*/
public void setNextPartialSolution(OptimizerNode nextPartialSolution) {
public void setNextPartialSolution(OptimizerNode nextPartialSolution, OptimizerNode terminationCriterion) {

// check if the root of the step function has the same DOP as the iteration
if (nextPartialSolution.getDegreeOfParallelism() != getDegreeOfParallelism() ||
Expand All @@ -128,11 +136,37 @@ public void setNextPartialSolution(OptimizerNode nextPartialSolution) {
nextPartialSolution = noop;
}

PactConnection rootConn = new PactConnection(nextPartialSolution);
nextPartialSolution.addOutgoingConnection(rootConn);

this.nextPartialSolution = nextPartialSolution;
this.rootConnection = rootConn;
this.terminationCriterion = terminationCriterion;

if(terminationCriterion == null) {
this.singleRoot = nextPartialSolution;
this.rootConnection = new PactConnection(nextPartialSolution);
}
// we have a termination criterion
else {
// this.singleRoot = new SingleRootJoiner();
// this.singleRoot.setInputs(new PactConnection(nextPartialSolution, this.singleRoot, -1), new PactConnection(terminationCriterion, this.singleRoot, -1));

//this.nextTerminationCriterion = terminationCriterion;

SingleRootJoiner singleRootJoiner = new SingleRootJoiner();
this.rootConnection = new PactConnection(nextPartialSolution, singleRootJoiner);
this.terminationCriterionRootConnection = new PactConnection(terminationCriterion, singleRootJoiner);
singleRootJoiner.setInputs(this.rootConnection, this.terminationCriterionRootConnection);

this.singleRoot = singleRootJoiner;

// add connection to terminationCriterion for interesting properties visitor
terminationCriterion.addOutgoingConnection(terminationCriterionRootConnection);

//nextTerminationCriterion.addOutgoingConnection(this.terminationCriterionRootConnection);

}

nextPartialSolution.addOutgoingConnection(rootConnection);


}

public int getCostWeight() {
Expand Down Expand Up @@ -176,6 +210,13 @@ public boolean isMemoryConsumer() {
public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
final InterestingProperties intProps = getInterestingProperties().clone();


if(this.terminationCriterion != null) {
// first propagate through termination Criterion
this.terminationCriterionRootConnection.setInterestingProperties(intProps);
this.terminationCriterion.accept(new InterestingPropertyVisitor(estimator));
}

// we need to make 2 interesting property passes, because the root of the step function needs also
// the interesting properties as generated by the partial solution

Expand All @@ -202,6 +243,7 @@ public void computeInterestingPropertiesForInputs(CostEstimator estimator) {
inProps.addGlobalProperties(new RequestedGlobalProperties());
inProps.addLocalProperties(new RequestedLocalProperties());
this.inConn.setInterestingProperties(inProps);

}

@Override
Expand Down Expand Up @@ -236,13 +278,34 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li
}
}

// TODO Sanity Check terminationCriterionCandidates

// 5) Create a candidate for the Iteration Node for every remaining plan of the step function.
for (PlanNode candidate : candidates) {
BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
target.add(node);
if(terminationCriterion == null)
for (PlanNode candidate : candidates) {
BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
target.add(node);
}
else {
List<PlanNode> terminationCriterionCandidates = this.terminationCriterion.getAlternativePlans(estimator);

for (PlanNode candidate : candidates) {
for(PlanNode terminationCandidate : terminationCriterionCandidates) {
if (this.singleRoot.areBranchCompatible(candidate, terminationCandidate)) {

BulkIterationPlanNode node = new BulkIterationPlanNode(this, "BulkIteration ("+this.getPactContract().getName()+")", in, pspn, candidate, terminationCandidate);
GlobalProperties gProps = candidate.getGlobalProperties().clone();
LocalProperties lProps = candidate.getLocalProperties().clone();
node.initProperties(gProps, lProps);
target.add(node);

}
}
}

}
}

Expand All @@ -251,6 +314,12 @@ protected void instantiateCandidate(OperatorDescriptorSingle dps, Channel in, Li
// --------------------------------------------------------------------------------------------

public void acceptForStepFunction(Visitor<OptimizerNode> visitor) {
this.nextPartialSolution.accept(visitor);
// If termination criterion is defined
//if(this.singleRoot != null) {
this.singleRoot.accept(visitor);
//}
//else {
// this.nextPartialSolution.accept(visitor);
//}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ public LocalProperties computeLocalProperties(LocalProperties in1, LocalProperti
}
}

private static class SingleRootJoiner extends TwoInputNode {
public static class SingleRootJoiner extends TwoInputNode {

private SingleRootJoiner() {
SingleRootJoiner() {
super(NoOpBinaryUdfOp.INSTANCE);

setDegreeOfParallelism(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ public class BulkIterationPlanNode extends SingleInputPlanNode implements Iterat

private final PlanNode rootOfStepFunction;

private PlanNode rootOfTerminationCriterion;

private TypeSerializerFactory<?> serializerForIterationChannel;

// --------------------------------------------------------------------------------------------
Expand All @@ -41,6 +43,14 @@ public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channe
this.partialSolutionPlanNode = pspn;
this.rootOfStepFunction = rootOfStepFunction;
}

public BulkIterationPlanNode(BulkIterationNode template, String nodeName, Channel input,
BulkPartialSolutionPlanNode pspn, PlanNode rootOfStepFunction, PlanNode rootOfTerminationCriterion) {
this(template, nodeName, input, pspn, rootOfStepFunction);
// HACK!
//this.rootOfTerminationCriterion = rootOfTerminationCriterion.getInputs().next().getSource();
this.rootOfTerminationCriterion = rootOfTerminationCriterion;
}

// --------------------------------------------------------------------------------------------

Expand All @@ -60,6 +70,10 @@ public PlanNode getRootOfStepFunction() {
return this.rootOfStepFunction;
}

public PlanNode getRootOfTerminationCriterion() {
return this.rootOfTerminationCriterion;
}

// --------------------------------------------------------------------------------------------


Expand All @@ -74,6 +88,8 @@ public void setSerializerForIterationChannel(TypeSerializerFactory<?> serializer
public void setCosts(Costs nodeCosts) {
// add the costs from the step function
nodeCosts.addCosts(this.rootOfStepFunction.getCumulativeCosts());
if(rootOfTerminationCriterion != null)
nodeCosts.addCosts(this.rootOfTerminationCriterion.getCumulativeCosts());
super.setCosts(nodeCosts);
}

Expand Down Expand Up @@ -101,5 +117,9 @@ else if (fromOutside == FOUND_SOURCE) {
@Override
public void acceptForStepFunction(Visitor<PlanNode> visitor) {
this.rootOfStepFunction.accept(visitor);

if(this.rootOfTerminationCriterion != null)
this.rootOfTerminationCriterion.accept(visitor);

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -763,6 +763,15 @@ private JobTaskVertex createSingleInputVertex(SingleInputPlanNode node) throws C
chaining = false;
}
}
// cannot chain the nodes that produce the next workset in a bulk iteration if a termination criterion follows
if (this.currentIteration != null && this.currentIteration instanceof BulkIterationPlanNode &&
node.getOutgoingChannels().size() > 0)
{
BulkIterationPlanNode wspn = (BulkIterationPlanNode) this.currentIteration;
if (wspn.getRootOfStepFunction() == pred || wspn.getRootOfTerminationCriterion() == pred) {
chaining = false;
}
}
}

final JobTaskVertex vertex;
Expand Down Expand Up @@ -1176,8 +1185,11 @@ private void finalizeBulkIteration(IterationDescriptor descr) {


// ----------------------------- create the iteration tail ------------------------------

final PlanNode rootOfTerminationCriterion = bulkNode.getRootOfTerminationCriterion();
final PlanNode rootOfStepFunction = bulkNode.getRootOfStepFunction();
final TaskConfig tailConfig;

JobTaskVertex rootOfStepFunctionVertex = (JobTaskVertex) this.vertices.get(rootOfStepFunction);
if (rootOfStepFunctionVertex == null) {
// last op is chained
Expand All @@ -1192,23 +1204,76 @@ private void finalizeBulkIteration(IterationDescriptor descr) {
} else {
tailConfig = new TaskConfig(rootOfStepFunctionVertex.getConfiguration());
}
rootOfStepFunctionVertex.setTaskClass(IterationTailPactTask.class);

tailConfig.setIsWorksetUpdate();
tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTail);
// No following termination criterion
if(rootOfStepFunction.getOutgoingChannels().isEmpty()) {

rootOfStepFunctionVertex.setTaskClass(IterationTailPactTask.class);

tailConfig.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfig.addOutputShipStrategy(ShipStrategyType.FORWARD);

// create the fake output task
JobOutputVertex fakeTail = new JobOutputVertex("Fake Tail", this.jobGraph);
fakeTail.setOutputClass(FakeOutputTask.class);
fakeTail.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTail.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTail);

// connect the fake tail
try {
rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");
}

}

// connect the fake tail
try {
rootOfStepFunctionVertex.connectTo(fakeTail, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task");

// create the fake output task for termination criterion, if needed
final TaskConfig tailConfigOfTerminationCriterion;
// If we have a termination criterion and it is not an intermediate node
if(rootOfTerminationCriterion != null && rootOfTerminationCriterion.getOutgoingChannels().isEmpty()) {
JobTaskVertex rootOfTerminationCriterionVertex = (JobTaskVertex) this.vertices.get(rootOfTerminationCriterion);


if (rootOfTerminationCriterionVertex == null) {
// last op is chained
final TaskInChain taskInChain = this.chainedTasks.get(rootOfTerminationCriterion);
if (taskInChain == null) {
throw new CompilerException("Bug: Tail of termination criterion not found as vertex or chained task.");
}
rootOfTerminationCriterionVertex = (JobTaskVertex) taskInChain.getContainingVertex();

// the fake channel is statically typed to pact record. no data is sent over this channel anyways.
tailConfigOfTerminationCriterion = taskInChain.getTaskConfig();
} else {
tailConfigOfTerminationCriterion = new TaskConfig(rootOfTerminationCriterionVertex.getConfiguration());
}

rootOfTerminationCriterionVertex.setTaskClass(IterationTailPactTask.class);
// Hack
tailConfigOfTerminationCriterion.setIsSolutionSetUpdate();
tailConfigOfTerminationCriterion.setOutputSerializer(bulkNode.getSerializerForIterationChannel());
tailConfigOfTerminationCriterion.addOutputShipStrategy(ShipStrategyType.FORWARD);

JobOutputVertex fakeTailTerminationCriterion = new JobOutputVertex("Fake Tail for Termination Criterion", this.jobGraph);
fakeTailTerminationCriterion.setOutputClass(FakeOutputTask.class);
fakeTailTerminationCriterion.setNumberOfSubtasks(headVertex.getNumberOfSubtasks());
fakeTailTerminationCriterion.setNumberOfSubtasksPerInstance(headVertex.getNumberOfSubtasksPerInstance());
this.auxVertices.add(fakeTailTerminationCriterion);

// connect the fake tail
try {
rootOfTerminationCriterionVertex.connectTo(fakeTailTerminationCriterion, ChannelType.INMEMORY, DistributionPattern.POINTWISE);
} catch (JobGraphDefinitionException e) {
throw new CompilerException("Bug: Cannot connect iteration tail vertex fake tail task for termination criterion");
}

// tell the head that it needs to wait for the solution set updates
headConfig.setWaitForSolutionSetUpdate();
}

// ------------------- register the aggregators -------------------
Expand Down
Loading

0 comments on commit 2f1050f

Please sign in to comment.