Skip to content

Commit

Permalink
Merge pull request apache#11290: [BEAM-9670] Fix nullability widening…
Browse files Browse the repository at this point in the history
… in CoGroup key resolution
  • Loading branch information
reuvenlax committed Apr 2, 2020
1 parent 63d57d8 commit 5504f32
Show file tree
Hide file tree
Showing 6 changed files with 261 additions and 15 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;

import org.apache.beam.sdk.schemas.Schema.FieldType;

/** A set of utility functions for schemas. */
public class SchemaUtils {
/**
* Given two schema that have matching types, return a nullable-widened schema.
*
* <p>The schemas must have matching types, except for field names which can differ. The returned
* schema will contain the field names in the first schema. All field types will be nullable if
* the corresponding field type is nullable in either of the input schemas.
*/
public static Schema mergeWideningNullable(Schema schema1, Schema schema2) {
if (schema1.getFieldCount() != schema2.getFieldCount()) {
throw new IllegalArgumentException(
"Cannot merge schemas with different numbers of fields. "
+ "schema1: "
+ schema1
+ " schema2: "
+ schema2);
}
Schema.Builder builder = Schema.builder();
for (int i = 0; i < schema1.getFieldCount(); ++i) {
String name = schema1.getField(i).getName();
builder.addField(
name, widenNullableTypes(schema1.getField(i).getType(), schema2.getField(i).getType()));
}
return builder.build();
}

static FieldType widenNullableTypes(FieldType fieldType1, FieldType fieldType2) {
if (fieldType1.getTypeName() != fieldType2.getTypeName()) {
throw new IllegalArgumentException(
"Cannot merge two types: "
+ fieldType1.getTypeName()
+ " and "
+ fieldType2.getTypeName());
}

FieldType result;
switch (fieldType1.getTypeName()) {
case ROW:
result =
FieldType.row(
mergeWideningNullable(fieldType1.getRowSchema(), fieldType2.getRowSchema()));
break;
case ARRAY:
FieldType arrayElementType =
widenNullableTypes(
fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType());
result = FieldType.array(arrayElementType);
break;
case ITERABLE:
FieldType iterableElementType =
widenNullableTypes(
fieldType1.getCollectionElementType(), fieldType2.getCollectionElementType());
result = FieldType.iterable(iterableElementType);
break;
case MAP:
FieldType keyType =
widenNullableTypes(fieldType1.getMapKeyType(), fieldType2.getMapKeyType());
FieldType valueType =
widenNullableTypes(fieldType1.getMapValueType(), fieldType2.getMapValueType());
result = FieldType.map(keyType, valueType);
break;
case LOGICAL_TYPE:
if (!fieldType1
.getLogicalType()
.getIdentifier()
.equals(fieldType2.getLogicalType().getIdentifier())) {
throw new IllegalArgumentException(
"Logical types don't match and cannot be merged: "
+ fieldType1.getLogicalType().getIdentifier()
+ ".v.s"
+ fieldType2.getLogicalType().getIdentifier());
}
// fall through
default:
result = fieldType1;
}
return result.withNullable(fieldType1.getNullable() || fieldType2.getNullable());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.schemas.SchemaUtils;
import org.apache.beam.sdk.schemas.transforms.CoGroup.ConvertCoGbkResult.ConvertType;
import org.apache.beam.sdk.schemas.utils.RowSelector;
import org.apache.beam.sdk.schemas.utils.SelectHelpers;
Expand Down Expand Up @@ -399,9 +400,7 @@ private static JoinInformation from(
if (keySchema == null) {
keySchema = currentKeySchema;
} else {
if (!currentKeySchema.typesEqual(keySchema)) {
throw new IllegalStateException("All keys must have the same schema");
}
keySchema = SchemaUtils.mergeWideningNullable(keySchema, currentKeySchema);
}

// Create a new tag for the output.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.schemas;

import static org.junit.Assert.assertEquals;

import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.junit.Test;

/** Tests for {@link org.apache.beam.sdk.schemas.SchemaUtils}. */
public class SchemaUtilsTest {
@Test
public void testWidenPrimitives() {
Schema schema1 =
Schema.builder()
.addField("field1", FieldType.INT32)
.addNullableField("field2", FieldType.STRING)
.build();
Schema schema2 =
Schema.builder()
.addNullableField("field3", FieldType.INT32)
.addField("field4", FieldType.STRING)
.build();
Schema expected =
Schema.builder()
.addNullableField("field1", FieldType.INT32)
.addNullableField("field2", FieldType.STRING)
.build();
assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
}

@Test
public void testWidenNested() {
Schema schema1 =
Schema.builder()
.addField("field1", FieldType.INT32)
.addNullableField("field2", FieldType.STRING)
.build();
Schema schema2 =
Schema.builder()
.addNullableField("field3", FieldType.INT32)
.addField("field4", FieldType.STRING)
.build();
Schema top1 = Schema.builder().addField("top1", FieldType.row(schema1)).build();
Schema top2 = Schema.builder().addField("top2", FieldType.row(schema2)).build();
Schema expected =
Schema.builder()
.addNullableField("field1", FieldType.INT32)
.addNullableField("field2", FieldType.STRING)
.build();
Schema expectedTop = Schema.builder().addField("top1", FieldType.row(expected)).build();

assertEquals(expectedTop, SchemaUtils.mergeWideningNullable(top1, top2));
}

@Test
public void testWidenArray() {
Schema schema1 = Schema.builder().addArrayField("field1", FieldType.INT32).build();
Schema schema2 =
Schema.builder().addArrayField("field1", FieldType.INT32.withNullable(true)).build();
Schema expected =
Schema.builder().addArrayField("field1", FieldType.INT32.withNullable(true)).build();
assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
}

@Test
public void testWidenIterable() {
Schema schema1 = Schema.builder().addIterableField("field1", FieldType.INT32).build();
Schema schema2 =
Schema.builder().addIterableField("field1", FieldType.INT32.withNullable(true)).build();
Schema expected =
Schema.builder().addIterableField("field1", FieldType.INT32.withNullable(true)).build();
assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
}

@Test
public void testWidenMap() {
Schema schema1 =
Schema.builder().addMapField("field1", FieldType.INT32, FieldType.INT32).build();
Schema schema2 =
Schema.builder()
.addMapField(
"field1", FieldType.INT32.withNullable(true), FieldType.INT32.withNullable(true))
.build();
Schema expected =
Schema.builder()
.addMapField(
"field1", FieldType.INT32.withNullable(true), FieldType.INT32.withNullable(true))
.build();
assertEquals(expected, SchemaUtils.mergeWideningNullable(schema1, schema2));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ public void testMismatchingKeys() {
Create.of(Row.withSchema(CG_SCHEMA_1).addValues("user1", 9, "us").build()))
.setRowSchema(CG_SCHEMA_1);

thrown.expect(IllegalStateException.class);
thrown.expect(IllegalArgumentException.class);
PCollection<Row> joined =
PCollectionTuple.of("pc1", pc1, "pc2", pc2)
.apply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@

import java.io.Serializable;
import org.apache.beam.sdk.extensions.sql.impl.transform.BeamSetOperatorsTransforms;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.transforms.CoGroup;
import org.apache.beam.sdk.schemas.transforms.CoGroup.By;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -63,16 +62,6 @@ public PCollection<Row> expand(PCollectionList<Row> inputs) {
inputs);
PCollection<Row> leftRows = inputs.get(0);
PCollection<Row> rightRows = inputs.get(1);
Schema leftSchema = leftRows.getSchema();
Schema rightSchema = rightRows.getSchema();
if (!leftSchema.typesEqual(rightSchema)) {
throw new IllegalArgumentException(
"Can't intersect two tables with different schemas."
+ "lhsSchema: "
+ leftSchema
+ " rhsSchema: "
+ rightSchema);
}

WindowFn leftWindow = leftRows.getWindowingStrategy().getWindowFn();
WindowFn rightWindow = rightRows.getWindowingStrategy().getWindowFn();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2270,6 +2270,56 @@ public void testTimestampLiteralWithUTCTimeZone() {
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testSelectNullIntersectDistinct() {
String sql = "SELECT NULL INTERSECT DISTINCT SELECT 2";

ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
System.err.println("SCHEMA " + stream.getSchema());

PAssert.that(stream).empty();
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testSelectNullIntersectAll() {
String sql = "SELECT NULL INTERSECT ALL SELECT 2";

ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);
System.err.println("SCHEMA " + stream.getSchema());

PAssert.that(stream).empty();
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testSelectNullExceptDistinct() {
String sql = "SELECT NULL EXCEPT DISTINCT SELECT 2";

ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema()));
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testSelectNullExceptAll() {
String sql = "SELECT NULL EXCEPT ALL SELECT 2";

ZetaSQLQueryPlanner zetaSQLQueryPlanner = new ZetaSQLQueryPlanner(config);
BeamRelNode beamRelNode = zetaSQLQueryPlanner.convertToBeamRel(sql);
PCollection<Row> stream = BeamSqlRelUtils.toPCollection(pipeline, beamRelNode);

PAssert.that(stream).containsInAnyOrder(Row.nullRow(stream.getSchema()));
pipeline.run().waitUntilFinish(Duration.standardMinutes(PIPELINE_EXECUTION_WAITTIME_MINUTES));
}

@Test
public void testTimestampLiteralWithNonUTCTimeZone() {
String sql = "SELECT TIMESTAMP '2018-12-10 10:38:59-10:00'";
Expand Down

0 comments on commit 5504f32

Please sign in to comment.