Support partitioning on nested ROW fields in Iceberg #15712
Conversation
@krvikash Could you rebase on upstream to resolve conflicts?
0d38c8c
to
53011d6
Compare
Rebased the PR on the latest code.
assertThat(onTrino().executeQuery(format(selectByString, trinoTableName)))
        .containsOnly(row1);
// TODO
This is a TODO. With the Parquet format, Spark returns null for the partitioned nested field.
If Spark Iceberg can't handle this kind of use case, please check whether an issue has already been reported for the missing functionality.
We shouldn't introduce TODOs in the code relating to Spark limitations. Trino does not depend on Spark.
Simply add an assertion that the query fails with an expected message.
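A minimal sketch of that suggestion in plain Java. The helper `assertFailsWith` and the stand-in `runSparkQuery` are hypothetical names introduced only for illustration; the error text is the one quoted for the ORC case in this review. The real product test would more likely use AssertJ's `assertThatThrownBy` or the test framework's own failure assertion.

```java
// Hypothetical helper, not part of Trino: runs an action and verifies that it
// fails with a message containing the expected fragment.
final class ExpectedFailure
{
    private ExpectedFailure() {}

    static String assertFailsWith(Runnable action, String expectedFragment)
    {
        try {
            action.run();
        }
        catch (RuntimeException e) {
            String message = String.valueOf(e.getMessage());
            if (!message.contains(expectedFragment)) {
                throw new AssertionError("Unexpected failure message: " + message, e);
            }
            return message;
        }
        // Reached only when the action completed without throwing
        throw new AssertionError("Expected failure containing: " + expectedFragment);
    }

    // Stand-in for the Spark query that currently fails (assumption for illustration)
    static void runSparkQuery(String sql)
    {
        throw new IllegalStateException("Index 1 out of bounds for length 1");
    }
}
```

With this shape the test documents the current Spark behavior and starts failing as soon as that behavior changes, which is the signal a TODO comment cannot give.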
I could not find any existing issue for the PARQUET case, where Spark returns null for the partitioned nested field.
53011d6
to
46d91a8
Compare
Hi @alexjo2144, @ebyhr, @findepi, when you get time, could you please review this?
46d91a8
to
dbf698f
Compare
Rebased the PR on the latest code.
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/ColumnIdentity.java
Outdated
Row row1 = row("a", new byte[] {15, -15, 2, -16, -2, -1}, 1001, "field1");
String select = "SELECT _string, _varbinary, _bigint, _struct._field FROM %s WHERE _string = 'a'";
// ORC: Job aborted due to stage failure: Task 0 in stage 40.0 failed 1 times, most recent failure: Lost task 0.0 in stage 40.0 (TID 70) (spark executor driver): java.lang.IndexOutOfBoundsException: Index 1 out of bounds for length 1
Same as for Parquet.
I could not find any existing issue for the PARQUET case, where Spark returns null for the partitioned nested field.
...roduct-tests/src/main/java/io/trino/tests/product/iceberg/TestIcebergSparkCompatibility.java
Outdated
Can you also add some predicate pushdown tests? Similar to one of these tests that use isFullyPushedDown
https://github.com/trinodb/trino/blob/master/plugin/trino-iceberg/src/test/java/io/trino/plugin/iceberg/BaseIcebergConnectorTest.java#L1611
int parentSourceId = getParentSourceId(indexParents, field.sourceId());
Type sourceType = tableSchema.findType(parentSourceId);
if (sourceType.isMapType()) {
    throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a map");
}
if (sourceType.isListType()) {
    throw new TrinoException(NOT_SUPPORTED, "Partitioning field [" + field.name() + "] cannot be contained in a array");
}
return requireNonNull(columnById.get(parentSourceId), () -> "Cannot find source column for partition field " + field);
This block is a bit confusing to me since it relies on getParentSourceId returning the field id that was passed to it if it's already a base column. Is an integer column the parent of itself? That's fuzzy. I might rephrase it as
boolean isBaseColumn = !parentIndex.contains(fieldId);
if (isBaseColumn) {
...
}
else {
...
}
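One way to read the suggestion above, as a hedged sketch rather than the PR's actual code: it assumes a child-id to parent-id map in which top-level (base) columns have no entry, which is the shape the `isBaseColumn` check implies. The class and method names are hypothetical, and `containsKey` is used because the sketch works on a plain `java.util.Map`.

```java
import java.util.Map;

// Sketch only: parentIndex maps a nested field id to the id of its parent.
// Base (top-level) columns are assumed to have no entry in the map.
final class ParentIndexSketch
{
    private ParentIndexSketch() {}

    static boolean isBaseColumn(Map<Integer, Integer> parentIndex, int fieldId)
    {
        return !parentIndex.containsKey(fieldId);
    }

    // Walks up the parent chain until it reaches a field without a parent
    static int baseColumnId(Map<Integer, Integer> parentIndex, int fieldId)
    {
        int current = fieldId;
        while (!isBaseColumn(parentIndex, current)) {
            current = parentIndex.get(current);
        }
        return current;
    }
}
```

Making the base-column case explicit avoids the question of whether a column is "its own parent": a base column is simply one that the index does not know about.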
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java
Outdated
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergColumnHandle.java
Outdated
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Outdated
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/PartitionTable.java
Outdated
.map(column -> {
    org.apache.iceberg.types.Type type = toIcebergType(column.getType());
    if (!type.isPrimitiveType()) {
        type = TypeUtil.assignFreshIds(type, nextNestedFieldId::getAndIncrement);
This doesn't seem right to me.
Doing tricks like this can get us into trouble: there are some synthetic columns (see org.apache.iceberg.MetadataColumns#FILE_PATH) which have the id set to Integer.MAX_VALUE - 1. Incrementing this value to artificially get a new field id for the row can lead to problems.
I was looking for where schemaFromHandles is being used and found the following:
- deletes handling in IcebergPageSourceProvider
- bucketing function in IcebergNodePartitioningProvider
In both places we do have schema available.
Please investigate whether using schema in schemaFromHandles would be possible.
If yes, we could work with TypeUtil.indexParents(schema) to get only the extra "row" contents we need from the schema.
This is related to some changes in #14837.
Maybe we can pull some of those changes in here.
Thanks, @findinpath and @alexjo2144, for pointing this out. I have added some changes from #14837, and now I do not need to reassign indexes.
IMO, it would be better if #14837 gets merged first, because the current PR contains changes that are unrelated to supporting partitioning on nested fields.
9ab6531
to
9d97902
Compare
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua
@krvikash @alexjo2144 @findinpath I see no approvals and some still-unresolved conversations. Where are we with the PR?
I believe this PR is waiting for #14837 (#15712 (comment))
9d97902
to
958a762
Compare
Rebased on top of #14837's commit and resolved conflicts. This PR (the 2nd commit of this PR) is ready for review now.
(some refactoring in |
e2afa24
to
39c59dd
Compare
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java
Outdated
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Outdated
List<Integer> path = new ArrayList<>(requireNonNull(indexPaths.get(fieldId)));
if (!path.isEmpty()) {
    // Path does not include the base field id
    baseField = indexById.get(path.removeFirst());
It is hard to follow what is happening when we have mutations.
Why do we remove the first element from the list?
The first element is used for getting the root field id, but the element shouldn't exist for IcebergColumnHandle.path. I updated the comment and slightly modified the logic.
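For illustration only, a mutation-free way to express the same split. This is a sketch under assumptions, not the logic that was actually committed: `BaseFieldAndPath` is a hypothetical holder, and the input list is assumed to hold field ids from the root column downwards with the root id first.

```java
import java.util.List;

// Hypothetical holder for the result; not a class from the PR
record BaseFieldAndPath(int baseFieldId, List<Integer> path)
{
    // fullPath is assumed to list field ids from the root column downwards,
    // with the root column id as the first element.
    static BaseFieldAndPath fromFullPath(List<Integer> fullPath)
    {
        if (fullPath.isEmpty()) {
            throw new IllegalArgumentException("fullPath must contain at least the root field id");
        }
        // Read the root id and copy the remainder instead of mutating the input list
        return new BaseFieldAndPath(fullPath.get(0), List.copyOf(fullPath.subList(1, fullPath.size())));
    }
}
```

Reading the root id with `get(0)` and copying the tail with `subList` keeps the original list intact, which makes it easier to see that the root id is used for the base field lookup while the remaining ids form the path.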
39c59dd
to
76e3886
Compare
76e3886
to
bfe8da9
Compare
Rebased on master to resolve conflicts.
Please rebase the code to resolve conflicts.
bfe8da9
to
1f15a36
Compare
This pull request has gone a while without any activity. Tagging the Trino developer relations team: @bitsondatadev @colebow @mosabua
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergBucketFunction.java
Outdated
plugin/trino-iceberg/src/main/java/io/trino/plugin/iceberg/IcebergUtil.java
Outdated
1f15a36
to
bf37211
Compare
Co-authored-by: Victoria Bukta <[email protected]>
Co-authored-by: Yuya Ebihara <[email protected]>
bf37211
to
336ec37
Compare
Description
Fixes #15109
Support partitioning on nested ROW fields in Iceberg
Release notes
( ) This is not user-visible or docs only and no release notes are required.
( ) Release notes are required, please propose a release note for me.
(X) Release notes are required, with the following suggested text: