[BEAM-6772] Change Select semantics to match what a user expects #8006
Conversation
R: @kanterov |
I've started looking; the code makes sense. However, I need more time to think about the idea of automatic unnesting. I'm wondering if we can make it less implicit. As for me, I would expect the previous behavior; that's how, for instance, Spark DataFrames work, IIRC. |
The new behavior better matches how SQL works (SELECT a.b returns an int if b is an int; it doesn't return a nested row). However, we could also make it an option on the Select transform so the user can pick which behavior they want.
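To make the two behaviors concrete, here is a minimal runnable sketch in the Beam Java SDK; the schema and values are illustrative, not taken from this PR:

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class SelectSemanticsSketch {
  public static void main(String[] args) {
    Schema locationSchema =
        Schema.builder().addDoubleField("latitude").addDoubleField("longtitude").build();
    Schema eventSchema =
        Schema.builder().addStringField("userId").addRowField("location", locationSchema).build();

    Row event =
        Row.withSchema(eventSchema)
            .addValues("abc", Row.withSchema(locationSchema).addValues(1.0, 2.0).build())
            .build();

    Pipeline p = Pipeline.create();
    PCollection<Row> events = p.apply(Create.of(event)).setRowSchema(eventSchema);

    // Old semantics: output schema is {location: ROW{latitude, longtitude}} (nesting kept).
    // Semantics in this PR: output schema is {latitude: DOUBLE, longtitude: DOUBLE} (unnested).
    PCollection<Row> selected = events.apply(Select.fieldNames("location"));
    System.out.println(selected.getSchema());
  }
}
```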
|
I checked a similar query in BigQuery:

```sql
SELECT location FROM UNNEST([
  STRUCT(STRUCT(1.0 as latitude, 2.0 as longtitude) as location, "abc" as userId)
]);
```

It returns:

```json
[
  {
    "location": {
      "latitude": "1.0",
      "longtitude": "2.0"
    }
  }
]
```

As I understand this PR, it makes Beam return:

```json
[{
  "latitude": "1.0",
  "longtitude": "2.0"
}]
```
|
What happens without UNNEST, if you simply do SELECT a.location?
|
Leaving the nested structure in place seems like it would be surprising to users. For example:

```
Event:
  location: ROW(Location)
```

A user would expect to be able to write the following:

```java
pc.apply(ParDo.of(new DoFn<Row, Void>() {
  @ProcessElement
  public void process(@FieldAccess("location") Location loc) {
  }
}));
```

(where Location is a POJO that matches the Location schema). However, this conversion will fail if the select actually returns an extra level of nesting. In addition, they would expect to be able to write:

```java
pc.apply(ParDo.of(new DoFn<Row, Void>() {
  @ProcessElement
  public void process(@FieldAccess("location.lat") double lat) {
  }
}));
```

But again, that will fail if the select leaves extra layers of nesting.
|
As I understand it, FieldAccess("location") would work only if we leave an extra layer of nesting.

I did a few BigQuery experiments:

```sql
CREATE TABLE IF NOT EXISTS test.test (
  userId STRING,
  location STRUCT<latitude FLOAT64, longtitude FLOAT64>
);
INSERT INTO test.test (userId, location) VALUES ("abc", (123.0, 234.0));

SELECT location FROM test.test;
-- Row  location.latitude  location.longtitude
-- 1    123.0              234.0

SELECT location.latitude FROM gleb_test.test;
-- Row  latitude
-- 1    123.0
```
|
I need to check how |
But doesn't your BigQuery experiment confirm this behavior? You got back a Row containing location.latitude and location.longtitude, not a Row of Row. It does look like BigQuery flattened the field names, though.
|
It created a schema with nested records. To confirm, I did an experiment:

```sql
CREATE TABLE `test.test_schema` AS
SELECT location FROM test.test;
```

And I got:
|
This way it's more visible:

```
$ bq show --schema test.test_schema | jq
[
  {
    "fields": [
      {
        "type": "FLOAT",
        "name": "latitude"
      },
      {
        "type": "FLOAT",
        "name": "longtitude"
      }
    ],
    "type": "RECORD",
    "name": "location"
  }
]
```
|
+Kenn for comment

This seems very surprising to me, to be honest. It's not what I would expect. This behavior has some mathematical niceties (e.g. it makes select distribute across a union of selectors), but it seems like it's almost never what a user actually wants.

In Beam, part of the goal of schemas is to interact seamlessly with user types (such as POJOs), so users don't have to deal with Row objects (unless they want to). The above example for ParDo is one (though the implementation for that is not yet merged). Another example is that users might expect to be able to write:

```java
PCollection<Location> locations = pc.apply(Select.fieldNames("location"))
    .apply(Convert.to(Location.class));
```

And that will only work if the selected item matches the schema of Location, not if it's a nested schema.

Maybe for now we should add an option to the Select transform so that the user can specify which behavior they want?
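A fuller sketch of that expectation; the Location POJO here is illustrative, not something defined in this PR:

```java
import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// A POJO whose schema matches the nested location row.
@DefaultSchema(JavaFieldSchema.class)
public class Location {
  public double latitude;
  public double longtitude;
}

// pc is a PCollection<Row> whose schema has a nested "location" row field (assumed).
// With the unnesting semantics of this PR, Select.fieldNames("location") yields rows
// with schema {latitude: DOUBLE, longtitude: DOUBLE}, which Convert.to can match:
PCollection<Location> locations =
    pc.apply(Select.fieldNames("location"))
      .apply(Convert.to(Location.class));
```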
|
I did a similar experiment with Spark:

```python
df = spark.createDataFrame(
    [{"userId": "abc", "location": {"longtitude": 1.0, "latitude": 2.0}}],
    T.StructType([
        T.StructField("userId", T.StringType()),
        T.StructField("location", T.StructType([
            T.StructField("longtitude", T.DoubleType()),
            T.StructField("latitude", T.DoubleType()),
        ])),
    ])
)

df
# DataFrame[userId: string, location: struct<longtitude:double,latitude:double>]

df.select("location")
# DataFrame[location: struct<longtitude:double,latitude:double>]

df.select("location.latitude")
# DataFrame[latitude: double]
```
|
So select("location.latitude") in Spark matches the unnest semantics, but select("location") does not. So Spark is inconsistent with itself?
|
As I understand, the approach is to take the last element in the field path and use it as the name. As I see, BigQuery works the same way:

```sql
CREATE TABLE `test.test_schema_2` AS
SELECT location.latitude FROM test.test;
```

```
$ bq show --schema test.test_schema_2 | jq
[
  {
    "type": "FLOAT",
    "name": "latitude"
  }
]
```
|
It seems that Spark allows duplicate column names, and BigQuery doesn't.
|
I don't actually see a contradiction here. When you select, it isn't maintaining the original structure of the input row: the result is a list of rows, each row has the column you selected, and that column contains a struct.
|
I'm curious about this one:
The output when you |
OK, this makes sense then.

As I see it, BigQuery (and Spark) is unnesting just like this PR does. The difference is that it then creates a new Row to contain the selected result. So select("location") returns a Row containing a location, and select("location.latitude") returns a Row containing a double. I would assume that select("location.*") would return a row containing a latitude and a longitude.

We could validate this by nesting the location three layers deep. In that case I suspect BQ would return the same thing it did in the above experiment, not a three-layered nested row.
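To pin the conjecture down, here are the output schemas under "always box the result in a new row" semantics, written with Beam's Schema API (a sketch; the three-layer BQ experiment suggested above is not reproduced here):

```java
import org.apache.beam.sdk.schemas.Schema;

Schema locationSchema =
    Schema.builder().addDoubleField("latitude").addDoubleField("longtitude").build();

// select("location"): the struct is boxed in a fresh single-field row.
Schema selectLocation = Schema.builder().addRowField("location", locationSchema).build();

// select("location.latitude"): the scalar is boxed the same way.
Schema selectLatitude = Schema.builder().addDoubleField("latitude").build();

// select("location.*"): the struct's fields are flattened into the new row,
// so the result schema is locationSchema itself.
Schema selectStar = locationSchema;
```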
|
@kennknowles it isn't flattened, you are right:
Gives
|
Makes sense. I tried, and both BigQuery and Spark have consistent behavior for selecting rows nested in rows:

```
df.printSchema()
root
 |-- userId: string (nullable = true)
 |-- position: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- longtitude: double (nullable = true)
 |    |    |-- latitude: double (nullable = true)

df.select("position.location").printSchema()
root
 |-- location: struct (nullable = true)
 |    |-- longtitude: double (nullable = true)
 |    |-- latitude: double (nullable = true)
```
|
Always adding one extra row around whatever is selected makes sense in Spark and BQ, because it makes selection consistent between primitive and nested types. Select must always return a row, so if you select userId, it returns a Row with a single string field (not a scalar string field). Spark and BQ have therefore decided to treat nested rows the exact same way: even though the nested row could be returned directly, it is treated consistently like primitive types and boxed inside a result row.
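At the schema level, the boxing of a selected primitive is easy to state (a sketch reusing the running example's field names):

```java
import org.apache.beam.sdk.schemas.Schema;

// Input: {userId: STRING, location: ROW{latitude: DOUBLE, longtitude: DOUBLE}}.
// select("userId") yields a single-field row that boxes the scalar:
Schema selectUserId = Schema.builder().addStringField("userId").build();
// ...rather than a bare STRING value.
```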
|
@kanterov I'm changing the semantics here to always return a new row, matching BigQuery and Spark. Will ping once that's done. |
@reuvenlax To avoid miscommunication: I didn't reply, not because I wasn't certain about the solution or something wasn't clear, but because I didn't have time to continue the review. |
I'll try to summarize my understanding of Spark's properties:
|
I also uploaded the notebook I used for Spark: https://gist.github.com/kanterov/78972d40ebe5b6f5c553b5a018dbb0bf. You can get a local Spark instance with Jupyter, or use Dataproc.
|
Spark has an implementation of schema pruning that behaves similarly to the code in this PR (it preserves schema structure). It was introduced in apache/spark#21320; there is a test suite demonstrating its behavior: https://github.com/apache/spark/pull/21320/files#diff-3131013c95ca682e798070a2d50d6896. As I understand it, it isn't used in the user-facing API, only for pushing down projections into the Parquet data source. |
Oh, I see now. The issue is actually eliding the array layers. |
I think the summary by @kanterov is super clear and the right approach, but I don't know if I would call that "union" so much as row building.

For SQL I would state this as SELECT <expr1 : name1>, <expr2 : name2> ...: each result is a new row containing the columns name1, name2, etc., and the type of each column is the type of the expression.

Now for raw Beam: if you select just a single column and the type of that column can convert to a Java type, you should be able to convert. Specifically:

- a row with one int column is convertible to int
- a row with one row column matching a POJO can convert directly

This is a step towards a thing mentioned in the Go SDK coders thread:

- a row with one bytes column with coder metadata is equivalent to today's approach of coders
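A sketch of what that single-column convertibility could look like; this is aspirational per the comment ("should be able to convert"), not a documented guarantee, and `rows` and the Location POJO are assumptions carried over from earlier examples:

```java
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// rows: a PCollection<Row> with schema {id: INT64, location: ROW{latitude, longtitude}} (assumed).

// A row with one int64 column should convert to the matching Java type:
PCollection<Long> ids =
    rows.apply(Select.fieldNames("id")).apply(Convert.to(Long.class));

// A row with one row column matching a POJO should convert directly:
PCollection<Location> locations =
    rows.apply(Select.fieldNames("location")).apply(Convert.to(Location.class));
```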
|
Given that this is mostly an issue of eliding nested arrays, I wonder if we should merge this PR and file a JIRA for the nested array case. We can discuss the correct behavior on the JIRA. This PR seems more correct than the old behavior.
|
@kanterov I'm surprised that Spark cannot resolve select("a.c.d"). Is there a reason for that? |
@reuvenlax I was also surprised to see that Spark didn't resolve it. Agree that we should keep iterations quick, and given the API is heavily experimental, we could move forward with increments. I think we did a good analysis and now understand the problem better. However, we still don't have a good formalization of the desired semantics. I'm going to revisit the code again; I didn't have time to check this today. |
@reuvenlax I've tried the following test:
And it fails with an exception:
I don't see anything wrong with the test itself. |
Good catch on that test. This was a bug in my code. I've fixed that bug and added your test. |
@kennknowles and I discussed the array issue. We both think that the best approach is probably to distribute out array selects. However there are other options that should be discussed (e.g. completely unnesting arrays whenever a select happens), so we think this discussion should be brought to the dev list. This PR I believe is generally an improvement, so (assuming other comments are resolved) I think we should merge it and address array select semantics separately. |
FYI by "distribute array selects" I mean the following: if you have { a: {b: int, c: int}[] } and someone select "a.b", "a.c" the result should be {b: int[], c: int[]}. The same should be true if someone selects a.*. However if someone selects just a, you get the original schema back. |
@reuvenlax sorry for the long iterations.

I've tried the following test, and it didn't work:

```java
// expected {"f2": {"f1": {"f0": 42}}}
// got      {"f1": {"f0": 42}}
@Test
public void testSelectFieldOfRecordOrRecord() {
  Schema f1 = Schema.builder().addInt64Field("f0").build();
  Schema f2 = Schema.builder().addRowField("f1", f1).build();
  Schema f3 = Schema.builder().addRowField("f2", f2).build();
  Schema f4 = Schema.builder().addRowField("f3", f3).build();

  Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
  Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
  Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}}
  Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": {"f0": 42}}}}

  FieldAccessDescriptor fieldAccessDescriptor =
      FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4);
  Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor);
  Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema);

  assertEquals(f3, outputSchema);
  assertEquals(r3, out);
}
```
These are the lines under review:

```java
Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema);
assertEquals(outputSchema, f2);
assertEquals(out, r2);
```
Parameters should be in a different order: expected, actual
done
I think we can merge and iterate from there, but there are still issues with semantics that can be discussed separately. |
@kanterov looking. I suspect my fix to the last issue you found is what caused this use case to break. |
@kanterov I believe the bug is actually in your test. You have:

When I fix this, the test now passes.
|
Run Java PreCommit |
Run Java PreCommit |
Run Java PreCommit |
@kanterov I went ahead and changed array and map selects to distribute the select. This preserves the invariant that the field name selected is always the one that appears in the resulting schema, and I believe is closer to what Spark does. I still think we should have a thread on the dev list to discuss the detailed semantics, as we still might want to change them. However, I think with this last commit we are probably closer to where we want to end up. A sketch of the map case is below.
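For maps, the analogous distribution would look like this at the schema level (a sketch under the same assumption; the map semantics are likewise up for discussion on the dev list):

```java
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

Schema valueSchema = Schema.builder().addInt32Field("b").build();

// Input: {m: MAP<STRING, ROW{b: int}>}
Schema inputSchema =
    Schema.builder().addMapField("m", FieldType.STRING, FieldType.row(valueSchema)).build();

// select("m.b") would distribute over the map values:
Schema distributed =
    Schema.builder().addMapField("b", FieldType.STRING, FieldType.INT32).build();
```
|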
@kanterov did you have any further comments on this PR? |
@reuvenlax sorry for the delay, LGTM 👍 please feel free to merge when you are comfortable with it |
[BEAM-6772] Change Select semantics to match what a user expects

Details in the JIRA: the current Select transform provides confusing and less useful semantics. This PR causes the transform to remove the extra levels of nesting that are left after a Select.