Skip to content

Commit

Permalink
Implement distributed planning as an optimizer
Browse files Browse the repository at this point in the history
Add a new optimizer (AddExchanges) that introduces Exchange nodes
at the right places in the logical plan and determines partitioning
and placement requirements.

This also adds basic partition awareness, which can result in better plans:

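E.g., in the query below the inner aggregation's output is already
partitioned on custkey, so the outer aggregation can run as a single
step in the same fragment instead of being split into partial and
final steps with an additional exchange: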

 SELECT custkey, sum(t)
 FROM (
   SELECT custkey, count(*) t
   FROM orders
   GROUP BY custkey
 )
 GROUP BY custkey

Before:

 Fragment 3 [SINGLE]
     Output layout: [custkey, sum]
     - Output[custkey, _col1] => [custkey:bigint, sum:bigint]
             _col1 := sum
         - RemoteSource[2] => [custkey:bigint, sum:bigint]

 Fragment 2 [FIXED]
     Output layout: [custkey, sum]
     - Aggregate(FINAL)[custkey] => [custkey:bigint, sum:bigint]
             sum := "sum"("sum_18")
         - RemoteSource[1] => [custkey:bigint, sum_18:bigint]

 Fragment 1 [FIXED]
     Output layout: [custkey, sum_18]
     Output partitioning: [custkey]
     - Aggregate(PARTIAL)[custkey] => [custkey:bigint, sum_18:bigint]
             sum_18 := "sum"("count")
         - Aggregate(FINAL)[custkey] => [custkey:bigint, count:bigint]
                 count := "count"("count_17")
             - RemoteSource[0] => [custkey:bigint, count_17:bigint]

 Fragment 0 [SOURCE]
     Output layout: [custkey, count_17]
     Output partitioning: [custkey]
     - Aggregate(PARTIAL)[custkey] => [custkey:bigint, count_17:bigint]
             count_17 := "count"(*)
         - TableScan[tpch:tpch:orders:sf1.0, original constraint=true] => [custkey:bigint]
                 custkey := tpch:tpch:custkey:1

After:

 Fragment 2 [SINGLE]
     Output layout: [custkey, sum]
     - Output[custkey, _col1] => [custkey:bigint, sum:bigint]
             _col1 := sum
         - RemoteSource[1] => [custkey:bigint, sum:bigint]

 Fragment 1 [FIXED]
     Output layout: [custkey, sum]
     - Aggregate[custkey] => [custkey:bigint, sum:bigint]
             sum := "sum"("count")
         - Aggregate(FINAL)[custkey] => [custkey:bigint, count:bigint]
                 count := "count"("count_17")
             - RemoteSource[0] => [custkey:bigint, count_17:bigint]

 Fragment 0 [SOURCE]
     Output layout: [custkey, count_17]
     Output partitioning: [custkey]
     - Aggregate(PARTIAL)[custkey] => [custkey:bigint, count_17:bigint]
             count_17 := "count"(*)
         - TableScan[tpch:tpch:orders:sf1.0, original constraint=true] => [custkey:bigint]
                 custkey := tpch:tpch:custkey:1
  • Loading branch information
martint committed Jan 6, 2015
1 parent 89d82b6 commit 505098d
Show file tree
Hide file tree
Showing 17 changed files with 1,284 additions and 809 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,6 @@ public class CreateViewTask
private final SqlParser sqlParser;
private final List<PlanOptimizer> planOptimizers;
private final boolean experimentalSyntaxEnabled;
private final boolean distributedIndexJoinsEnabled;
private final boolean distributedJoinsEnabled;

@Inject
public CreateViewTask(JsonCodec<ViewDefinition> codec, SqlParser sqlParser, List<PlanOptimizer> planOptimizers, FeaturesConfig featuresConfig)
Expand All @@ -61,8 +59,6 @@ public CreateViewTask(JsonCodec<ViewDefinition> codec, SqlParser sqlParser, List
this.planOptimizers = ImmutableList.copyOf(checkNotNull(planOptimizers, "planOptimizers is null"));
checkNotNull(featuresConfig, "featuresConfig is null");
this.experimentalSyntaxEnabled = featuresConfig.isExperimentalSyntaxEnabled();
this.distributedIndexJoinsEnabled = featuresConfig.isDistributedIndexJoinsEnabled();
this.distributedJoinsEnabled = featuresConfig.isDistributedJoinsEnabled();
}

@Override
Expand All @@ -86,7 +82,7 @@ public void execute(CreateView statement, Session session, Metadata metadata, Qu

public Analysis analyzeStatement(Statement statement, Session session, Metadata metadata)
{
QueryExplainer explainer = new QueryExplainer(session, planOptimizers, metadata, sqlParser, experimentalSyntaxEnabled, distributedIndexJoinsEnabled, distributedJoinsEnabled);
QueryExplainer explainer = new QueryExplainer(session, planOptimizers, metadata, sqlParser, experimentalSyntaxEnabled);
Analyzer analyzer = new Analyzer(session, metadata, sqlParser, Optional.of(explainer), experimentalSyntaxEnabled);
return analyzer.analyze(statement);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@
import com.facebook.presto.sql.analyzer.QueryExplainer;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.DistributedExecutionPlanner;
import com.facebook.presto.sql.planner.DistributedLogicalPlanner;
import com.facebook.presto.sql.planner.InputExtractor;
import com.facebook.presto.sql.planner.LogicalPlanner;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanNodeIdAllocator;
import com.facebook.presto.sql.planner.StageExecutionPlan;
import com.facebook.presto.sql.planner.SubPlan;
Expand Down Expand Up @@ -135,7 +135,7 @@ public SqlQueryExecution(QueryId queryId,
checkNotNull(self, "self is null");
this.stateMachine = new QueryStateMachine(queryId, query, session, self, queryExecutor);

this.queryExplainer = new QueryExplainer(session, planOptimizers, metadata, sqlParser, experimentalSyntaxEnabled, distributedIndexJoinsEnabled, distributedJoinsEnabled);
this.queryExplainer = new QueryExplainer(session, planOptimizers, metadata, sqlParser, experimentalSyntaxEnabled);
}
}

Expand Down Expand Up @@ -215,7 +215,7 @@ private SubPlan doAnalyzeQuery()
stateMachine.setInputs(inputs);

// fragment the plan
SubPlan subplan = new DistributedLogicalPlanner(session, metadata, idAllocator).createSubPlans(plan, false, distributedIndexJoinsEnabled, distributedJoinsEnabled);
SubPlan subplan = new PlanFragmenter().createSubPlans(plan);

stateMachine.recordAnalysisTime(analysisStart);
return subplan;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
import com.facebook.presto.Session;
import com.facebook.presto.metadata.Metadata;
import com.facebook.presto.sql.parser.SqlParser;
import com.facebook.presto.sql.planner.DistributedLogicalPlanner;
import com.facebook.presto.sql.planner.LogicalPlanner;
import com.facebook.presto.sql.planner.Plan;
import com.facebook.presto.sql.planner.PlanFragmenter;
import com.facebook.presto.sql.planner.PlanNodeIdAllocator;
import com.facebook.presto.sql.planner.PlanPrinter;
import com.facebook.presto.sql.planner.SubPlan;
Expand All @@ -38,25 +38,19 @@ public class QueryExplainer
private final Metadata metadata;
private final SqlParser sqlParser;
private final boolean experimentalSyntaxEnabled;
private final boolean distributedIndexJoinsEnabled;
private final boolean distributedJoinsEnabled;

public QueryExplainer(
Session session,
List<PlanOptimizer> planOptimizers,
Metadata metadata,
SqlParser sqlParser,
boolean experimentalSyntaxEnabled,
boolean distributedIndexJoinsEnabled,
boolean distributedJoinsEnabled)
boolean experimentalSyntaxEnabled)
{
this.session = checkNotNull(session, "session is null");
this.planOptimizers = checkNotNull(planOptimizers, "planOptimizers is null");
this.metadata = checkNotNull(metadata, "metadata is null");
this.sqlParser = checkNotNull(sqlParser, "sqlParser is null");
this.experimentalSyntaxEnabled = experimentalSyntaxEnabled;
this.distributedIndexJoinsEnabled = distributedIndexJoinsEnabled;
this.distributedJoinsEnabled = distributedJoinsEnabled;
}

public String getPlan(Statement statement, ExplainType.Type planType)
Expand Down Expand Up @@ -116,6 +110,6 @@ private SubPlan getDistributedPlan(Statement statement)
LogicalPlanner logicalPlanner = new LogicalPlanner(session, planOptimizers, idAllocator, metadata);
Plan plan = logicalPlanner.plan(analysis);

return new DistributedLogicalPlanner(session, metadata, idAllocator).createSubPlans(plan, false, distributedIndexJoinsEnabled, distributedJoinsEnabled);
return new PlanFragmenter().createSubPlans(plan);
}
}
Loading

0 comments on commit 505098d

Please sign in to comment.