Skip to content

Commit

Permalink
Remove SinkNode
Browse files Browse the repository at this point in the history
It exists for historical reasons. Output from plan nodes was purely logical,
so SinkNodes were used to enforce the shape of the output across distribution
boundaries (similar to OutputNode).

This change moves the layout tracking to the plan fragment and the enforcement
to the physical execution planner.
  • Loading branch information
martint committed Jan 6, 2015
1 parent fca4c31 commit 400ed75
Show file tree
Hide file tree
Showing 20 changed files with 96 additions and 247 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@
import com.facebook.presto.sql.planner.plan.PlanFragmentId;
import com.facebook.presto.sql.planner.plan.PlanNode;
import com.facebook.presto.sql.planner.plan.PlanNodeId;
import com.facebook.presto.sql.planner.plan.SinkNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Functions;
import com.google.common.base.Throwables;
Expand Down Expand Up @@ -1000,7 +999,6 @@ private static Optional<Integer> getHashChannel(PlanFragment fragment)
private static List<Integer> getPartitioningChannels(PlanFragment fragment)
{
checkState(fragment.getOutputPartitioning() == OutputPartitioning.HASH, "fragment is not hash partitioned");
checkState(fragment.getRoot() instanceof SinkNode, "root is not an instance of SinkNode");
// We can convert the symbols directly into channels, because the root must be a sink and therefore the layout is fixed
return fragment.getPartitionBy().stream()
.map(symbol -> fragment.getRoot().getOutputSymbols().indexOf(symbol))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ private SqlTaskExecution(
try (SetThreadName ignored = new SetThreadName("Task-%s", taskId)) {
List<DriverFactory> driverFactories;
try {
LocalExecutionPlan localExecutionPlan = planner.plan(taskContext.getSession(), fragment.getRoot(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
LocalExecutionPlan localExecutionPlan = planner.plan(taskContext.getSession(), fragment.getRoot(), fragment.getOutputLayout(), fragment.getSymbols(), new TaskOutputFactory(sharedBuffer));
driverFactories = localExecutionPlan.getDriverFactories();
}
catch (Throwable e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SinkNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.TableCommitNode;
import com.facebook.presto.sql.planner.plan.TableScanNode;
Expand Down Expand Up @@ -246,12 +245,6 @@ public Optional<SplitSource> visitSort(SortNode node, Void context)
return node.getSource().accept(this, context);
}

@Override
public Optional<SplitSource> visitSink(SinkNode node, Void context)
{
return node.getSource().accept(this, context);
}

@Override
public Optional<SplitSource> visitTableWriter(TableWriterNode node, Void context)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
import com.facebook.presto.sql.planner.plan.RowNumberNode;
import com.facebook.presto.sql.planner.plan.SampleNode;
import com.facebook.presto.sql.planner.plan.SemiJoinNode;
import com.facebook.presto.sql.planner.plan.SinkNode;
import com.facebook.presto.sql.planner.plan.SortNode;
import com.facebook.presto.sql.planner.plan.TableCommitNode;
import com.facebook.presto.sql.planner.plan.TableScanNode;
Expand Down Expand Up @@ -174,22 +173,18 @@ public SubPlanBuilder visitMarkDistinct(MarkDistinctNode node, Void context)
return current;
}
else {
PlanNode sink = new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols());
current.setHashOutputPartitioning(node.getDistinctSymbols(), hashSymbol);

current.setRoot(sink)
.setHashOutputPartitioning(node.getDistinctSymbols(), hashSymbol);

PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), current.getId(), sink.getOutputSymbols());
PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
MarkDistinctNode markNode = new MarkDistinctNode(idAllocator.getNextId(), exchange, node.getMarkerSymbol(), node.getDistinctSymbols(), hashSymbol);

return createFixedDistributionPlan(markNode)
.addChild(current.build());
}
}

private SubPlanBuilder addSingleNodeAggregation(SubPlanBuilder plan, Map<Symbol, FunctionCall> aggregations, Map<Symbol, Signature> functions, Map<Symbol, Symbol> masks, List<Symbol> groupBy, Optional<Symbol> sampleWeight, double confidence, Optional<Symbol> hashSymbol)
{
plan.setRoot(new SinkNode(idAllocator.getNextId(), plan.getRoot(), plan.getRoot().getOutputSymbols()));

// create aggregation plan
ExchangeNode source = new ExchangeNode(idAllocator.getNextId(), plan.getId(), plan.getRoot().getOutputSymbols());
AggregationNode aggregation = new AggregationNode(idAllocator.getNextId(), source, groupBy, aggregations, functions, masks, SINGLE, sampleWeight, confidence, hashSymbol);
Expand Down Expand Up @@ -221,7 +216,7 @@ private SubPlanBuilder addDistributedAggregation(SubPlanBuilder plan, Map<Symbol

// create partial aggregation plan
AggregationNode partialAggregation = new AggregationNode(idAllocator.getNextId(), plan.getRoot(), groupBy, intermediateCalls, intermediateFunctions, intermediateMask, PARTIAL, sampleWeight, confidence, hashSymbol);
plan.setRoot(new SinkNode(idAllocator.getNextId(), partialAggregation, partialAggregation.getOutputSymbols()));
plan.setRoot(partialAggregation);

// create final aggregation plan
ExchangeNode source = new ExchangeNode(idAllocator.getNextId(), plan.getId(), plan.getRoot().getOutputSymbols());
Expand All @@ -246,8 +241,6 @@ public SubPlanBuilder visitWindow(WindowNode node, Void context)

if (current.isDistributed()) {
List<Symbol> partitionedBy = node.getPartitionBy();
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

ExchangeNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
if (partitionedBy.isEmpty()) {
// create a new non-partitioned fragment
Expand All @@ -272,8 +265,6 @@ public SubPlanBuilder visitRowNumber(RowNumberNode node, Void context)
SubPlanBuilder current = node.getSource().accept(this, context);
if (current.isDistributed()) {
List<Symbol> partitionedBy = node.getPartitionBy();
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

ExchangeNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
if (node.getPartitionBy().isEmpty()) {
current = createSingleNodePlan(source).addChild(current.build());
Expand Down Expand Up @@ -304,7 +295,6 @@ public SubPlanBuilder visitTopNRowNumber(TopNRowNumberNode node, Void context)
node.getMaxRowCountPerPartition(),
true,
node.getHashSymbol()));
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));
ExchangeNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
TopNRowNumberNode merge = new TopNRowNumberNode(node.getId(),
source,
Expand Down Expand Up @@ -373,8 +363,6 @@ public SubPlanBuilder visitTopN(TopNNode node, Void context)
current.setRoot(new TopNNode(node.getId(), current.getRoot(), node.getCount(), node.getOrderBy(), node.getOrderings(), partial));

if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

// create merge plan fragment
PlanNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
TopNNode merge = new TopNNode(idAllocator.getNextId(), source, node.getCount(), node.getOrderBy(), node.getOrderings(), false);
Expand All @@ -391,8 +379,6 @@ public SubPlanBuilder visitSort(SortNode node, Void context)
SubPlanBuilder current = node.getSource().accept(this, context);

if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

// create a new non-partitioned fragment
current = createSingleNodePlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols()))
.addChild(current.build());
Expand All @@ -409,8 +395,6 @@ public SubPlanBuilder visitOutput(OutputNode node, Void context)
SubPlanBuilder current = node.getSource().accept(this, context);

if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

// create a new non-partitioned fragment
current = createSingleNodePlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols()))
.addChild(current.build());
Expand All @@ -429,8 +413,6 @@ public SubPlanBuilder visitLimit(LimitNode node, Void context)
current.setRoot(new LimitNode(node.getId(), current.getRoot(), node.getCount()));

if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

// create merge plan fragment
PlanNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
LimitNode merge = new LimitNode(idAllocator.getNextId(), source, node.getCount());
Expand All @@ -449,8 +431,6 @@ public SubPlanBuilder visitDistinctLimit(DistinctLimitNode node, Void context)
current.setRoot(new DistinctLimitNode(node.getId(), current.getRoot(), node.getLimit(), node.getHashSymbol()));

if (current.isDistributed()) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

PlanNode source = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
DistinctLimitNode merge = new DistinctLimitNode(idAllocator.getNextId(), source, node.getLimit(), node.getHashSymbol());
current = createSingleNodePlan(merge).addChild(current.build());
Expand Down Expand Up @@ -484,8 +464,6 @@ public SubPlanBuilder visitTableCommit(TableCommitNode node, Void context)
SubPlanBuilder current = node.getSource().accept(this, context);

if (current.getDistribution() != PlanDistribution.COORDINATOR_ONLY && !createSingleNodePlan) {
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols()));

// create a new non-partitioned fragment to run on the coordinator
current = createCoordinatorOnlyPlan(new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols()))
.addChild(current.build());
Expand All @@ -509,7 +487,6 @@ public SubPlanBuilder visitJoin(JoinNode node, Void context)
switch (node.getType()) {
case INNER:
case LEFT:
right.setRoot(new SinkNode(idAllocator.getNextId(), right.getRoot(), right.getRoot().getOutputSymbols()));
if (distributedJoins) {
right.setHashOutputPartitioning(rightSymbols, node.getRightHashSymbol());
left = hashDistributeSubplan(left, leftSymbols, node.getLeftHashSymbol());
Expand All @@ -523,7 +500,6 @@ public SubPlanBuilder visitJoin(JoinNode node, Void context)

return left;
case RIGHT:
left.setRoot(new SinkNode(idAllocator.getNextId(), left.getRoot(), left.getRoot().getOutputSymbols()));
if (distributedJoins) {
left.setHashOutputPartitioning(leftSymbols, node.getLeftHashSymbol());
right = hashDistributeSubplan(right, rightSymbols, node.getRightHashSymbol());
Expand All @@ -549,13 +525,11 @@ public SubPlanBuilder visitJoin(JoinNode node, Void context)

public SubPlanBuilder hashDistributeSubplan(SubPlanBuilder subPlan, List<Symbol> symbols, Optional<Symbol> hashSymbol)
{
PlanNode sink = new SinkNode(idAllocator.getNextId(), subPlan.getRoot(), subPlan.getRoot().getOutputSymbols());
subPlan.setRoot(sink).setHashOutputPartitioning(symbols, hashSymbol);
subPlan.setHashOutputPartitioning(symbols, hashSymbol);

PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), subPlan.getId(), sink.getOutputSymbols());
subPlan = createFixedDistributionPlan(exchange)
PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), subPlan.getId(), subPlan.getRoot().getOutputSymbols());
return createFixedDistributionPlan(exchange)
.addChild(subPlan.build());
return subPlan;
}

@Override
Expand All @@ -565,7 +539,6 @@ public SubPlanBuilder visitSemiJoin(SemiJoinNode node, Void context)
SubPlanBuilder filteringSource = node.getFilteringSource().accept(this, context);

if (source.isDistributed() || filteringSource.isDistributed()) {
filteringSource.setRoot(new SinkNode(idAllocator.getNextId(), filteringSource.getRoot(), filteringSource.getRoot().getOutputSymbols()));
source.setRoot(new SemiJoinNode(node.getId(),
source.getRoot(),
new ExchangeNode(idAllocator.getNextId(), filteringSource.getId(), filteringSource.getRoot().getOutputSymbols()),
Expand Down Expand Up @@ -598,12 +571,9 @@ public SubPlanBuilder visitIndexJoin(IndexJoinNode node, Void context)
SubPlanBuilder current = node.getProbeSource().accept(this, context);

if (distributedIndexJoins && current.isDistributed()) {
PlanNode sink = new SinkNode(idAllocator.getNextId(), current.getRoot(), current.getRoot().getOutputSymbols());

current.setRoot(sink)
.setHashOutputPartitioning(Lists.transform(node.getCriteria(), IndexJoinNode.EquiJoinClause::getProbe), node.getProbeHashSymbol());
current.setHashOutputPartitioning(Lists.transform(node.getCriteria(), IndexJoinNode.EquiJoinClause::getProbe), node.getProbeHashSymbol());

PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), current.getId(), sink.getOutputSymbols());
PlanNode exchange = new ExchangeNode(idAllocator.getNextId(), current.getId(), current.getRoot().getOutputSymbols());
IndexJoinNode indexJoinNode = new IndexJoinNode(node.getId(), node.getType(), exchange, node.getIndexSource(), node.getCriteria(), node.getProbeHashSymbol(), node.getIndexHashSymbol());
return createFixedDistributionPlan(indexJoinNode).addChild(current.build());
}
Expand All @@ -628,11 +598,14 @@ public SubPlanBuilder visitUnion(UnionNode node, Void context)
ImmutableList.Builder<SubPlan> sourceBuilder = ImmutableList.builder();
ImmutableList.Builder<PlanFragmentId> fragmentIdBuilder = ImmutableList.builder();
for (int i = 0; i < node.getSources().size(); i++) {
PlanNode subPlan = node.getSources().get(i);
SubPlanBuilder current = subPlan.accept(this, context);
current.setRoot(new SinkNode(idAllocator.getNextId(), current.getRoot(), node.sourceOutputLayout(i)));
fragmentIdBuilder.add(current.getId());
sourceBuilder.add(current.build());
SubPlan child = node.getSources()
.get(i)
.accept(this, context)
.setOutputLayout(node.sourceOutputLayout(i))
.build();

fragmentIdBuilder.add(child.getFragment().getId());
sourceBuilder.add(child);
}
ExchangeNode exchangeNode = new ExchangeNode(idAllocator.getNextId(), fragmentIdBuilder.build(), node.getOutputSymbols());
return createSingleNodePlan(exchangeNode)
Expand Down
Loading

0 comments on commit 400ed75

Please sign in to comment.