-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-17425][blink-planner] supportsFilterPushDown rule in DynamicSource. #12866
Conversation
Thanks a lot for your contribution to the Apache Flink project. I'm the @flinkbot. I help the community Automated ChecksLast check on commit dcdd7b7 (Fri Jul 10 08:56:12 UTC 2020) Warnings:
Mention the bot in a comment to re-run the automated checks. Review Progress
Please see the Pull Request Review Guide for a full explanation of the review process. The Bot is tracking the review progress through labels. Labels are applied according to the order of the review items. For consensus, approval by a Flink committer of PMC member is required Bot commandsThe @flinkbot bot supports the following commands:
|
@godfreyhe ,because of my careless operation, i drop my github repo. So i open a open PR. And the third commit is modification for all your reviews. |
b745ddd
to
33ee4b7
Compare
hi @godfreyhe, you can review it now. The blink planner tests passed. |
PR before #12851 |
|
||
LogicalTableScan scan = call.rel(1); | ||
TableSourceTable tableSourceTable = scan.getTable().unwrap(TableSourceTable.class); | ||
//we can not push filter twice |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: add a space after //
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, I left some comments.
There are some comments in #12851 are not been addressed, WDYT?
}), | ||
context.getCatalogManager().getDataTypeFactory()) | ||
.build(); | ||
SupportsFilterPushDown.Result result = ((SupportsFilterPushDown) newTableSource).applyFilters(resolver.resolve(remainingPredicates)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: wrap the line, it's too long
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ok
getNewFlinkStatistic(oldTableSourceTable, originPredicatesSize, updatedPredicatesSize), | ||
getNewExtraDigests(oldTableSourceTable, result.getAcceptedFilters()) | ||
); | ||
TableScan newScan = new LogicalTableScan(scan.getCluster(), scan.getTraitSet(), scan.getHints(), newTableSourceTable); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use LogicalTableScan.create
instead of new LogicalTableScan
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i want to know why?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
because the traits
of Scan
may be changed after filter push down, It's better the traits
could be recomputed which is done in LogicalTableScan#create
method. Currently, new LogicalTableScan
is ok, but I think using LogicalTableScan#create
is better.
//we can not push filter twice | ||
return tableSourceTable != null | ||
&& tableSourceTable.tableSource() instanceof SupportsFilterPushDown | ||
&& !Arrays.stream(tableSourceTable.extraDigests()).anyMatch(str -> str.contains("filter")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if a table contain a field named filter
?
use str.startWith("filter=[")
Hi @godfreyhe . Yes, i find some type is not Comparable, such as Period int getValueAs(Class clazz). |
Object lhsValue = getValue(children.get(0), row); | ||
Object rhsValue = getValue(children.get(1), row); | ||
// validate that literal is comparable | ||
if (!isComparable(lhsValue, binExpr) || !isComparable(rhsValue, binExpr)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about check whether the type of a field is comparable in shouldPushDownUnaryExpression
method ? then the logic of binaryFilterApplies
need not to change
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea
21ff7c2
to
b9ae3fb
Compare
|
||
if (expr instanceof ValueLiteralExpression) { | ||
// validate that literal is comparable | ||
Optional value = ((ValueLiteralExpression) expr).getValueAs(((ValueLiteralExpression) expr).getOutputDataType().getConversionClass()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should also consider whether the type of FieldReferenceExpression
is comparable, consider the following pattern:
the filterableFields
is a
and b
and the pushed pattern a > b
thanks for the update. LGTM overall, last comment: please remove the import: |
77a0241
to
a78a2f7
Compare
@@ -565,14 +562,12 @@ private boolean binaryFilterApplies(CallExpression binExpr, Row row) { | |||
} | |||
} | |||
|
|||
private boolean isComparable(Class<?> clazz) { | |||
// validate that literal is comparable | |||
private void validateTypeComparable(Class<?> clazz) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this method should be
private boolean isComparable(Class<?> clazz) {
return Comparable.class.isAssignableFrom(clazz);
}
we do not need throw exception if a class is non-comparable.TestValuesTableFactory
just does not support non-comparable type. just like the case that TestValuesTableFactory
just supports UPPER
and LOWER
UDF.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@godfreyhe , But need it log some warning info here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think so. We can add some java doc at the class level to explain which patterns this class supports
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea +1
a78a2f7
to
1ca506e
Compare
* Test implementation of {@link DynamicTableSourceFactory} that creates | ||
* a source that produces a sequence of values. | ||
* Test implementation of {@link DynamicTableSourceFactory} that creates a source that produces a sequence of values. | ||
* And this source {@link TestValuesTableSource} supports FilterPushDown. And it has some limitations. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And {@link TestValuesTableSource} can push down filter into table source. A predicate can be pushed down only if it satisfies the following conditions:
- field name is in
filterable-fields
which is defined inwith
properties - the field type is comparable
- UDF is
UPPER
orLOWER
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thanks
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the update, LGTM. cc @wuchong
@godfreyhe @wuchong , could it merge to master . The error is test_rocksdb_state_memory_control, which it is not relevant to my commits. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry for the late response. LGTM.
I will merge this once the build is passed in my repo.
…ScanTableSource in planner This closes apache#12866
What is the purpose of the change
make the DynamicSource supports FilterPushDown Rule
Verifying this change
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
Documentation