-
Notifications
You must be signed in to change notification settings - Fork 1.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
reimplement push_down_filter
to remove global-state
#4365
Conversation
07931c4
to
ee5c9c7
Compare
The rule re-implementation of this rule was complex and now is completed. Now all that remains is an equally complex |
I need to set aside time to review this PR carefully given how important FilterPushdown is -- I will try and find some in the upcoming week. |
.into_iter() | ||
.zip(on_or_to_left.1) | ||
.for_each(|(expr, cols)| left_state.filters.push((expr, cols))); | ||
let left = optimize(left, left_state)?; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Original implementation here is bottom-up, outside is top down. Now unify to top down
in all places.
ee5c9c7
to
0f32b0f
Compare
This week I have some time and I can take a closer look at this PR carefully. |
@ayushdg @sarahyurick It would be good to test the impact of this change in Dask SQL to make sure it doesn't cause any regressions for us. |
I only did a couple of trials but looks good on my end so far - I'm seeing a performance improvement with the changes in this PR over DF 14.0.0 |
.unwrap_or_else(Vec::new); | ||
|
||
if join.join_type == JoinType::Inner { | ||
// For inner joins, duplicate filters for joined columns so filters can be pushed down |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does the same logic can apply to LeftSemi and RightSemi join ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And for Out joins:
For Left Out join, if we have a query SELECT t1_id, t1_name, t2_name FROM t1 LEFT JOIN t2 ON (t1_id = t2_id and t2_id >= 100);
t2_id >= 100
can be pushed down to the right side.
For Right Out join, if we have a query SELECT t1_id, t1_name, t2_name FROM t1 Right JOIN t2 ON (t1_id = t2_id and t1_id >= 100);
t1_id >= 100
can be pushed down to the left side.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think we can.
In fact, SEMI/ANTI JOIN is similar with FILTER
Current code is just for refactor, I don't include extra enhancement.
Related issue to track problems like this #4413
It seems that with the LogicalPlan structs, it is not that convenient to write pattern-matching and destructuring code, because of the Arc.
|
Why we use Arc to wrap all the inputs? Is it because of nested types and Rust does not support nested struct, must wrap the nested structs with Box or Arc ? |
// Convert both qualified and unqualified fields | ||
[ | ||
(field.name().clone(), expr.clone()), | ||
(field.qualified_name(), expr), | ||
] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I understand this is coming from the existing logic, but could it encounter conflicting unqualified field names here? For example a Projection contains both a.id, b.id
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, original code here exist bug.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
has resolved this bug, and add UT, thanks @mingmwang !
The original UT test_crossjoin_with_or_clause
can discover this bug there, but it bypassed it with alias.
d50b5d9
to
1f75d1d
Compare
I plan to test this PR against the IOx test suite later today |
Some part of the |
Regression is fixed in This PR, which is convenient for others test this PR. |
I plan to review the code and test failure in this PR later today |
@mingmwang look like alias can't be in groupby. sql 1999
pg:
|
3c02cab
to
842d503
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I found the source of the problem in IOx -- it is exposed but not caused by this change.
After reading through this PR carefully it is really nice work @jackwener 🏅
Thank you so much for the contribution and @mingmwang for the reviews
} | ||
push_down_all_join(predicates, filter.input(), left, right, vec![])? | ||
} | ||
LogicalPlan::TableScan(scan) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree that the pushdown into scan could be done as part of the physical planning phase. However, since the current filter pushdown happens in the logical planning phase I think it is ok to keep the same behavior in this PR and move the pushdown in some other PR
You can try this:
|
#[test]
fn push_down_filter_groupby_expr_contains_alias() {
let sql = "SELECT (col_int32 + col_uint32) AS c, count(*) from test group by 1";
let plan = test_sql(sql).unwrap();
let expected = "Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
\n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
\n TableScan: test projection=[col_int32, col_uint32]";
assert_eq!(expected, format!("{:?}", plan));
} current implementation will put alias into projection expr run in integration test. exist bug #4430, run it need to remove bug rule. #[test]
#[ignore]
// TODO: UnwrapCastInComparison exist bug.
fn push_down_filter_groupby_expr_contains_alias() {
let sql = "SELECT * FROM (SELECT (col_int32 + col_uint32) AS c, count(*) FROM test GROUP BY 1) where c > 3";
let plan = test_sql(sql).unwrap();
let expected = "Projection: c, COUNT(UInt8(1))\
\n Projection: test.col_int32 + test.col_uint32 AS c, COUNT(UInt8(1))\
\n Aggregate: groupBy=[[test.col_int32 + CAST(test.col_uint32 AS Int32)]], aggr=[[COUNT(UInt8(1))]]\
\n Filter: CAST(test.col_int32 + test.col_uint32 AS Int64) > Int64(3)\
\n TableScan: test projection=[col_int32, col_uint32]";
assert_eq!(expected, format!("{:?}", plan));
} |
Ok, got it. |
Could you please also modify the UT
I think some of the logic in the rule like |
Before this PR, there is a global state which can help to avoid duplicate Filters been generated and pushed down. |
Has added it in UT. |
Except for the LogicalPlan::Window, the others LGTM. You can also add it in the following PR. |
extract_or_clauses_for_join(&to_keep.0, left.schema(), left_preserved); | ||
let or_to_right = | ||
extract_or_clauses_for_join(&to_keep.0, right.schema(), right_preserved); | ||
// TODO: we just get, but don't remove them from origin expr. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it still a TODO here ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It existed originally. We can found it in tpch-q19.
Yes, I just add a comment, because I think it's a point that we can improve it in the future.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm afraid they can not be removed from the original exprs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think carefully, yes, we can't remove it.
All followup enhancement in #4433 |
for col in columns.iter() { | ||
for (l, r) in join.on.iter() { | ||
if col == l { | ||
join_cols_to_replace.insert(col, r); | ||
break; | ||
} else if col == r { | ||
join_cols_to_replace.insert(col, l); | ||
break; | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is break
to return the inner loop. I think for join conditions: on(a.id = b.id and a.id = b.id2) where b.id = 10
, we should be able to infer more equality predicates.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is
break
to return the inner loop. I think for join conditions:on(a.id = b.id and a.id = b.id2) where b.id = 10
, we should be able to infer more equality predicates.
It should be common optimization, infer conditon
, Many rule about join
need to it.
It should be a future ticket, and I think it's complex, because infer condtion
sometime will cause bad effect
I think this PR has been outstanding enough and has had enough feedback and testing. Let's plan on completing the follow on work as other PRs. Thank you so much @jackwener and @mingmwang for your careful reviews and @sarahyurick for your testing |
Benchmark runs are scheduled for baseline = f2e2c29 and contender = 3fe542f. 3fe542f is a master commit associated with this PR. Results will be available as each benchmark for each run completes. |
Which issue does this PR close?
Close #4266
Part of #4267
Rationale for this change
What changes are included in this PR?
Are these changes tested?
Are there any user-facing changes?