Skip to content

Commit

Permalink
Enabled plan visualization for broadcast variables (quick fix visuali…
Browse files Browse the repository at this point in the history
…zed broadcast inputs like regular inputs)
  • Loading branch information
StephanEwen committed Feb 27, 2014
1 parent 762d506 commit 4180994
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -274,23 +274,17 @@ public void setBroadcastInputs(Map<Operator, OptimizerNode> operatorToNode) thro

@Override
public Iterator<OptimizerNode> getPredecessors() {
final Iterator<PactConnection> inputs = getIncomingConnections().iterator();
return new Iterator<OptimizerNode>() {
@Override
public boolean hasNext() {
return inputs.hasNext();
}

@Override
public OptimizerNode next() {
return inputs.next().getSource();
}

@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
List<OptimizerNode> allPredecessors = new ArrayList<OptimizerNode>();

for (Iterator<PactConnection> inputs = getIncomingConnections().iterator(); inputs.hasNext(); ){
allPredecessors.add(inputs.next().getSource());
}

for (PactConnection conn : getBroadcastConnections()) {
allPredecessors.add(conn.getSource());
}

return allPredecessors.iterator();
}

/**
Expand Down Expand Up @@ -1222,9 +1216,13 @@ public PlanNode getPlanNode() {
}

@Override
@SuppressWarnings("unchecked")
public Iterator<DumpableConnection<OptimizerNode>> getDumpableInputs() {
return (Iterator<DumpableConnection<OptimizerNode>>) (Iterator<?>) getIncomingConnections().iterator();
List<DumpableConnection<OptimizerNode>> allInputs = new ArrayList<DumpableConnection<OptimizerNode>>();

allInputs.addAll(getIncomingConnections());
allInputs.addAll(getBroadcastConnections());

return allInputs.iterator();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,10 @@

package eu.stratosphere.compiler.plan;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import eu.stratosphere.api.common.operators.util.FieldList;
Expand Down Expand Up @@ -202,8 +204,8 @@ public void setCosts(Costs nodeCosts) {
// get the cumulative costs of the last joined branching node
for (OptimizerNode joinedBrancher : this.template.getJoinedBranchers()) {
PlanNode lastCommonChild = this.input1.getSource().branchPlan.get(joinedBrancher);
Costs douleCounted = lastCommonChild.getCumulativeCosts();
getCumulativeCosts().subtractCosts(douleCounted);
Costs doubleCounted = lastCommonChild.getCumulativeCosts();
getCumulativeCosts().subtractCosts(doubleCounted);
}
}

Expand All @@ -225,28 +227,41 @@ public void accept(Visitor<PlanNode> visitor) {

@Override
public Iterator<PlanNode> getPredecessors() {
return new Iterator<PlanNode>() {
private int hasLeft = 2;
@Override
public boolean hasNext() {
return this.hasLeft > 0;
}
@Override
public PlanNode next() {
if (this.hasLeft == 2) {
this.hasLeft = 1;
return DualInputPlanNode.this.input1.getSource();
} else if (this.hasLeft == 1) {
this.hasLeft = 0;
return DualInputPlanNode.this.input2.getSource();
} else
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
return new Iterator<PlanNode>() {
private int hasLeft = 2;
@Override
public boolean hasNext() {
return this.hasLeft > 0;
}
@Override
public PlanNode next() {
if (this.hasLeft == 2) {
this.hasLeft = 1;
return DualInputPlanNode.this.input1.getSource();
} else if (this.hasLeft == 1) {
this.hasLeft = 0;
return DualInputPlanNode.this.input2.getSource();
} else
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
} else {
List<PlanNode> preds = new ArrayList<PlanNode>();

preds.add(input1.getSource());
preds.add(input2.getSource());

for (Channel c : getBroadcastInputs()) {
preds.add(c.getSource());
}
};

return preds.iterator();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -341,9 +341,18 @@ public PlanNode getPlanNode() {


@Override
@SuppressWarnings("unchecked")
public Iterator<DumpableConnection<PlanNode>> getDumpableInputs() {
return (Iterator<DumpableConnection<PlanNode>>) (Iterator<?>) getInputs();
List<DumpableConnection<PlanNode>> allInputs = new ArrayList<DumpableConnection<PlanNode>>();

for (Iterator<Channel> inputs = getInputs(); inputs.hasNext();) {
allInputs.add(inputs.next());
}

for (NamedChannel c : getBroadcastInputs()) {
allInputs.add(c);
}

return allInputs.iterator();
}

public static enum SourceAndDamReport {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@

package eu.stratosphere.compiler.plan;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

import eu.stratosphere.api.common.operators.util.FieldList;
Expand Down Expand Up @@ -145,25 +147,38 @@ public void accept(Visitor<PlanNode> visitor) {

@Override
public Iterator<PlanNode> getPredecessors() {
return new Iterator<PlanNode>() {
private boolean hasLeft = true;
@Override
public boolean hasNext() {
return this.hasLeft;
}
@Override
public PlanNode next() {
if (this.hasLeft) {
this.hasLeft = false;
return SingleInputPlanNode.this.input.getSource();
} else
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
if (getBroadcastInputs() == null || getBroadcastInputs().isEmpty()) {
return new Iterator<PlanNode>() {
private boolean hasLeft = true;
@Override
public boolean hasNext() {
return this.hasLeft;
}
@Override
public PlanNode next() {
if (this.hasLeft) {
this.hasLeft = false;
return SingleInputPlanNode.this.input.getSource();
} else
throw new NoSuchElementException();
}
@Override
public void remove() {
throw new UnsupportedOperationException();
}
};
}
else {
List<PlanNode> preds = new ArrayList<PlanNode>();

preds.add(input.getSource());

for (Channel c : getBroadcastInputs()) {
preds.add(c.getSource());
}
};

return preds.iterator();
}
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
/**
*
*/
public interface DumpableNode<T extends DumpableNode<T>>
{
public interface DumpableNode<T extends DumpableNode<T>> {
/**
* Gets an iterator over the predecessors.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import eu.stratosphere.api.java.record.operators.MapOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator;
import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable;
import eu.stratosphere.client.LocalExecutor;
import eu.stratosphere.configuration.Configuration;
import eu.stratosphere.types.DoubleValue;
import eu.stratosphere.types.IntValue;
Expand Down Expand Up @@ -311,4 +312,8 @@ public void writeRecord(Record record) throws IOException {
this.stream.write(bytes);
}
}

public static void main(String[] args) throws Exception {
System.out.println(LocalExecutor.optimizerPlanAsJSON(new KMeans().getPlan("4", "/dev/random", "/dev/random", "/tmp", "20")));
}
}

0 comments on commit 4180994

Please sign in to comment.