Skip to content

Commit

Permalink
fix push_down_projection push redundant columns. (apache#4487)
Browse files Browse the repository at this point in the history
* fix `push_down_projection` push redundant columns.

* resolve problem `multiple_or_predicates`
  • Loading branch information
jackwener committed Dec 6, 2022
1 parent 740a4fa commit d9a47d6
Show file tree
Hide file tree
Showing 10 changed files with 51 additions and 57 deletions.
2 changes: 1 addition & 1 deletion benchmarks/expected-plans/q11.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ Sort: value DESC NULLS FIRST
Aggregate: groupBy=[[]], aggr=[[SUM(CAST(partsupp.ps_supplycost AS Decimal128(26, 2)) * CAST(partsupp.ps_availqty AS Decimal128(26, 2)))]]
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_availqty, ps_supplycost]
TableScan: partsupp projection=[ps_suppkey, ps_availqty, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_nationkey]
Filter: nation.n_name = Utf8("GERMANY")
TableScan: nation projection=[n_nationkey, n_name]
7 changes: 3 additions & 4 deletions benchmarks/expected-plans/q15.txt
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ Sort: supplier.s_suppkey ASC NULLS LAST
SubqueryAlias: revenue0
Projection: supplier_no, total_revenue
Projection: lineitem.l_suppkey AS supplier_no, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount) AS total_revenue
Projection: lineitem.l_suppkey, SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
Aggregate: groupBy=[[lineitem.l_suppkey]], aggr=[[SUM(CAST(lineitem.l_extendedprice AS Decimal128(38, 4)) * CAST(Decimal128(Some(100),23,2) - CAST(lineitem.l_discount AS Decimal128(23, 2)) AS Decimal128(38, 4))) AS SUM(lineitem.l_extendedprice * Int64(1) - lineitem.l_discount)]]
Filter: lineitem.l_shipdate >= Date32("9496") AND lineitem.l_shipdate < Date32("9587")
TableScan: lineitem projection=[l_suppkey, l_extendedprice, l_discount, l_shipdate]
SubqueryAlias: __sq_1
Projection: MAX(revenue0.total_revenue) AS __value
Aggregate: groupBy=[[]], aggr=[[MAX(revenue0.total_revenue)]]
Expand Down
4 changes: 2 additions & 2 deletions benchmarks/expected-plans/q2.txt
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ Sort: supplier.s_acctbal DESC NULLS FIRST, nation.n_name ASC NULLS LAST, supplie
Inner Join: supplier.s_nationkey = nation.n_nationkey
Inner Join: partsupp.ps_suppkey = supplier.s_suppkey
TableScan: partsupp projection=[ps_partkey, ps_suppkey, ps_supplycost]
TableScan: supplier projection=[s_suppkey, s_name, s_address, s_nationkey, s_phone, s_acctbal, s_comment]
TableScan: nation projection=[n_nationkey, n_name, n_regionkey]
TableScan: supplier projection=[s_suppkey, s_nationkey]
TableScan: nation projection=[n_nationkey, n_regionkey]
Filter: region.r_name = Utf8("EUROPE")
TableScan: region projection=[r_regionkey, r_name]
6 changes: 3 additions & 3 deletions datafusion/core/tests/sql/explain_analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ async fn explain_analyze_baseline_metrics() {
);
assert_metrics!(
&formatted,
"SortExec: [c1@1 ASC NULLS LAST]",
"SortExec: [c1@0 ASC NULLS LAST]",
"metrics=[output_rows=5, elapsed_compute="
);
assert_metrics!(
Expand Down Expand Up @@ -573,7 +573,7 @@ async fn csv_explain_verbose_plans() {
// Since the plan contains path that are environmentally
// dependant(e.g. full path of the test file), only verify
// important content
assert_contains!(&actual, "logical_plan after projection_push_down");
assert_contains!(&actual, "logical_plan after push_down_projection");
assert_contains!(&actual, "physical_plan");
assert_contains!(&actual, "FilterExec: c2@1 > 10");
assert_contains!(actual, "ProjectionExec: expr=[c1@0 as c1]");
Expand Down Expand Up @@ -744,7 +744,7 @@ async fn test_physical_plan_display_indent_multi_children() {
" RepartitionExec: partitioning=Hash([Column { name: \"c2\", index: 0 }], 9000)",
" ProjectionExec: expr=[c1@0 as c2]",
" RepartitionExec: partitioning=RoundRobinBatch(9000)",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1, c2]",
" CsvExec: files={1 group: [[ARROW_TEST_DATA/csv/aggregate_test_100.csv]]}, has_header=true, limit=None, projection=[c1]",
];

let normalizer = ExplainNormalizer::new();
Expand Down
15 changes: 8 additions & 7 deletions datafusion/core/tests/sql/joins.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1635,13 +1635,14 @@ async fn reduce_left_join_3() -> Result<()> {
let expected = vec![
"Explain [plan_type:Utf8, plan:Utf8]",
" Projection: t3.t1_id, t3.t1_name, t3.t1_int, t2.t2_id, t2.t2_name, t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" Left Join: t3.t1_int = t2.t2_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
" SubqueryAlias: t3 [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Projection: t1.t1_id, t1.t1_name, t1.t1_int [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Inner Join: t1.t1_id = t2.t2_id [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N, t2_id:UInt32;N, t2_int:UInt32;N]",
" Filter: t1.t1_id < UInt32(100) [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" TableScan: t1 projection=[t1_id, t1_name, t1_int] [t1_id:UInt32;N, t1_name:Utf8;N, t1_int:UInt32;N]",
" Filter: t2.t2_int < UInt32(3) AND t2.t2_id < UInt32(100) [t2_id:UInt32;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_int] [t2_id:UInt32;N, t2_int:UInt32;N]",
" TableScan: t2 projection=[t2_id, t2_name, t2_int] [t2_id:UInt32;N, t2_name:Utf8;N, t2_int:UInt32;N]",
];
let formatted = plan.display_indent_schema().to_string();
Expand Down
Loading

0 comments on commit d9a47d6

Please sign in to comment.