
[GLUTEN-6189][CORE] Shouldn't pushdown undeterministic filter to scan node #6191

Closed
wants to merge 3 commits

Conversation

WangGuangxin
Contributor

What changes were proposed in this pull request?

We shouldn't push down non-deterministic filters to the scan node. For example, if a filter with rand() < 0.5 is pushed down to the scan, it gets evaluated again in the filter, so only around 1/4 of the rows are returned instead of 1/2.

(Fixes: #6189)
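The effect described above can be reproduced with a small standalone simulation (plain Scala, illustrative only; the row count is an arbitrary choice and this is not Gluten code): when a non-deterministic predicate is evaluated independently in both the scan and the filter, a row must pass two independent draws to survive.

```scala
import scala.util.Random

// Simulate a predicate like rand() < 0.5 being evaluated twice:
// once in the scan node, and again, independently, in the filter node.
val rows = 100000
val survivors = (1 to rows).count { _ =>
  Random.nextDouble() < 0.5 && // first evaluation (scan)
  Random.nextDouble() < 0.5    // second, independent evaluation (filter)
}
// The survival rate is roughly 0.5 * 0.5 = 0.25, not the expected 0.5.
val ratio = survivors.toDouble / rows
```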

How was this patch tested?

UT

Run Gluten Clickhouse CI

@WangGuangxin
Contributor Author

cc @PHILO-HE @rui-mo @ulysses-you

Member

@zhztheplayer left a comment


Is there a root cause in Velox's code for this issue? It wouldn't make sense unless rand() produced a constant vector or was involved in a constant-folding procedure.

|""".stripMargin)
.collect()
.length
assert(resultLength >= 25000)
Member


Could we give resultLength an upper bound to improve the stability of the test? Say rand() yielded a constant 0.1: the test could still pass as a false positive.

Contributor Author

@WangGuangxin Jun 24, 2024


It's not a bug in Velox's rand. It's because rand() < 0.5 was evaluated twice, first in the ScanNode and then in the FilterNode, so the filtered result is 0.5 * 0.5.

Member


it was because rand() < 0.5 was evaluated twice

Where does it get evaluated twice? By design it should already land in either the subfield filters or the remaining filters. Am I missing something? cc @rui-mo

Or do you mean it's placed in both the subfield filters and the remaining filters?

Contributor


The design is that all filters are pushed down to Spark's Scan operator on the Scala side. In Velox's TableScanNode there are subfield filters and a remaining filter, which handle the two kinds of filters according to whether their push-down is supported. I'm not clear whether the duplication occurs on the Scala side between the Scan and Filter operators, or on the native side between the subfield filters and the remaining filter. @WangGuangxin Could you clarify?

Contributor Author


let me double check

Contributor Author


Take the UT as an example: rand() < 0.5 exists in both

  1. getPushedFilter

    def getPushedFilter(dataFilters: Seq[Expression]): Seq[Expression] = {

    which is pushed to the ScanNode (this has nothing to do with the logic of remainingFilter and subFieldFilter)

  2. and also FilterExecTransformer, which still has rand() < 0.5 in getRemainingCondition

    FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))

Member


Take the UT as an example: rand() < 0.5 exists in both

  1. getPushedFilter

    def getPushedFilter(dataFilters: Seq[Expression]): Seq[Expression] = {

    which is pushed to the ScanNode (this has nothing to do with the logic of remainingFilter and subFieldFilter)

  2. and also FilterExecTransformer, which still has rand() < 0.5 in getRemainingCondition

    FilterHandler.getRemainingFilters(scanFilters, splitConjunctivePredicates(condition))

It looks like a bug. Looking into the code, I believe the author didn't want a filter expression to appear in the output of FilterHandler.getRemainingFilters once it had been pushed into the scan. So perhaps we need a bugfix for that.

Member


However, I won't be against this change if it doesn't bring performance issues, given that it's a more robust way to make sure non-deterministic filter expressions are evaluated only once from the JVM side.

But I think we still need to find out the reason for the bug.

Member

@zhztheplayer Jun 24, 2024


if it doesn't bring performance issues.

Though that's probably hardly true in the Velox backend if the backend does support pushing non-deterministic exprs down to the file reader to shorten read time. So my assumption could be too ideal; I am not sure.

Contributor

@rui-mo left a comment


rand() < 0.5 was evaluated twice, first in the ScanNode, then in the FilterNode

If a filter exists in the scan node, we do not add it to the filter node again; getRemainingFilters is for that purpose. Is there any bug in it? Thanks.

Run Gluten Clickhouse CI

@WangGuangxin
Contributor Author

rand() < 0.5 was evaluated twice, first in the ScanNode, then in the FilterNode

If a filter exists in the scan node, we do not add it to the filter node again; getRemainingFilters is for that purpose. Is there any bug in it? Thanks.

@rui-mo I think that assumption is broken, since rand() < 0.5 doesn't exist in FileSourceScanExec's dataFilters: Spark does not push non-deterministic filters down to the scan. See https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/DataSourceUtils.scala#L291

@rui-mo
Contributor

rui-mo commented Jun 24, 2024

@WangGuangxin The purpose of postProcessPushDownFilter is to push all the filters down to Scan, even those that only the Filter operator contains. After that, Filter should only process those that do not exist in Scan. It looks like a bug if both Scan and Filter contain the same expression.
https://github.com/apache/incubator-gluten/blob/main/backends-velox/src/main/scala/org/apache/gluten/execution/FilterExecTransformer.scala#L38-L40

Run Gluten Clickhouse CI

.filterNot(_.references.exists {
attr => SparkShimLoader.getSparkShims.isRowIndexMetadataColumn(attr.name)
})
.filter(_.deterministic)
Contributor


Vanilla Spark does not push down non-deterministic filter expressions. But since Gluten has already supported it, can we add a new config to control whether to push down non-deterministic filter expressions?
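A config gate along these lines could look roughly like the sketch below. Everything here is hypothetical: the config key name is illustrative, not an existing Gluten option, and the `Expression` trait is a stand-in for Spark's catalyst `Expression`.

```scala
// Stand-in for org.apache.spark.sql.catalyst.expressions.Expression.
trait Expression { def deterministic: Boolean }

// Hypothetical config key; the name is illustrative only.
val pushdownNonDeterministicKey =
  "spark.gluten.sql.columnar.pushdownNonDeterministicFilters"

// Keep non-deterministic filters out of the push-down set unless the
// (hypothetical) flag opts in; the default matches vanilla Spark.
def pushableFilters(
    dataFilters: Seq[Expression],
    pushNonDeterministic: Boolean): Seq[Expression] =
  if (pushNonDeterministic) dataFilters
  else dataFilters.filter(_.deterministic)
```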

@zml1206
Contributor

zml1206 commented Jun 26, 2024

This is a bug in getRemainingFilters: ExpressionSet can only remove deterministic expressions. Changing

def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
    (ExpressionSet(filters) -- ExpressionSet(scanFilters)).toSeq

to

def getRemainingFilters(scanFilters: Seq[Expression], filters: Seq[Expression]): Seq[Expression] =
    (filters.toSet -- scanFilters.toSet).toSeq

seems to work, provided the references of the pushed-down filters have not changed.
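The ExpressionSet behavior described here can be illustrated with a self-contained model. This is a sketch only: `MiniExpressionSet` mimics the relevant property of Spark's catalyst ExpressionSet, whose removal is a no-op for non-deterministic expressions; it is not the real class.

```scala
// Minimal stand-in for a catalyst expression.
case class Expr(sql: String, deterministic: Boolean)

final case class MiniExpressionSet(exprs: Seq[Expr]) {
  // Like catalyst's ExpressionSet: set difference never removes
  // non-deterministic expressions, only deterministic ones.
  def --(other: MiniExpressionSet): MiniExpressionSet =
    MiniExpressionSet(exprs.filter(e => !e.deterministic || !other.exprs.contains(e)))
}

val rand = Expr("rand() < 0.5", deterministic = false)
val det  = Expr("a > 1",        deterministic = true)

// Both filters were pushed to the scan, yet only the deterministic one
// is subtracted: rand() < 0.5 survives and would be evaluated again
// in the Filter operator.
val remaining = MiniExpressionSet(Seq(rand, det)) -- MiniExpressionSet(Seq(rand, det))
```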

@zml1206
Contributor

zml1206 commented Jul 1, 2024

I submitted a PR to resolve it. #6296

@FelixYBW
Contributor

FelixYBW commented Jul 1, 2024

Is this an error in Velox or in Gluten? Velox's table scan supports non-deterministic filter pushdown, so pushing down to Velox shouldn't itself be an issue.

@rui-mo I remember that initially we had to distinguish the subfield filter and the remaining filter in oap/velox. Has that code moved to Gluten?

@rui-mo
Contributor

rui-mo commented Jul 2, 2024

@FelixYBW This bug is in Gluten's Scala code; see #6296.

I remember initially we have to distinguish subfiled filter and remaining filter in oap/velox.

That code is in Gluten's cpp and takes effect during the conversion from Substrait to Velox. The bug mentioned in this PR is not related to it.
