
[BEAM-6772] Change Select semantics to match what a user expects #8006

Merged
14 commits merged into apache:master on Apr 12, 2019

Conversation

reuvenlax
Contributor

Details in the JIRA: The current Select transform provides confusing and less-useful semantics. This PR causes the transform to remove extra levels of nesting that are left after a Select.
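
To make the discussion below easier to follow, here is a minimal, hypothetical sketch (not taken from this PR) of the kind of pipeline involved, assuming a schema-aware PCollection<Row> named input whose rows carry a nested location row field; the exact output semantics are what this thread debates:

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.Select;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

// Input schema: {userId: STRING, location: ROW{latitude: DOUBLE, longitude: DOUBLE}}
Schema locationSchema =
    Schema.builder().addDoubleField("latitude").addDoubleField("longitude").build();
Schema inputSchema =
    Schema.builder().addStringField("userId").addRowField("location", locationSchema).build();

// Selecting a nested field. Per the description above, this PR removes the extra
// nesting levels that the pre-existing Select left in the output, so selecting
// "location.latitude" is expected to yield rows with the single field
// {latitude: DOUBLE} rather than the nested {location: ROW{latitude: DOUBLE}}.
PCollection<Row> latitudes = input.apply(Select.fieldNames("location.latitude"));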

@reuvenlax
Contributor Author

R: @kanterov

@kanterov
Member

I've started looking. The code makes sense; however, I need more time to think about the idea of automatic unnesting. I'm wondering if we can make it less implicit.

As for me, I would expect the previous behavior; that's how Spark DataFrames work, for instance, IIRC.

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

kanterov commented Mar 13, 2019

I checked a similar query in BigQuery:

SELECT location FROM UNNEST([
  STRUCT(STRUCT(1.0 as latitude, 2.0 as longtitude) as location, "abc" as userId)
]);

It returns:

[
  {
    "location": {
      "latitude": "1.0",
      "longtitude": "2.0"
    }
  }
]

As I understand this PR, it makes Beam return:

[{
  "latitude": "1.0",
  "longtitude": "2.0"
}]

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

As I understand it, FieldAccess("location") would work only if we leave an extra layer of nesting.

I did a few BigQuery experiments:

CREATE TABLE IF NOT EXISTS test.test (
  userId STRING,
  location STRUCT<latitude FLOAT64, longtitude FLOAT64>
);

INSERT INTO test.test (userId, location) VALUES ("abc", (123.0, 234.0));

SELECT location FROM test.test;
-- Row  location.latitude   location.longtitude 
-- 1    123.0               234.0

SELECT location.latitude FROM gleb_test.test;
-- Row latitude    
-- 1   123.0

@kanterov
Member

I need to check how FieldAccess works today

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

It created a schema with nested records; to confirm, I did an experiment:

CREATE TABLE `test.test_schema` AS
SELECT location FROM test.test;

And I got:

Field name | Type | Mode
-- | -- | --
location | RECORD | NULLABLE
location.latitude | FLOAT | NULLABLE
location.longtitude | FLOAT | NULLABLE

@kanterov
Member

kanterov commented Mar 13, 2019

This way it's more visible:

$ bq show --schema test.test_schema | jq
[
  {
    "fields": [
      {
        "type": "FLOAT",
        "name": "latitude"
      },
      {
        "type": "FLOAT",
        "name": "longtitude"
      }
    ],
    "type": "RECORD",
    "name": "location"
  }
]

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

I did a similar experiment with Spark:

from pyspark.sql import types as T  # assumed import; `spark` is the notebook's SparkSession

df = spark.createDataFrame(
    [{"userId": "abc", "location": {"longtitude": 1.0, "latitude": 2.0}}],
    T.StructType([
        T.StructField("userId", T.StringType()),
        T.StructField("location", T.StructType([
            T.StructField("longtitude", T.DoubleType()),
            T.StructField("latitude", T.DoubleType()),
        ])),
    ])
)


df
> DataFrame[userId: string, location: struct<longtitude:double,latitude:double>]

df.select("location")
> DataFrame[location: struct<longtitude:double,latitude:double>]

df.select("location.latitude")
> DataFrame[latitude: double]

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

kanterov commented Mar 13, 2019

As I understand, the approach is to take the last element in the field path and use it as the name. As I can see, BigQuery works the same way:

CREATE TABLE `test.test_schema_2` AS
SELECT location.latitude FROM test.test;
$ bq show --schema test.test_schema_2 | jq
[
  {
    "type": "FLOAT",
    "name": "latitude"
  }
]

@kanterov
Member

It seems that Spark allows duplicate column names:

df.select("location.latitude", "location.latitude").printSchema()
root
 |-- latitude: double (nullable = true)
 |-- latitude: double (nullable = true)

And BigQuery doesn't:

Duplicate column names in the result are not supported. Found duplicate(s): latitude

@kennknowles
Member

I don't actually see a contradiction here. When you SELECT location or df.select("location"), the output is a row with a single column containing a location struct. This is because the value of the selected field is a struct and it gets a default name. That's what this is:

[
  {
    "location": {
      "latitude": "1.0",
      "longtitude": "2.0"
    }
  }
]

It isn't maintaining the original structure of the input row; it is a list of rows, where each row has the column you selected, and that column contains a struct.

@kennknowles
Member

I'm curious about this one:

CREATE TABLE IF NOT EXISTS test.test (
  userId STRING,
  location STRUCT<latitude FLOAT64, longtitude FLOAT64>
);

INSERT INTO test.test (userId, location) VALUES ("abc", (123.0, 234.0));

SELECT location FROM test.test;
-- Row  location.latitude   location.longtitude 
-- 1    123.0               234.0

SELECT location.latitude FROM gleb_test.test;
-- Row latitude    
-- 1   123.0

The output when you SELECT location is flattened. It should be one column containing a row with the two fields. Is it just for display purposes?

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@kanterov
Member

@kennknowles it isn't flattened, you are right:

SELECT location FROM test.test;

Gives

[
  {
    "fields": [
      {
        "type": "FLOAT",
        "name": "latitude"
      },
      {
        "type": "FLOAT",
        "name": "longtitude"
      }
    ],
    "type": "RECORD",
    "name": "location"
  }
]

@kanterov
Member

Makes sense. I tried it, and both BigQuery and Spark behave consistently when selecting rows nested in rows:

df.printSchema()
root
 |-- userId: string (nullable = true)
 |-- position: struct (nullable = true)
 |    |-- location: struct (nullable = true)
 |    |    |-- longtitude: double (nullable = true)
 |    |    |-- latitude: double (nullable = true)

df.select("position.location").printSchema()
root
 |-- location: struct (nullable = true)
 |    |-- longtitude: double (nullable = true)
 |    |-- latitude: double (nullable = true)

@reuvenlax
Contributor Author

reuvenlax commented Mar 13, 2019 via email

@reuvenlax
Contributor Author

@kanterov I'm changing the semantics here to always return a new row, matching BigQuery and Spark. Will ping once that's done.

@kanterov
Member

@reuvenlax To avoid miscommunication: I didn't reply, not because I was uncertain about the solution or because something was unclear, but because I didn't have time to continue the review.

@kanterov
Member

kanterov commented Mar 22, 2019

Let me summarize my understanding of Spark's properties (a small Beam-side sketch follows below):

  • the resulting schema of select(<xs : x>) always has a field named x, and its schema matches the schema at the <xs : x> path in the input schema
  • the resulting schema of select(<path1>, <path2>) is the union of select(<path1>) and select(<path2>)
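
A Beam-side sketch of the second property, using the SelectHelpers and FieldAccessDescriptor API that appears in the test further down (the field names and the exact field order in the output schema are my assumptions):

import org.apache.beam.sdk.schemas.FieldAccessDescriptor;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;

// Input schema: {location: ROW{latitude: DOUBLE, longitude: DOUBLE}, userId: STRING}
Schema location =
    Schema.builder().addDoubleField("latitude").addDoubleField("longitude").build();
Schema input =
    Schema.builder().addRowField("location", location).addStringField("userId").build();

FieldAccessDescriptor fd =
    FieldAccessDescriptor.withFieldNames("location.latitude", "userId").resolve(input);

// Under the two properties above, the output schema should be the union of the
// per-path selects, each field named after the last path element:
// {latitude: DOUBLE, userId: STRING}
Schema output = SelectHelpers.getOutputSchema(input, fd);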

@kanterov
Member

I also uploaded the notebook I used for Spark: https://gist.github.com/kanterov/78972d40ebe5b6f5c553b5a018dbb0bf

You can get a local Spark instance with Jupyter with:

$ docker run -p8888:8888 -it jupyter/pyspark-notebook

Or use Dataproc.

@kanterov
Member

Spark has an implementation of schema pruning that behaves similarly to the code in this PR (it preserves schema structure). It was introduced in apache/spark#21320; there is a test suite demonstrating its behavior: https://github.com/apache/spark/pull/21320/files#diff-3131013c95ca682e798070a2d50d6896

As I understand, it isn't used in the user-facing API, only for pushing down projections into the Parquet data source.

@kennknowles
Member

Oh, I see now. The issue is actually eliding the array layers.

@kennknowles
Member

I think the summary by @kanterov is super clear and is the right approach, but I don't know if I would call that "union" so much as row building.

For SQL I would state this as SELECT <expr1 : name1>, <expr2 : name2>, ...: each result is a new row containing the columns name1, name2, etc., and the type of each column is the type of the corresponding expression.

Now, for raw Beam, if you select just a single column and the type of that column can convert to a Java type, you should be able to convert (a sketch follows after the bullets below). Specifically:

  • row with one int column convertible to int
  • row with one row column matching a POJO can convert directly

This is a step towards a thing mentioned in the Go SDK coders thread:

  • a row with one bytes column plus coder metadata is equivalent to today's approach of coders
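
As an illustration of those bullets (hypothetical, not part of this PR), assuming a POJO registered with Beam's schema inference; the Location class and the selected PCollection names are my own:

import org.apache.beam.sdk.schemas.JavaFieldSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;

// A POJO whose inferred schema matches the selected "location" row.
@DefaultSchema(JavaFieldSchema.class)
public class Location {
  public double latitude;
  public double longitude;
}

// After selecting the single row-typed column "location", each output row has one
// ROW field whose schema matches Location, so converting PCollection<Row> to
// PCollection<Location> should be possible, e.g. via the schema Convert transform:
//
//   PCollection<Location> locations = selected.apply(Convert.to(Location.class));
//
// Similarly, after selecting "location.latitude" the output has a single DOUBLE
// field, which could convert directly to a PCollection<Double>.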

@reuvenlax
Contributor Author

reuvenlax commented Mar 22, 2019 via email

@reuvenlax
Contributor Author

@kanterov I'm surprised that Spark cannot resolve select("a.c.d"). Is there a reason for that?

@kanterov
Member

@reuvenlax I was surprised to see that Spark didn't resolve select("a.c.d").

Agree that we should keep iterations quick, and given the API is heavily experimental we can move forward in increments. I think we did a good analysis and now understand the problem better. However, we still don't have a good formalization of the desired semantics.

I'm going to revisit the code again; I didn't have time to check it today.

@kanterov
Member

@reuvenlax I've tried the following test:

  @Test
  public void testSelectFieldOfRecord() {
    Schema f1 = Schema.builder().addInt64Field("f0").build();
    Schema f2 = Schema.builder().addRowField("f1", f1).build();
    Schema f3 = Schema.builder().addRowField("f2", f2).build();
    
    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
    Row r2 = Row.withSchema(f2).addValue(r1).build();  // {"f1": {"f0": 42}}
    Row r3 = Row.withSchema(f3).addValue(r2).build();  // {"f2": {"f1": {"f0": 42}}}

    FieldAccessDescriptor fieldAccessDescriptor = FieldAccessDescriptor
        .withFieldNames("f2.f1")
        .resolve(f3);

    Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor);

    Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema);

    assertEquals(outputSchema, f2);
    assertEquals(out, r2);
  }

And it fails with an exception:

java.lang.IllegalArgumentException: For field name f0 and type INT64 found incorrect class type class org.apache.beam.sdk.values.RowWithStorage
	at org.apache.beam.sdk.values.Row$Builder.verifyPrimitiveType(Row.java:724)
	at org.apache.beam.sdk.values.Row$Builder.verify(Row.java:587)
	at org.apache.beam.sdk.values.Row$Builder.verify(Row.java:571)
	at org.apache.beam.sdk.values.Row$Builder.build(Row.java:748)
	at org.apache.beam.sdk.schemas.utils.SelectHelpers.selectIntoRow(SelectHelpers.java:204)
	at org.apache.beam.sdk.schemas.utils.SelectHelpers.selectRow(SelectHelpers.java:167)
	

I don't see anything wrong with the test itself.

@reuvenlax
Contributor Author

Good catch on that test. This was a bug in my code. I've fixed that bug and added your test.

@reuvenlax
Contributor Author

@kennknowles and I discussed the array issue. We both think that the best approach is probably to distribute out array selects. However, there are other options that should be discussed (e.g. completely unnesting arrays whenever a select happens), so we think this discussion should be brought to the dev list. I believe this PR is generally an improvement, so (assuming other comments are resolved) I think we should merge it and address array select semantics separately.

@reuvenlax
Contributor Author

FYI by "distribute array selects" I mean the following:

If you have { a: {b: int, c: int}[] } and someone selects "a.b", "a.c", the result should be {b: int[], c: int[]}. The same should be true if someone selects a.*. However, if someone selects just a, you get the original schema back.
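
A hedged sketch of that shape in Beam's schema terms (the schema-building calls here are my illustration, not code from the PR):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

// Input: { a: ARRAY<ROW{b: INT32, c: INT32}> }
Schema element = Schema.builder().addInt32Field("b").addInt32Field("c").build();
Schema input = Schema.builder().addArrayField("a", FieldType.row(element)).build();

// Selecting "a.b", "a.c" (or "a.*") distributes the select over the array, so the
// expected output schema is { b: ARRAY<INT32>, c: ARRAY<INT32> }.
// Selecting just "a" leaves the original { a: ARRAY<ROW{b, c}> } unchanged.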

@kanterov
Member

@reuvenlax sorry for the long iterations.

I've tried the following test, and it didn't work:

// expected {"f2": {"f1": {"f0": 42}}}
// got {"f1": {"f0": 42}}
  @Test
  public void testSelectFieldOfRecordOrRecord() {
    Schema f1 = Schema.builder().addInt64Field("f0").build();
    Schema f2 = Schema.builder().addRowField("f1", f1).build();
    Schema f3 = Schema.builder().addRowField("f2", f2).build();
    Schema f4 = Schema.builder().addRowField("f3", f3).build();

    Row r1 = Row.withSchema(f1).addValue(42L).build(); // {"f0": 42}
    Row r2 = Row.withSchema(f2).addValue(r1).build(); // {"f1": {"f0": 42}}
    Row r3 = Row.withSchema(f3).addValue(r2).build(); // {"f2": {"f1": {"f0": 42}}}
    Row r4 = Row.withSchema(f4).addValue(r3).build(); // {"f3": {"f2": {"f1": {"f0": 42}}}}

    FieldAccessDescriptor fieldAccessDescriptor =
        FieldAccessDescriptor.withFieldNames("f3.f2").resolve(f4);

    Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor);

    Row out = SelectHelpers.selectRow(r4, fieldAccessDescriptor, r4.getSchema(), outputSchema);

    assertEquals(f3, outputSchema);
    assertEquals(r3, out);
  }

Row out = SelectHelpers.selectRow(r3, fieldAccessDescriptor, r3.getSchema(), outputSchema);

assertEquals(outputSchema, f2);
assertEquals(out, r2);

Member (inline review comment on the lines above)

Parameters should be in a different order: expected, actual

Contributor Author

done

@kanterov
Member

kanterov commented Apr 3, 2019

I think we can merge and iterate from there, but there are still issues with semantics that can be discussed separately.

@reuvenlax
Contributor Author

@kanterov looking. I suspect my fix to the last issue you found is what caused this use case to break.

@reuvenlax
Contributor Author

@kanterov I believe the bug is actually in your test. You have:
Schema outputSchema = SelectHelpers.getOutputSchema(f3, fieldAccessDescriptor);
when it should be
Schema outputSchema = SelectHelpers.getOutputSchema(f4, fieldAccessDescriptor);

When I fix this, the test passes.

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

@kanterov I went ahead and changed array and map selects to distribute the select. This preserves the invariant that the selected field name is always the one that appears in the resulting schema, and I believe it is closer to what Spark does.

I still think we should have a thread on the dev list to discuss the detailed semantics, as we might still want to change them. However, I think that with this last commit we are probably closer to where we want to end up.
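
For completeness, a hypothetical sketch of the analogous map case (my example, not taken from the commit):

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;

// Input: { m: MAP<STRING, ROW{x: INT64, y: INT64}> }
Schema value = Schema.builder().addInt64Field("x").addInt64Field("y").build();
Schema input = Schema.builder()
    .addMapField("m", FieldType.STRING, FieldType.row(value))
    .build();

// Distributing the select over the map: selecting "m.x" keeps the selected field
// name in the output, with the expected schema { x: MAP<STRING, INT64> }.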

@reuvenlax
Contributor Author

@kanterov did you have any further comments on this PR?

@kanterov
Member

@reuvenlax sorry for the delay. LGTM 👍 Please feel free to merge when you are comfortable with it.

@reuvenlax reuvenlax merged commit 323fc9e into apache:master Apr 12, 2019