
Query failing with valid partition filter in v448 but succeeds in v437 #22268

Status: Open
abhinavdangi opened this issue Jun 5, 2024 · 8 comments
Labels: bug (Something isn't working)

abhinavdangi (Author) commented:

Trino 448 is configured with the following property:
delta.query-partition-filter-required=true
When querying a partitioned table with a cast condition on the partition column:

SELECT test_partition_table_1.b
  FROM test_partition_table_1 
    LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
  WHERE
  test_partition_table_2.c IS NOT NULL and 
  cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229; -- not working
  -- part_col BETWEEN '20240201' and '20240229'; -- working

It fails with the following error:

io.trino.spi.TrinoException: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
	at io.trino.plugin.deltalake.DeltaLakeMetadata.validateScan(DeltaLakeMetadata.java:3258)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.validateScan(ClassLoaderSafeConnectorMetadata.java:1127)
	at io.trino.tracing.TracingConnectorMetadata.validateScan(TracingConnectorMetadata.java:1289)
	at io.trino.metadata.MetadataManager.validateScan(MetadataManager.java:2122)
	at io.trino.tracing.TracingMetadata.validateScan(TracingMetadata.java:1052)

With the plain condition on the partition column (no cast), the query succeeds.
The table definitions are as follows:

CREATE TABLE test_partition_table_1 (
   merge_col varchar,
   b varchar,
   part_col varchar
)
WITH (
   partitioned_by = ARRAY['part_col']
);
insert into test_partition_table_1 values ('spark','col1','20240201'),('trino','col2','20240202'), ('druid','col3','20240203');
select * from test_partition_table_1 where part_col is not null;
CREATE TABLE test_partition_table_2 (
   merge_col varchar,
   c varchar
);
insert into test_partition_table_2 values ('spark','c3'), ('trino','c1'), ('druid', 'c2');
select * from test_partition_table_2;

The query fails with versions 447 and 448 but succeeds with 437.
Please help.
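Since the commented-out string comparison above does satisfy the partition-filter check, one possible workaround (an untested sketch, not a confirmed fix) is to keep the cast-based range but also add a redundant uncast predicate on the partition column:

```sql
-- Possible workaround sketch: the extra uncast predicate on part_col gives the
-- connector a direct filter on the partition column. The string range matches
-- the numeric range here only because part_col values are fixed-width
-- yyyymmdd strings, so lexicographic and numeric order agree.
SELECT test_partition_table_1.b
  FROM test_partition_table_1
    LEFT JOIN test_partition_table_2
      ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
 WHERE test_partition_table_2.c IS NOT NULL
   AND test_partition_table_1.part_col BETWEEN '20240201' AND '20240229'
   AND cast(test_partition_table_1.part_col AS BIGINT) BETWEEN 20240201 AND 20240229;
```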

ebyhr (Member) commented Jun 5, 2024:

cc: @marcinsbd

marcinsbd (Contributor) commented:
@abhinavdangi Can you share the query plans for the success (version 437) and the failure (version 447 or 448)?

abhinavdangi (Author) commented:
Version 448


trino:dev_7> explain SELECT test_partition_table_1.b
          ->   FROM test_partition_table_1
          ->     LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
          ->   WHERE
          ->   test_partition_table_2.c IS NOT NULL and
          ->   cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229;
Query 20240605_180616_00228_d4737 failed: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
io.trino.spi.TrinoException: Filter required on dev_7.test_partition_table_1 for at least one partition column: part_col
	at io.trino.plugin.deltalake.DeltaLakeMetadata.validateScan(DeltaLakeMetadata.java:3258)
	at io.trino.plugin.base.classloader.ClassLoaderSafeConnectorMetadata.validateScan(ClassLoaderSafeConnectorMetadata.java:1127)
	at io.trino.tracing.TracingConnectorMetadata.validateScan(TracingConnectorMetadata.java:1289)
	at io.trino.metadata.MetadataManager.validateScan(MetadataManager.java:2122)
	at io.trino.tracing.TracingMetadata.validateScan(TracingMetadata.java:1052)
	at io.trino.sql.planner.sanity.TableScanValidator$1.visitTableScan(TableScanValidator.java:37)
	at io.trino.sql.planner.sanity.TableScanValidator$1.visitTableScan(TableScanValidator.java:33)
	at io.trino.sql.planner.plan.TableScanNode.accept(TableScanNode.java:236)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
	at io.trino.sql.planner.plan.PlanVisitor.visitFilter(PlanVisitor.java:34)
	at io.trino.sql.planner.plan.FilterNode.accept(FilterNode.java:74)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
	at io.trino.sql.planner.plan.PlanVisitor.visitJoin(PlanVisitor.java:99)
	at io.trino.sql.planner.plan.JoinNode.accept(JoinNode.java:295)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:26)
	at io.trino.sql.planner.SimplePlanVisitor.visitPlan(SimplePlanVisitor.java:19)
	at io.trino.sql.planner.plan.PlanVisitor.visitOutput(PlanVisitor.java:49)
	at io.trino.sql.planner.plan.OutputNode.accept(OutputNode.java:82)
	at io.trino.sql.planner.sanity.TableScanValidator.validate(TableScanValidator.java:32)
	at io.trino.sql.planner.sanity.PlanSanityChecker.lambda$validate$0(PlanSanityChecker.java:107)
	at com.google.common.collect.ImmutableList.forEach(ImmutableList.java:423)
	at io.trino.sql.planner.sanity.PlanSanityChecker.validate(PlanSanityChecker.java:107)
	at io.trino.sql.planner.sanity.PlanSanityChecker.validateFinalPlan(PlanSanityChecker.java:78)
	at io.trino.sql.planner.LogicalPlanner.plan(LogicalPlanner.java:272)
	at io.trino.sql.analyzer.QueryExplainer.getLogicalPlan(QueryExplainer.java:176)
	at io.trino.sql.analyzer.QueryExplainer.getDistributedPlan(QueryExplainer.java:187)
	at io.trino.sql.analyzer.QueryExplainer.getPlan(QueryExplainer.java:106)
	at io.trino.sql.rewrite.ExplainRewrite$Visitor.getQueryPlan(ExplainRewrite.java:145)
	at io.trino.sql.rewrite.ExplainRewrite$Visitor.visitExplain(ExplainRewrite.java:129)
	at io.trino.sql.rewrite.ExplainRewrite$Visitor.visitExplain(ExplainRewrite.java:74)
	at io.trino.sql.tree.Explain.accept(Explain.java:61)
	at io.trino.sql.tree.AstVisitor.process(AstVisitor.java:27)
	at io.trino.sql.rewrite.ExplainRewrite.rewrite(ExplainRewrite.java:71)
	at io.trino.sql.rewrite.StatementRewrite.rewrite(StatementRewrite.java:54)
	at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:92)
	at io.trino.sql.analyzer.Analyzer.analyze(Analyzer.java:86)
	at io.trino.execution.SqlQueryExecution.analyze(SqlQueryExecution.java:285)
	at io.trino.execution.SqlQueryExecution.<init>(SqlQueryExecution.java:218)
	at io.trino.execution.SqlQueryExecution$SqlQueryExecutionFactory.createQueryExecution(SqlQueryExecution.java:884)
	at io.trino.dispatcher.LocalDispatchQueryFactory.lambda$createDispatchQuery$0(LocalDispatchQueryFactory.java:153)
	at io.trino.$gen.Trino_448____20240531_114046_2.call(Unknown Source)
	at com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:131)
	at com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:76)
	at com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:82)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1570)
	Suppressed: java.lang.Exception: Current plan:
                Output[columnNames = [b]]
                │   Layout: [b:varchar]
                └─ InnerJoin[criteria = (merge_col = merge_col_0), filter = (CAST(part_col AS bigint) BETWEEN bigint '20240201' AND bigint '20240229'), distribution = REPLICATED]
                   │   Layout: [b:varchar]
                   │   Distribution: REPLICATED
                   │   dynamicFilterAssignments = {merge_col_0 -> #df_383}
                   ├─ ScanFilter[table = delta_prod:dev_7.test_partition_table_1, dynamicFilters = {merge_col = #df_383}]
                   │      Layout: [merge_col:varchar, b:varchar, part_col:varchar]
                   │      part_col := part_col:varchar:PARTITION_KEY
                   │      merge_col := merge_col:varchar:REGULAR
                   │      b := b:varchar:REGULAR
                   └─ LocalExchange[partitioning = SINGLE]
                      │   Layout: [merge_col_0:varchar]
                      └─ RemoteExchange[type = REPLICATE]
                         │   Layout: [merge_col_0:varchar]
                         └─ ScanFilterProject[table = delta_prod:dev_7.test_partition_table_2, filterPredicate = (NOT (c IS NULL))]
                                Layout: [merge_col_0:varchar]
                                c := c:varchar:REGULAR
                                merge_col_0 := merge_col:varchar:REGULAR

		at io.trino.sql.planner.sanity.PlanSanityChecker.validate(PlanSanityChecker.java:120)
		... 25 more

abhinavdangi (Author) commented Jun 5, 2024:

Version 437


trino:dev_7> explain SELECT test_partition_table_1.b
          ->   FROM test_partition_table_1
          ->     LEFT JOIN test_partition_table_2  ON test_partition_table_1.merge_col = test_partition_table_2.merge_col
          ->   WHERE
          ->   test_partition_table_2.c IS NOT NULL and
          ->   cast(test_partition_table_1.part_col as BIGINT) BETWEEN 20240201 and 20240229;
                                  Query Plan
--------------------------------------------------------------------------------
 Trino version: 437
 Fragment 0 [SOURCE]
     Output layout: [b]
     Output partitioning: SINGLE []
     Output[columnNames = [b]]
     │   Layout: [b:varchar]
     │   Estimates: {rows: ? (?), cpu: 0, memory: 0B, network: 0B}
     └─ InnerJoin[criteria = ("merge_col" = "merge_col_0"), distribution = REPLICATED]
        │   Layout: [b:varchar]
        │   Estimates: {rows: ? (?), cpu: ?, memory: 450B, network: 0B}
        │   Distribution: REPLICATED
        │   dynamicFilterAssignments = {merge_col_0 -> #df_461}
        ├─ ScanFilterProject[table = delta_prod:dev_7.test_partition_table_1, filterPredicate = ((CAST("part_col" AS BIGINT) >= BIGINT '20240201') AND (CAST("part_col" AS BIGINT) <= BIGINT '20240229')), dynamicFilters>
        │      Layout: [merge_col:varchar, b:varchar]
        │      Estimates: {rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}/{rows: ? (?), cpu: ?, memory: 0B, network: 0B}
        │      b := b:varchar:REGULAR
        │      part_col := part_col:varchar:PARTITION_KEY
        │      merge_col := merge_col:varchar:REGULAR
        └─ LocalExchange[partitioning = SINGLE]
           │   Layout: [merge_col_0:varchar]
           │   Estimates: {rows: 3 (30B), cpu: 0, memory: 0B, network: 0B}
           └─ RemoteSource[sourceFragmentIds = [1]]
                  Layout: [merge_col_0:varchar]

 Fragment 1 [SOURCE]
     Output layout: [merge_col_0]
     Output partitioning: BROADCAST []
     ScanFilterProject[table = delta_prod:dev_7.test_partition_table_2, filterPredicate = (NOT ("c" IS NULL))]
         Layout: [merge_col_0:varchar]
         Estimates: {rows: 3 (30B), cpu: 51, memory: 0B, network: 0B}/{rows: 3 (30B), cpu: 51, memory: 0B, network: 0B}/{rows: 3 (30B), cpu: 30, memory: 0B, network: 0B}
         c := c:varchar:REGULAR
         merge_col_0 := merge_col:varchar:REGULAR
(1 row)

marcinsbd (Contributor) commented:
@abhinavdangi Can you provide the type of the column test_partition_table_1.part_col?

abhinavdangi (Author) commented:
I mentioned the table definitions above:

CREATE TABLE test_partition_table_1 (
   merge_col varchar,
   b varchar,
   part_col varchar
)
WITH (
   partitioned_by = ARRAY['part_col']
);
insert into test_partition_table_1 values ('spark','col1','20240201'),('trino','col2','20240202'), ('druid','col3','20240203');

@marcinsbd added the bug (Something isn't working) label on Jun 14, 2024
marcinsbd (Contributor) commented Jun 14, 2024:

I ran a test where the partition column part_col is of type integer instead of varchar (as in the example above), so the cast is integer->bigint, and the query works.

trino:tpch> CREATE TABLE t1 (b varchar, merge_col integer, part_col integer) WITH (partitioned_by = ARRAY['part_col']);
CREATE TABLE
trino:tpch> INSERT INTO t1(b, merge_col, part_col)
         -> VALUES ('a', 1, 1),
         ->        ('b', 1, 2),
         ->        ('c', 1, 3),
         ->        ('d', 1, 4),
         ->        ('e', 2, 1),
         ->        ('f', 2, 2),
         ->        ('g', 2, 3),
         ->        ('h', 2, 4);
INSERT: 8 rows

trino:tpch> CREATE TABLE t2 (c varchar, merge_col integer);
CREATE TABLE
trino:tpch> INSERT INTO t2(c, merge_col)
         -> VALUES ('x', 1),
         ->        (null, 1),
         ->        ('y', 2),
         ->        ('Z', 3),
         ->        ('w', 4);
INSERT: 5 rows

trino:tpch> SELECT t1.b
         ->   FROM t1
         ->     LEFT JOIN t2  ON t1.merge_col = t2.merge_col
         ->   WHERE
         ->   t2.c IS NOT NULL and
         ->   cast(t1.part_col as BIGINT) BETWEEN 0 AND 1;
 b
---
 e
 a
(2 rows)

It seems to me that the issue is connected with the way we handle different casts, as the query started failing with commit 0f9adc6.
@martint, could you PTAL?
It appears that the cast varchar->bigint is marked as a "may-fail" expression and is not pushed down, whereas the cast integer->bigint is marked as safe and is pushed down.
Should we provide a different implementation of the check that verifies a partition column was used within the query?
Thanks
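To make the distinction concrete, here is a small illustrative sketch (plain Python with hypothetical function names, not Trino's actual code) of how classifying a cast as fail-safe vs. may-fail decides whether the predicate reaches the table scan, and therefore whether the partition-filter check sees a filter on part_col:

```python
# Illustrative sketch only -- not Trino's implementation. Models the idea that
# a cast which can raise at runtime (varchar -> bigint) is excluded from
# pushdown, so the connector never sees a filter on the partition column.

# Assumption for this sketch: widening numeric casts always succeed,
# everything else may fail (e.g. parsing a string can throw).
SAFE_CASTS = {
    ("integer", "bigint"),
    ("smallint", "integer"),
    ("smallint", "bigint"),
}

def cast_may_fail(from_type: str, to_type: str) -> bool:
    """Hypothetical classifier: is this cast allowed to raise at runtime?"""
    return (from_type, to_type) not in SAFE_CASTS

def predicate_reaches_scan(from_type: str, to_type: str) -> bool:
    """Only fail-safe expressions are eligible for pushdown in this model."""
    return not cast_may_fail(from_type, to_type)

# integer -> bigint: pushed down, so the partition-filter check is satisfied
print(predicate_reaches_scan("integer", "bigint"))   # True
# varchar -> bigint: kept above the scan, so validateScan rejects the query
print(predicate_reaches_scan("varchar", "bigint"))   # False
```

This matches the two observed behaviors: the integer-partitioned repro succeeds because its cast is pushed into the scan, while the varchar-partitioned table fails validateScan because the cast stays in the join filter.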

martint (Member) commented Jun 20, 2024:

@marcinsbd, your analysis seems correct.

We're going to have to look into how to perform speculative pushdown of expressions that may fail. I have some very rough ideas on how to go about this; I'll post them later.
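One way to read "speculative pushdown of expressions that may fail" (purely my illustration; the actual design has not been described here) is to evaluate the may-fail cast at the scan but defer any error until the row is known to survive the downstream filters:

```python
# Purely illustrative sketch of speculative evaluation of a may-fail cast.
# This is NOT the proposed design, just one way such pushdown could behave.

def eager_cast_filter(values):
    """Naive pushdown: cast every scanned value immediately.
    A single malformed value fails the whole scan, even if the row
    would have been dropped by a later join or filter."""
    return [v for v in values if 20240201 <= int(v) <= 20240229]

def speculative_cast_filter(values):
    """Speculative variant: record the error instead of raising, and
    re-raise only if the offending row survives the remaining filters."""
    out = []
    for v in values:
        try:
            if 20240201 <= int(v) <= 20240229:
                out.append(v)
        except ValueError:
            out.append(("deferred_error", v))  # surfaced later only if needed
    return out

vals = ["20240201", "20240215", "not-a-date"]
print(speculative_cast_filter(vals))  # the bad value is deferred, not fatal
```

The point of the sketch is the trade-off: eager pushdown makes the partition-filter check happy but changes error semantics, while deferring errors preserves semantics at the cost of bookkeeping per row.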

4 participants