From 619ad4bd0fc522c14952ba2e9c37c3fd0099053b Mon Sep 17 00:00:00 2001
From: Timo Walther <twalthr@apache.org>
Date: Mon, 22 Mar 2021 09:52:53 +0100
Subject: [PATCH] [FLINK-21872][table-api-java] Add utility for DataStream
 API's DataType, Schema, and projection

This closes #15345.
---
 .../catalog/ExternalSchemaTranslator.java     | 332 ++++++++++++++++++
 .../catalog/ExternalSchemaTranslatorTest.java | 249 +++++++++++++
 .../org/apache/flink/table/api/Schema.java    |   9 +-
 .../table/expressions/SqlCallExpression.java  |  18 +
 4 files changed, 607 insertions(+), 1 deletion(-)
 create mode 100644 flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
 create mode 100644 flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java

diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
new file mode 100644
index 0000000000000..6f7f240e7afba
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ExternalSchemaTranslator.java
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.Schema.UnresolvedColumn;
+import org.apache.flink.table.api.Schema.UnresolvedPhysicalColumn;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.LogicalTypeRoot;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.table.types.logical.StructuredType;
+import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
+import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
+import org.apache.flink.table.types.utils.DataTypeUtils;
+import org.apache.flink.table.types.utils.TypeInfoDataTypeConverter;
+
+import javax.annotation.Nullable;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import static org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasRoot;
+
+/**
+ * Utility to derive a physical {@link DataType}, {@link Schema}, and projections when entering or
+ * leaving the table ecosystem from and to other APIs where {@link TypeInformation} is required.
+ */
+public final class ExternalSchemaTranslator {
+
+    /**
+     * Converts the given {@link TypeInformation} and an optional declared {@link Schema} (possibly
+     * incomplete) into the final {@link InputResult}.
+     *
+     * <p>This method serves three types of use cases:
+     *
+     * <ul>
+     *   <li>Derive physical columns entirely from the input type information.
+     *   <li>Derive physical columns from the input type information and enrich them with the
+     *       declared non-physical columns and other schema information.
+     *   <li>Derive and patch physical columns and merge them with the declared columns and other
+     *       schema information, possibly projecting the result.
+     * </ul>
+     */
+    public static InputResult fromExternal(
+            DataTypeFactory dataTypeFactory,
+            TypeInformation<?> inputTypeInfo,
+            @Nullable Schema declaredSchema) {
+        final DataType inputDataType =
+                TypeInfoDataTypeConverter.toDataType(dataTypeFactory, inputTypeInfo);
+        final LogicalType inputType = inputDataType.getLogicalType();
+
+        // we don't allow modifying the number of columns during enrichment, therefore we preserve
+        // whether the original type was qualified as a top-level record or not
+        final boolean isTopLevelRecord = LogicalTypeChecks.isCompositeType(inputType);
+
+        // no schema has been declared by the user,
+        // the schema will be entirely derived from the input
+        if (declaredSchema == null) {
+            final Schema.Builder builder = Schema.newBuilder();
+            addPhysicalDataTypeFields(builder, inputDataType);
+            return new InputResult(inputDataType, isTopLevelRecord, builder.build(), null);
+        }
+
+        final List<UnresolvedColumn> declaredColumns = declaredSchema.getColumns();
+
+        // the declared schema does not contain physical information,
+        // thus, it only enriches the non-physical column parts
+        if (declaredColumns.stream().noneMatch(ExternalSchemaTranslator::isPhysical)) {
+            final Schema.Builder builder = Schema.newBuilder();
+            addPhysicalDataTypeFields(builder, inputDataType);
+            builder.fromSchema(declaredSchema);
+            return new InputResult(inputDataType, isTopLevelRecord, builder.build(), null);
+        }
+
+        // the declared schema enriches the physical data type and the derived schema,
+        // it possibly projects the result
+        final DataType patchedDataType =
+                patchDataTypeFromDeclaredSchema(dataTypeFactory, inputDataType, declaredColumns);
+        final Schema patchedSchema =
+                createPatchedSchema(isTopLevelRecord, patchedDataType, declaredSchema);
+        final int[] projections = extractProjections(patchedSchema, declaredSchema);
+        return new InputResult(patchedDataType, isTopLevelRecord, patchedSchema, projections);
+    }
+
+    private static int[] extractProjections(Schema patchedSchema, Schema declaredSchema) {
+        final List<String> patchedColumns =
+                patchedSchema.getColumns().stream()
+                        .map(UnresolvedColumn::getName)
+                        .collect(Collectors.toList());
+        return declaredSchema.getColumns().stream()
+                .map(UnresolvedColumn::getName)
+                .mapToInt(patchedColumns::indexOf)
+                .toArray();
+    }
+
+    private static Schema createPatchedSchema(
+            boolean isTopLevelRecord, DataType patchedDataType, Schema declaredSchema) {
+        final Schema.Builder builder = Schema.newBuilder();
+
+        // physical columns
+        if (isTopLevelRecord) {
+            addPhysicalDataTypeFields(builder, patchedDataType);
+        } else {
+            builder.column(
+                    LogicalTypeUtils.getAtomicName(Collections.emptyList()), patchedDataType);
+        }
+
+        // remaining schema
+        final List<UnresolvedColumn> nonPhysicalColumns =
+                declaredSchema.getColumns().stream()
+                        .filter(c -> !isPhysical(c))
+                        .collect(Collectors.toList());
+        builder.fromColumns(nonPhysicalColumns);
+        declaredSchema
+                .getWatermarkSpecs()
+                .forEach(
+                        spec ->
+                                builder.watermark(
+                                        spec.getColumnName(), spec.getWatermarkExpression()));
+        declaredSchema
+                .getPrimaryKey()
+                .ifPresent(
+                        key ->
+                                builder.primaryKeyNamed(
+                                        key.getConstraintName(), key.getColumnNames()));
+        return builder.build();
+    }
+
+    private static DataType patchDataTypeFromDeclaredSchema(
+            DataTypeFactory dataTypeFactory,
+            DataType inputDataType,
+            List<UnresolvedColumn> declaredColumns) {
+        final List<UnresolvedPhysicalColumn> physicalColumns =
+                declaredColumns.stream()
+                        .filter(ExternalSchemaTranslator::isPhysical)
+                        .map(UnresolvedPhysicalColumn.class::cast)
+                        .collect(Collectors.toList());
+
+        DataType patchedDataType = inputDataType;
+        for (UnresolvedPhysicalColumn physicalColumn : physicalColumns) {
+            patchedDataType =
+                    patchDataTypeFromColumn(dataTypeFactory, patchedDataType, physicalColumn);
+        }
+        return patchedDataType;
+    }
+
+    private static DataType patchDataTypeFromColumn(
+            DataTypeFactory dataTypeFactory,
+            DataType dataType,
+            UnresolvedPhysicalColumn physicalColumn) {
+        final List<String> fieldNames = DataTypeUtils.flattenToNames(dataType);
+        final String columnName = physicalColumn.getName();
+        if (!fieldNames.contains(columnName)) {
+            throw new ValidationException(
+                    String.format(
+                            "Unable to find a field named '%s' in the physical data type derived "
+                                    + "from the given type information for schema declaration. "
+                                    + "Make sure that the type information is not a generic raw "
+                                    + "type. Currently available fields are: %s",
+                            columnName, fieldNames));
+        }
+        final DataType columnDataType =
+                dataTypeFactory.createDataType(physicalColumn.getDataType());
+        final LogicalType type = dataType.getLogicalType();
+
+        // the following lines make assumptions on what comes out of the TypeInfoDataTypeConverter
+        // e.g. we can assume that there will be no DISTINCT type and only anonymously defined
+        // structured types without a super type
+        if (hasRoot(type, LogicalTypeRoot.ROW)) {
+            return patchRowDataType(dataType, columnName, columnDataType);
+        } else if (hasRoot(type, LogicalTypeRoot.STRUCTURED_TYPE)) {
+            return patchStructuredDataType(dataType, columnName, columnDataType);
+        } else {
+            // this also covers the case where a top-level generic type enters the
+            // Table API, the type can be patched to a more specific type but the schema will still
+            // keep it nested in a single field without flattening
+            return columnDataType;
+        }
+    }
+
+    private static DataType patchRowDataType(
+            DataType dataType, String patchedFieldName, DataType patchedFieldDataType) {
+        final RowType type = (RowType) dataType.getLogicalType();
+        final List<String> oldFieldNames = DataTypeUtils.flattenToNames(dataType);
+        final List<DataType> oldFieldDataTypes = dataType.getChildren();
+        final Class<?> oldConversion = dataType.getConversionClass();
+
+        final DataTypes.Field[] fields =
+                patchFields(
+                        oldFieldNames, oldFieldDataTypes, patchedFieldName, patchedFieldDataType);
+
+        final DataType newDataType = DataTypes.ROW(fields).bridgedTo(oldConversion);
+        if (!type.isNullable()) {
+            return newDataType.notNull();
+        }
+        return newDataType;
+    }
+
+    private static DataType patchStructuredDataType(
+            DataType dataType, String patchedFieldName, DataType patchedFieldDataType) {
+        final StructuredType type = (StructuredType) dataType.getLogicalType();
+        final List<String> oldFieldNames = DataTypeUtils.flattenToNames(dataType);
+        final List<DataType> oldFieldDataTypes = dataType.getChildren();
+        final Class<?> oldConversion = dataType.getConversionClass();
+
+        final DataTypes.Field[] fields =
+                patchFields(
+                        oldFieldNames, oldFieldDataTypes, patchedFieldName, patchedFieldDataType);
+
+        final DataType newDataType =
+                DataTypes.STRUCTURED(
+                                type.getImplementationClass()
+                                        .orElseThrow(IllegalStateException::new),
+                                fields)
+                        .bridgedTo(oldConversion);
+        if (!type.isNullable()) {
+            return newDataType.notNull();
+        }
+        return newDataType;
+    }
+
+    private static DataTypes.Field[] patchFields(
+            List<String> oldFieldNames,
+            List<DataType> oldFieldDataTypes,
+            String patchedFieldName,
+            DataType patchedFieldDataType) {
+        return IntStream.range(0, oldFieldNames.size())
+                .mapToObj(
+                        pos -> {
+                            final String oldFieldName = oldFieldNames.get(pos);
+                            final DataType newFieldDataType;
+                            if (oldFieldName.equals(patchedFieldName)) {
+                                newFieldDataType = patchedFieldDataType;
+                            } else {
+                                newFieldDataType = oldFieldDataTypes.get(pos);
+                            }
+                            return DataTypes.FIELD(oldFieldName, newFieldDataType);
+                        })
+                .toArray(DataTypes.Field[]::new);
+    }
+
+    private static void addPhysicalDataTypeFields(Schema.Builder builder, DataType dataType) {
+        final List<DataType> fieldDataTypes = DataTypeUtils.flattenToDataTypes(dataType);
+        final List<String> fieldNames = DataTypeUtils.flattenToNames(dataType);
+        builder.fromFields(fieldNames, fieldDataTypes);
+    }
+
+    private static boolean isPhysical(UnresolvedColumn column) {
+        return column instanceof UnresolvedPhysicalColumn;
+    }
+
+    // --------------------------------------------------------------------------------------------
+    // Result representation
+    // --------------------------------------------------------------------------------------------
+
+    /** Result of {@link #fromExternal(DataTypeFactory, TypeInformation, Schema)}. */
+    public static class InputResult {
+
+        /**
+         * Data type expected from the first table ecosystem operator for input conversion. The
+         * data type might not be a row type and can possibly be nullable.
+         */
+        private final DataType physicalDataType;
+
+        /**
+         * Whether the first table ecosystem operator should treat the physical record as a
+         * top-level record and thus perform implicit flattening. Otherwise, the record needs to
+         * be wrapped in a top-level row.
+         */
+        private final boolean isTopLevelRecord;
+
+        /**
+         * Schema derived from the physical data type. It does not include the projections of the
+         * user-provided schema.
+         */
+        private final Schema schema;
+
+        /**
+         * List of indices to adjust the presence and order of columns from {@link #schema} for
+         * the final column structure.
+         */
+        private final @Nullable int[] projections;
+
+        private InputResult(
+                DataType physicalDataType,
+                boolean isTopLevelRecord,
+                Schema schema,
+                @Nullable int[] projections) {
+            this.physicalDataType = physicalDataType;
+            this.isTopLevelRecord = isTopLevelRecord;
+            this.schema = schema;
+            this.projections = projections;
+        }
+
+        public DataType getPhysicalDataType() {
+            return physicalDataType;
+        }
+
+        public boolean isTopLevelRecord() {
+            return isTopLevelRecord;
+        }
+
+        public Schema getSchema() {
+            return schema;
+        }
+
+        public @Nullable int[] getProjections() {
+            return projections;
+        }
+    }
+}
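
Editor's usage sketch (illustrative only, not part of the patch): the snippet below shows how fromExternal is intended to be called when input typed with the DataStream API's TypeInformation enters the table ecosystem. The DataTypeFactory argument is assumed to come from the surrounding planner context; the test class below uses DataTypeFactoryMock for the same purpose. The expected results mirror testInputFromRowWithNonPhysicalDeclaredSchema.

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.api.Schema;
import org.apache.flink.table.catalog.DataTypeFactory;
import org.apache.flink.table.catalog.ExternalSchemaTranslator;
import org.apache.flink.table.catalog.ExternalSchemaTranslator.InputResult;

final class FromExternalSketch {

    static InputResult sketch(DataTypeFactory dataTypeFactory) {
        // Physical input as it would arrive from the DataStream API.
        final TypeInformation<?> inputTypeInfo = Types.ROW(Types.INT, Types.LONG);

        // Partial declared schema: no physical columns, only a computed column,
        // so the physical part is derived entirely from the type information.
        final Schema declaredSchema =
                Schema.newBuilder().columnByExpression("computed", "f1 + 42").build();

        final InputResult result =
                ExternalSchemaTranslator.fromExternal(
                        dataTypeFactory, inputTypeInfo, declaredSchema);

        // result.getPhysicalDataType() -> ROW<f0 INT, f1 BIGINT> NOT NULL
        // result.isTopLevelRecord()    -> true (the row's fields are implicitly flattened)
        // result.getSchema()           -> f0 INT, f1 BIGINT, computed AS [f1 + 42]
        // result.getProjections()      -> null (no reordering or projection needed)
        return result;
    }
}
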
diff --git a/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
new file mode 100644
index 0000000000000..367fc95cb248c
--- /dev/null
+++ b/flink-table/flink-table-api-java/src/test/java/org/apache/flink/table/catalog/ExternalSchemaTranslatorTest.java
@@ -0,0 +1,249 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.catalog;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.api.ValidationException;
+import org.apache.flink.table.types.utils.DataTypeFactoryMock;
+import org.apache.flink.types.Row;
+
+import org.junit.Test;
+
+import java.math.BigDecimal;
+import java.time.DayOfWeek;
+import java.util.Optional;
+
+import static org.apache.flink.core.testutils.FlinkMatchers.containsMessage;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/** Tests for {@link ExternalSchemaTranslator}. */
+public class ExternalSchemaTranslatorTest {
+
+    @Test
+    public void testInputFromRow() {
+        final TypeInformation<?> inputTypeInfo =
+                Types.ROW(Types.ROW(Types.INT, Types.BOOLEAN), Types.ENUM(DayOfWeek.class));
+
+        final ExternalSchemaTranslator.InputResult result =
+                ExternalSchemaTranslator.fromExternal(
+                        dataTypeFactoryWithRawType(DayOfWeek.class), inputTypeInfo, null);
+
+        assertEquals(
+                DataTypes.ROW(
+                                DataTypes.FIELD(
+                                        "f0",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("f0", DataTypes.INT()),
+                                                DataTypes.FIELD("f1", DataTypes.BOOLEAN()))),
+                                DataTypes.FIELD(
+                                        "f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class)))
+                        .notNull(),
+                result.getPhysicalDataType());
+
+        assertTrue(result.isTopLevelRecord());
+
+        assertEquals(
+                Schema.newBuilder()
+                        .column(
+                                "f0",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f0", DataTypes.INT()),
+                                        DataTypes.FIELD("f1", DataTypes.BOOLEAN())))
+                        .column("f1", DataTypeFactoryMock.dummyRaw(DayOfWeek.class))
+                        .build(),
+                result.getSchema());
+
+        assertNull(result.getProjections());
+    }
+
+    @Test
+    public void testInputFromAtomic() {
+        final TypeInformation<?> inputTypeInfo = Types.GENERIC(Row.class);
+
+        final ExternalSchemaTranslator.InputResult result =
+                ExternalSchemaTranslator.fromExternal(
+                        dataTypeFactoryWithRawType(Row.class), inputTypeInfo, null);
+
+        assertEquals(DataTypeFactoryMock.dummyRaw(Row.class), result.getPhysicalDataType());
+
+        assertFalse(result.isTopLevelRecord());
+
+        assertEquals(
+                Schema.newBuilder().column("f0", DataTypeFactoryMock.dummyRaw(Row.class)).build(),
+                result.getSchema());
+
+        assertNull(result.getProjections());
+    }
+
+    @Test
+    public void testInputFromRowWithNonPhysicalDeclaredSchema() {
+        final TypeInformation<?> inputTypeInfo = Types.ROW(Types.INT, Types.LONG);
+
+        final ExternalSchemaTranslator.InputResult result =
+                ExternalSchemaTranslator.fromExternal(
+                        dataTypeFactory(),
+                        inputTypeInfo,
+                        Schema.newBuilder()
+                                .columnByExpression("computed", "f1 + 42")
+                                .columnByExpression("computed2", "f1 - 1")
+                                .primaryKeyNamed("pk", "f0")
+                                .build());
+
+        assertEquals(
+                DataTypes.ROW(
+                                DataTypes.FIELD("f0", DataTypes.INT()),
+                                DataTypes.FIELD("f1", DataTypes.BIGINT()))
+                        .notNull(),
+                result.getPhysicalDataType());
+
+        assertTrue(result.isTopLevelRecord());
+
+        assertEquals(
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.BIGINT())
+                        .columnByExpression("computed", "f1 + 42")
+                        .columnByExpression("computed2", "f1 - 1")
+                        .primaryKeyNamed("pk", "f0")
+                        .build(),
+                result.getSchema());
+
+        assertNull(result.getProjections());
+    }
+
+    @Test
+    public void testInputFromRowWithPhysicalDeclaredSchema() {
+        final TypeInformation<?> inputTypeInfo =
+                Types.ROW(Types.INT, Types.LONG, Types.GENERIC(BigDecimal.class), Types.BOOLEAN);
+
+        final ExternalSchemaTranslator.InputResult result =
+                ExternalSchemaTranslator.fromExternal(
+                        dataTypeFactoryWithRawType(BigDecimal.class),
+                        inputTypeInfo,
+                        Schema.newBuilder()
+                                .primaryKeyNamed("pk", "f0")
+                                .column("f1", DataTypes.BIGINT()) // reordered
+                                .column("f0", DataTypes.INT())
+                                .columnByExpression("computed", "f1 + 42")
+                                .column("f2", DataTypes.DECIMAL(10, 2)) // enriches
+                                .columnByExpression("computed2", "f1 - 1")
+                                .build());
+
+        assertEquals(
+                DataTypes.ROW(
+                                DataTypes.FIELD("f0", DataTypes.INT()),
+                                DataTypes.FIELD("f1", DataTypes.BIGINT()),
+                                DataTypes.FIELD("f2", DataTypes.DECIMAL(10, 2)),
+                                DataTypes.FIELD("f3", DataTypes.BOOLEAN()))
+                        .notNull(),
+                result.getPhysicalDataType());
+
+        assertTrue(result.isTopLevelRecord());
+
+        assertEquals(
+                Schema.newBuilder()
+                        .column("f0", DataTypes.INT())
+                        .column("f1", DataTypes.BIGINT())
+                        .column("f2", DataTypes.DECIMAL(10, 2))
+                        .column("f3", DataTypes.BOOLEAN())
+                        .columnByExpression("computed", "f1 + 42")
+                        .columnByExpression("computed2", "f1 - 1")
+                        .primaryKeyNamed("pk", "f0")
+                        .build(),
+                result.getSchema());
+
+        assertArrayEquals(new int[] {1, 0, 4, 2, 5}, result.getProjections());
+    }
+
+    @Test
+    public void testInputFromAtomicWithPhysicalDeclaredSchema() {
+        final TypeInformation<?> inputTypeInfo = Types.GENERIC(Row.class);
+
+        final ExternalSchemaTranslator.InputResult result =
+                ExternalSchemaTranslator.fromExternal(
+                        dataTypeFactoryWithRawType(Row.class),
+                        inputTypeInfo,
+                        Schema.newBuilder()
+                                .columnByExpression("f0_0", "f0.f0_0")
+                                .column(
+                                        "f0",
+                                        DataTypes.ROW(
+                                                DataTypes.FIELD("f0_0", DataTypes.INT()),
+                                                DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())))
+                                .columnByExpression("f0_1", "f0.f0_1")
+                                .build());
+
+        assertEquals(
+                DataTypes.ROW(
+                        DataTypes.FIELD("f0_0", DataTypes.INT()),
+                        DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())),
+                result.getPhysicalDataType());
+
+        assertFalse(result.isTopLevelRecord());
+
+        assertEquals(
+                Schema.newBuilder()
+                        .column(
+                                "f0",
+                                DataTypes.ROW(
+                                        DataTypes.FIELD("f0_0", DataTypes.INT()),
+                                        DataTypes.FIELD("f0_1", DataTypes.BOOLEAN())))
+                        .columnByExpression("f0_0", "f0.f0_0")
+                        .columnByExpression("f0_1", "f0.f0_1")
+                        .build(),
+                result.getSchema());
+
+        assertArrayEquals(new int[] {1, 0, 2}, result.getProjections());
+    }
+
+    @Test
+    public void testInvalidDeclaredSchemaColumn() {
+        final TypeInformation<?> inputTypeInfo = Types.ROW(Types.INT, Types.LONG);
+
+        try {
+            ExternalSchemaTranslator.fromExternal(
+                    dataTypeFactory(),
+                    inputTypeInfo,
+                    Schema.newBuilder().column("INVALID", DataTypes.BIGINT()).build());
+        } catch (ValidationException e) {
+            assertThat(
+                    e,
+                    containsMessage(
+                            "Unable to find a field named 'INVALID' in the physical data type"));
+        }
+    }
+
+    private static DataTypeFactory dataTypeFactoryWithRawType(Class<?> rawType) {
+        final DataTypeFactoryMock dataTypeFactory = new DataTypeFactoryMock();
+        dataTypeFactory.dataType = Optional.of(DataTypeFactoryMock.dummyRaw(rawType));
+        return dataTypeFactory;
+    }
+
+    private static DataTypeFactory dataTypeFactory() {
+        return new DataTypeFactoryMock();
+    }
+}
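
Editor's note on projection semantics (illustrative sketch, not part of the patch): InputResult#getProjections() maps the declared column order to positions in the patched schema. The hypothetical helper below reproduces the expectation of testInputFromRowWithPhysicalDeclaredSchema: patched columns [f0, f1, f2, f3, computed, computed2] combined with projections {1, 0, 4, 2, 5} yield [f1, f0, computed, f2, computed2]; f3 is dropped because it was not declared.

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

final class ProjectionSketch {

    /** Selects and reorders patched columns into the declared order. */
    static <T> List<T> project(List<T> patchedColumns, int[] projections) {
        return Arrays.stream(projections)
                .mapToObj(patchedColumns::get)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        final List<String> patched =
                Arrays.asList("f0", "f1", "f2", "f3", "computed", "computed2");
        // Prints [f1, f0, computed, f2, computed2].
        System.out.println(project(patched, new int[] {1, 0, 4, 2, 5}));
    }
}
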
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
index bdf81f2a68246..bf99fbd8f1ba3 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/api/Schema.java
@@ -61,7 +61,8 @@
  *
  * <p>This class is used in the API and catalogs to define an unresolved schema that will be
  * translated to {@link ResolvedSchema}. Some methods of this class perform basic validation,
- * however, the main validation happens during the resolution.
+ * however, the main validation happens during the resolution. Thus, an unresolved schema can be
+ * incomplete and might be enriched or merged with a different schema at a later stage.
  *
  * <p>Since an instance of this class is unresolved, it should not be directly persisted. The {@link
  * #toString()} shows only a summary of the contained objects.
@@ -213,6 +214,12 @@ public Builder fromFields(
             return this;
         }
 
+        /** Adopts all columns from the given list. */
+        public Builder fromColumns(List<UnresolvedColumn> unresolvedColumns) {
+            columns.addAll(unresolvedColumns);
+            return this;
+        }
+
         /**
          * Declares a physical column that is appended to this schema.
          *
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
index ee0444904ebc5..637f4326b0ff7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/expressions/SqlCallExpression.java
@@ -22,6 +22,7 @@
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Objects;
 
 /**
  * A call to a SQL expression.
@@ -65,6 +66,23 @@ public <R> R accept(ExpressionVisitor<R> visitor) {
         return visitor.visit(this);
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        SqlCallExpression that = (SqlCallExpression) o;
+        return sqlExpression.equals(that.sqlExpression);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(sqlExpression);
+    }
+
     @Override
     public String toString() {
         return asSummaryString();
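
Editor's note (illustrative sketch, not part of the patch): the equals/hashCode override above gives SqlCallExpression value semantics based on the raw SQL string. The translator tests rely on this when comparing entire Schema instances that contain computed columns, e.g.:

import org.apache.flink.table.expressions.SqlCallExpression;

final class SqlCallEqualitySketch {

    public static void main(String[] args) {
        final SqlCallExpression a = new SqlCallExpression("f1 + 42");
        final SqlCallExpression b = new SqlCallExpression("f1 + 42");

        // Both print true: equality and hashing are based solely on the raw
        // SQL string, so schemas with identical computed columns compare equal.
        System.out.println(a.equals(b));
        System.out.println(a.hashCode() == b.hashCode());
    }
}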