[GLUTEN-6189][CORE] Shouldn't pushdown undeterministic filter to scan node #6191
Conversation
Run Gluten Clickhouse CI
Is there any root cause in Velox's code for this issue? It doesn't make sense if rand()
results in a constant vector or is involved in constant folding.
        |""".stripMargin)
        .collect()
        .length
      assert(resultLength >= 25000)
Could we give resultLength
an upper bound to improve the stability of the test? Say, if rand()
yielded a constant 0.1,
the test could still pass as a false positive.
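The suggestion above can be sketched as a two-sided check. This is a Python sketch, not the actual UT; the row count and bounds here are illustrative assumptions:

```python
# Hypothetical sketch of a two-sided stability check for the test.
# Assuming ~50000 rows and a rand() < 0.5 predicate, the kept count is
# Binomial(50000, 0.5); the +/-10% bounds below are illustrative.
import random

random.seed(42)
total = 50000
result_length = sum(1 for _ in range(total) if random.random() < 0.5)

# The lower bound catches double evaluation (~0.25 * total kept);
# the upper bound catches a constant rand() that passes everything.
assert 22500 <= result_length <= 27500
```

With only a lower bound, a degenerate rand() that always returns a small constant would still let the test pass.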
It's not a bug in Velox's rand; it was because rand() < 0.5
was evaluated twice, first in ScanNode
, then in FilterNode
, so the filtered result is 0.5 * 0.5.
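The arithmetic can be checked with a small simulation. This is a Python sketch of the double-evaluation effect, not the actual Gluten/Velox code:

```python
import random

random.seed(0)
n = 100_000
rows = range(n)

# Stage 1: the "ScanNode" evaluates rand() < 0.5 once.
after_scan = [r for r in rows if random.random() < 0.5]
# Stage 2: the "FilterNode" evaluates the same non-deterministic
# predicate again, on independent random draws.
after_filter = [r for r in after_scan if random.random() < 0.5]

ratio = len(after_filter) / n
# Two independent evaluations keep ~0.5 * 0.5 = 0.25 of the rows.
assert 0.23 < ratio < 0.27
```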
it was because rand() < 0.5 was evaluated twice
In which place does it get evaluated twice? By design it should already be in either the subfield filters or the remaining filters. Am I missing something? cc @rui-mo
Do you mean it's placed both in the subfield filters and the remaining filters?
The design is that all filters are pushed down to Spark's Scan operator on the Scala side, and in Velox's TableScanNode there are subfield filters and a remaining filter to handle the two kinds of filters according to whether their push-down is supported. I'm not clear whether the duplication occurs on the Scala side between the Scan and Filter operators, or on the native side between the subfield filters and the remaining filter. @WangGuangxin Could you clarify?
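The intended placement described above can be modeled in a few lines. All names below (`supports_subfield_pushdown`, `split_filters`) are illustrative, not Gluten or Velox APIs:

```python
# Toy model of the intended filter placement: every predicate ends up in
# exactly one of the scan's subfield filters, the scan's remaining filter,
# or the post-scan Filter operator -- never in two places at once.

def supports_subfield_pushdown(pred: str) -> bool:
    # Pretend plain column comparisons can become Velox subfield filters.
    return pred.startswith("col")

def split_filters(pushed, condition):
    # Scala side: Filter keeps only what was not pushed to Scan.
    post_scan = [p for p in condition if p not in pushed]
    # Native side: pushed filters split into subfield vs remaining.
    subfield = [p for p in pushed if supports_subfield_pushdown(p)]
    remaining = [p for p in pushed if not supports_subfield_pushdown(p)]
    return subfield, remaining, post_scan

subfield, remaining, post_scan = split_filters(
    pushed=["col_a > 1", "rand() < 0.5"],
    condition=["col_a > 1", "rand() < 0.5"],
)
# Each predicate appears exactly once across the three buckets.
assert subfield == ["col_a > 1"]
assert remaining == ["rand() < 0.5"]
assert post_scan == []
```

The bug under discussion is a violation of this invariant: the same predicate shows up both in the pushed filters and in the post-scan Filter.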
let me double check
Take the UT as an example: rand() < 0.5
exists in both places:
-
getPushedFilter
(incubator-gluten/gluten-core/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala,
Line 672 in 9cceba6):
def getPushedFilter(dataFilters: Seq[Expression]): Seq[Expression] = {
which is pushed to ScanNode
(it has nothing to do with the logic of remainingFilter
and subFieldFilter)
-
and also,
FilterExecTransformer
has rand() < 0.5
in getRemainingCondition
(Line 39 in 9cceba6):
FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))
It looks like a bug. Looking into the code, I believe the author didn't want a filter expression to appear in FilterHandler.getRemainingFilters
once it was pushed into scan. So perhaps we need a bugfix for that.
However, I will not be against this change if it doesn't bring performance issues, given that it's a more robust way to make sure non-deterministic filter expressions are only evaluated once, on the JVM side.
But I think we still need to find out the reason for the bug.
if it doesn't bring performance issues.
But that's probably hardly true in the Velox backend if the backend does support pushing non-deterministic exprs to the file reader to shorten read time. So my assumption could be too ideal; I am not sure.
rand() < 0.5 was evaluated twice, first in ScanNode, then in FilterNode
If a filter exists in the scan node, we do not add it to the filter node again; getRemainingFilters
is for that purpose. Is there any bug in it? Thanks.
Run Gluten Clickhouse CI
@rui-mo I think the assumption was broken since
@WangGuangxin The purpose of postProcessPushDownFilter is to push down all the filters to Scan even if only the Filter operator contains them. After that, Filter will only process those that do not exist in Scan. It looks like a bug if both Scan and Filter contain the same expression.
Run Gluten Clickhouse CI
      .filterNot(_.references.exists {
        attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
      })
      .filter(_.deterministic)
Vanilla Spark would not push down non-deterministic filter exprs. But as Gluten has already supported it, can we add a new config to control whether to push down non-deterministic filter exprs?
Bug of getRemainingFilters: ExpressionSet can only remove deterministic expressions.
This seems to work if the filter references pushed down have not changed.
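The ExpressionSet limitation can be mimicked in a few lines: if non-deterministic expressions are never recognized as equal, subtracting the pushed filters from the Filter condition leaves rand() < 0.5 behind, so it stays in both Scan and Filter. This is a Python sketch of the failure mode, not Spark's actual ExpressionSet:

```python
import itertools

class Expr:
    """Toy expression: deterministic exprs compare by canonical text,
    non-deterministic exprs fall back to per-instance identity."""
    _ids = itertools.count()

    def __init__(self, text, deterministic=True):
        self.text = text
        self.deterministic = deterministic
        self._key = text if deterministic else next(Expr._ids)

    def __eq__(self, other):
        return isinstance(other, Expr) and self._key == other._key

    def __hash__(self):
        return hash(self._key)

def get_remaining_filters(scan_filters, filter_conditions):
    # Set-style subtraction: removes only what compares equal.
    return [c for c in filter_conditions if c not in scan_filters]

col = Expr("col > 1")
pushed_rand = Expr("rand() < 0.5", deterministic=False)  # copy pushed to scan
filter_rand = Expr("rand() < 0.5", deterministic=False)  # copy in Filter

remaining = get_remaining_filters([col, pushed_rand],
                                  [Expr("col > 1"), filter_rand])
# The deterministic filter is recognized as already pushed and removed,
# but the non-deterministic one survives, so it gets evaluated twice.
assert [e.text for e in remaining] == ["rand() < 0.5"]
```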
I submitted a PR to resolve it. #6296
Is this error in Velox or Gluten? Velox table scan supports non-deterministic filter pushdown, so it shouldn't be an issue to push down to Velox. @rui-mo I remember initially we had to distinguish subfield filters and remaining filters in oap/velox. Has the code moved to Gluten?
What changes were proposed in this pull request?
We shouldn't push down a non-deterministic filter to the scan node. For example, if we push down a filter with
rand() < 0.5
to scan, the result will be around 1/4 instead of 1/2. (Fixes: #6189)
How was this patch tested?
UT