[FLINK-17000][table] Ensure that every logical type can be represented as TypeInformation

Introduces a WrapperTypeInfo that can replace most (if not all) TypeInformation classes
in the Blink planner. It is backed by logical types and uses internal serializers.

This closes apache#12852.
twalthr committed Jul 9, 2020
1 parent df4f9bc commit 584dca1
Showing 23 changed files with 184 additions and 77 deletions.
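The change below is mechanical across the format tests: every construction of RowDataTypeInfo becomes a WrapperTypeInfo.of(...) call. A minimal sketch of the pattern, assuming a RowType named rowType is in scope (both classes live in org.apache.flink.table.runtime.typeutils, per the imports in the hunks):

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;

// Before: a dedicated TypeInformation implementation for internal rows.
// TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);

// After: one wrapper type info derived from the logical type, backed by
// internal serializers.
TypeInformation<RowData> typeInfo = WrapperTypeInfo.of(rowType);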

@@ -31,7 +31,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;

@@ -58,7 +58,7 @@ public class AvroFormatFactoryTest extends TestLogger {
@Test
public void testSeDeSchema() {
final AvroRowDataDeserializationSchema expectedDeser =
-new AvroRowDataDeserializationSchema(ROW_TYPE, new RowDataTypeInfo(ROW_TYPE));
+new AvroRowDataDeserializationSchema(ROW_TYPE, WrapperTypeInfo.of(ROW_TYPE));

final Map<String, String> options = getAllOptions();


@@ -23,7 +23,7 @@
import org.apache.flink.formats.avro.typeutils.AvroSchemaConverter;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;

@@ -96,7 +96,7 @@ public void testSerializeDeserialize() throws Exception {
FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))),
FIELD("map2array", MAP(STRING(), ARRAY(INT()))));
final RowType rowType = (RowType) dataType.getLogicalType();
-final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);
+final TypeInformation<RowData> typeInfo = WrapperTypeInfo.of(rowType);

final Schema schema = AvroSchemaConverter.convertToSchema(rowType);
final GenericRecord record = new GenericData.Record(schema);
@@ -180,7 +180,7 @@ public void testSpecificType() throws Exception {
FIELD("type_date", DATE()),
FIELD("type_time_millis", TIME(3)));
final RowType rowType = (RowType) dataType.getLogicalType();
-final TypeInformation<RowData> typeInfo = new RowDataTypeInfo(rowType);
+final TypeInformation<RowData> typeInfo = WrapperTypeInfo.of(rowType);
AvroRowDataSerializationSchema serializationSchema = new AvroRowDataSerializationSchema(rowType);
serializationSchema.open(null);
AvroRowDataDeserializationSchema deserializationSchema =

@@ -32,7 +32,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;

@@ -65,7 +65,7 @@ public class CsvFormatFactoryTest extends TestLogger {
@Test
public void testSeDeSchema() {
final CsvRowDataDeserializationSchema expectedDeser =
-new CsvRowDataDeserializationSchema.Builder(ROW_TYPE, new RowDataTypeInfo(ROW_TYPE))
+new CsvRowDataDeserializationSchema.Builder(ROW_TYPE, WrapperTypeInfo.of(ROW_TYPE))
.setFieldDelimiter(';')
.setQuoteCharacter('\'')
.setAllowComments(true)

@@ -22,7 +22,7 @@
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -276,7 +276,7 @@ private void testField(

// deserialization
CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
-new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType));
+new CsvRowDataDeserializationSchema.Builder(rowType, WrapperTypeInfo.of(rowType));
deserializationConfig.accept(deserSchemaBuilder);
RowData deserializedRow = deserialize(deserSchemaBuilder, expectedCsv);

@@ -304,7 +304,7 @@ private void testField(

// deserialization
CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
-new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType));
+new CsvRowDataDeserializationSchema.Builder(rowType, WrapperTypeInfo.of(rowType));
deserializationConfig.accept(deserSchemaBuilder);
RowData deserializedRow = deserialize(deserSchemaBuilder, csv);
Row actualRow = (Row) DataFormatConverters.getConverterForDataType(dataType)
@@ -323,7 +323,7 @@ private Row testDeserialization(
FIELD("f2", STRING()));
RowType rowType = (RowType) dataType.getLogicalType();
CsvRowDataDeserializationSchema.Builder deserSchemaBuilder =
-new CsvRowDataDeserializationSchema.Builder(rowType, new RowDataTypeInfo(rowType))
+new CsvRowDataDeserializationSchema.Builder(rowType, WrapperTypeInfo.of(rowType))
.setIgnoreParseErrors(allowParsingErrors)
.setAllowComments(allowComments);
RowData deserializedRow = deserialize(deserSchemaBuilder, string);

@@ -33,7 +33,7 @@
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;

@@ -118,7 +118,7 @@ private void testSchemaDeserializationSchema(Map<String, String> options) {
final JsonRowDataDeserializationSchema expectedDeser =
new JsonRowDataDeserializationSchema(
ROW_TYPE,
-new RowDataTypeInfo(ROW_TYPE),
+WrapperTypeInfo.of(ROW_TYPE),
false,
true,
TimestampFormat.ISO_8601);

@@ -18,9 +18,10 @@

package org.apache.flink.formats.json;

+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.util.DataFormatConverters;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.Row;
@@ -140,7 +141,7 @@ public void testSerDe() throws Exception {
FIELD("map", MAP(STRING(), BIGINT())),
FIELD("map2map", MAP(STRING(), MAP(STRING(), INT()))));
RowType schema = (RowType) dataType.getLogicalType();
-RowDataTypeInfo resultTypeInfo = new RowDataTypeInfo(schema);
+TypeInformation<RowData> resultTypeInfo = WrapperTypeInfo.of(schema);

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
schema, resultTypeInfo, false, false, TimestampFormat.ISO_8601);
@@ -211,7 +212,7 @@ public void testSlowDeserialization() throws Exception {
RowType rowType = (RowType) dataType.getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-rowType, new RowDataTypeInfo(rowType), false, false, TimestampFormat.ISO_8601);
+rowType, WrapperTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601);

Row expected = new Row(7);
expected.setField(0, bool);
@@ -236,7 +237,7 @@ public void testSerDeMultiRows() throws Exception {
).getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-rowType, new RowDataTypeInfo(rowType), false, false, TimestampFormat.ISO_8601);
+rowType, WrapperTypeInfo.of(rowType), false, false, TimestampFormat.ISO_8601);
JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);

ObjectMapper objectMapper = new ObjectMapper();
@@ -290,7 +291,7 @@ public void testSerDeMultiRowsWithNullValues() throws Exception {
).getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-rowType, new RowDataTypeInfo(rowType), false, true, TimestampFormat.ISO_8601);
+rowType, WrapperTypeInfo.of(rowType), false, true, TimestampFormat.ISO_8601);
JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType, TimestampFormat.ISO_8601);

for (int i = 0; i < jsons.length; i++) {
@@ -315,23 +316,23 @@ public void testDeserializationMissingNode() throws Exception {

// pass on missing field
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-schema, new RowDataTypeInfo(schema), false, false, TimestampFormat.ISO_8601);
+schema, WrapperTypeInfo.of(schema), false, false, TimestampFormat.ISO_8601);

Row expected = new Row(1);
Row actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
assertEquals(expected, actual);

// fail on missing field
deserializationSchema = deserializationSchema = new JsonRowDataDeserializationSchema(
-schema, new RowDataTypeInfo(schema), true, false, TimestampFormat.ISO_8601);
+schema, WrapperTypeInfo.of(schema), true, false, TimestampFormat.ISO_8601);

thrown.expect(IOException.class);
thrown.expectMessage("Failed to deserialize JSON '{\"id\":123123123}'");
deserializationSchema.deserialize(serializedJson);

// ignore on parse error
deserializationSchema = new JsonRowDataDeserializationSchema(
-schema, new RowDataTypeInfo(schema), false, true, TimestampFormat.ISO_8601);
+schema, WrapperTypeInfo.of(schema), false, true, TimestampFormat.ISO_8601);
actual = convertToExternal(deserializationSchema.deserialize(serializedJson), dataType);
assertEquals(expected, actual);

@@ -340,7 +341,7 @@ public void testDeserializationMissingNode() throws Exception {
// failOnMissingField and ignoreParseErrors both enabled
//noinspection ConstantConditions
new JsonRowDataDeserializationSchema(
-schema, new RowDataTypeInfo(schema), true, true, TimestampFormat.ISO_8601);
+schema, WrapperTypeInfo.of(schema), true, true, TimestampFormat.ISO_8601);
}

@Test
@@ -351,7 +352,7 @@ public void testSerDeSQLTimestampFormat() throws Exception{
).getLogicalType();

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
-rowType, new RowDataTypeInfo(rowType), false, false, TimestampFormat.SQL);
+rowType, WrapperTypeInfo.of(rowType), false, false, TimestampFormat.SQL);
JsonRowDataSerializationSchema serializationSchema = new JsonRowDataSerializationSchema(rowType, TimestampFormat.SQL);

ObjectMapper objectMapper = new ObjectMapper();
@@ -378,7 +379,7 @@ public void testJsonParse() throws Exception {
private void testIgnoreParseErrors(TestSpec spec) throws Exception {
// the parsing field should be null and no exception is thrown
JsonRowDataDeserializationSchema ignoreErrorsSchema = new JsonRowDataDeserializationSchema(
-spec.rowType, new RowDataTypeInfo(spec.rowType), false, true,
+spec.rowType, WrapperTypeInfo.of(spec.rowType), false, true,
TimestampFormat.ISO_8601);
Row expected;
if (spec.expected != null) {
@@ -396,7 +397,7 @@ spec.rowType, new RowDataTypeInfo(spec.rowType), false, true,
private void testParseErrors(TestSpec spec) throws Exception {
// expect exception if parse error is not ignored
JsonRowDataDeserializationSchema failingSchema = new JsonRowDataDeserializationSchema(
-spec.rowType, new RowDataTypeInfo(spec.rowType), false, false,
+spec.rowType, WrapperTypeInfo.of(spec.rowType), false, false,
spec.timestampFormat);

thrown.expectMessage(spec.errorMessage);
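Taken together, the JSON hunks converge on a single constructor shape. A sketch of the common call, with parameter roles inferred from the test comments above (failOnMissingField first, then ignoreParseErrors; enabling both is rejected, as the testDeserializationMissingNode hunk suggests):

JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
    rowType,                      // logical row type of the JSON payload
    WrapperTypeInfo.of(rowType),  // produced TypeInformation<RowData>
    false,                        // failOnMissingField
    true,                         // ignoreParseErrors
    TimestampFormat.ISO_8601);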

@@ -20,7 +20,7 @@

import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

@@ -65,7 +65,7 @@ public void testDeserialization() throws Exception {
List<String> lines = readLines("canal-data.txt");
CanalJsonDeserializationSchema deserializationSchema = new CanalJsonDeserializationSchema(
SCHEMA,
-new RowDataTypeInfo(SCHEMA),
+WrapperTypeInfo.of(SCHEMA),
false,
TimestampFormat.ISO_8601);


@@ -31,7 +31,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;

@@ -65,7 +65,7 @@ public class CanalJsonFormatFactoryTest extends TestLogger {
public void testSeDeSchema() {
final CanalJsonDeserializationSchema expectedDeser = new CanalJsonDeserializationSchema(
ROW_TYPE,
-new RowDataTypeInfo(ROW_TYPE),
+WrapperTypeInfo.of(ROW_TYPE),
true,
TimestampFormat.ISO_8601);


@@ -20,7 +20,7 @@

import org.apache.flink.formats.json.TimestampFormat;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.Collector;

@@ -74,7 +74,7 @@ private void testDeserialization(String resourceFile, boolean schemaInclude) thr
List<String> lines = readLines(resourceFile);
DebeziumJsonDeserializationSchema deserializationSchema = new DebeziumJsonDeserializationSchema(
SCHEMA,
-new RowDataTypeInfo(SCHEMA),
+WrapperTypeInfo.of(SCHEMA),
schemaInclude,
false,
TimestampFormat.ISO_8601);

@@ -31,7 +31,7 @@
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.factories.TestDynamicTableFactory;
import org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
-import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.runtime.typeutils.WrapperTypeInfo;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.util.TestLogger;

@@ -65,7 +65,7 @@ public class DebeziumJsonFormatFactoryTest extends TestLogger {
public void testSeDeSchema() {
final DebeziumJsonDeserializationSchema expectedDeser = new DebeziumJsonDeserializationSchema(
ROW_TYPE,
-new RowDataTypeInfo(ROW_TYPE),
+WrapperTypeInfo.of(ROW_TYPE),
true,
true,
TimestampFormat.ISO_8601);

@@ -22,6 +22,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.CompositeType;
import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.types.AtomicDataType;
import org.apache.flink.table.types.CollectionDataType;
import org.apache.flink.table.types.DataType;
@@ -115,12 +116,64 @@ public static TableSchema expandCompositeTypeToSchema(DataType dataType) {
throw new IllegalArgumentException("Expected a composite type");
}

+/**
+* The {@link DataType} class can only partially verify the conversion class. This method can perform
+* the final check when we know if the data type should be used for input.
+*/
+public static void validateInputDataType(DataType dataType) {
+dataType.accept(DataTypeInputClassValidator.INSTANCE);
+}
+
+/**
+* The {@link DataType} class can only partially verify the conversion class. This method can perform
+* the final check when we know if the data type should be used for output.
+*/
+public static void validateOutputDataType(DataType dataType) {
+dataType.accept(DataTypeOutputClassValidator.INSTANCE);
+}

private DataTypeUtils() {
// no instantiation
}

+// ------------------------------------------------------------------------------------------
+
+private static class DataTypeInputClassValidator extends DataTypeDefaultVisitor<Void> {
+
+private static final DataTypeInputClassValidator INSTANCE = new DataTypeInputClassValidator();
+
+@Override
+protected Void defaultMethod(DataType dataType) {
+if (!dataType.getLogicalType().supportsInputConversion(dataType.getConversionClass())) {
+throw new ValidationException(
+String.format(
+"Data type '%s' does not support an input conversion from class '%s'.",
+dataType,
+dataType.getConversionClass().getName()));
+}
+dataType.getChildren().forEach(child -> child.accept(this));
+return null;
+}
+}
+
+private static class DataTypeOutputClassValidator extends DataTypeDefaultVisitor<Void> {
+
+private static final DataTypeOutputClassValidator INSTANCE = new DataTypeOutputClassValidator();
+
+@Override
+protected Void defaultMethod(DataType dataType) {
+if (!dataType.getLogicalType().supportsOutputConversion(dataType.getConversionClass())) {
+throw new ValidationException(
+String.format(
+"Data type '%s' does not support an output conversion to class '%s'.",
+dataType,
+dataType.getConversionClass().getName()));
+}
+dataType.getChildren().forEach(child -> child.accept(this));
+return null;
+}
+}

private static class DataTypeTransformer implements DataTypeVisitor<DataType> {

private final TypeTransformation transformation;
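The DataTypeUtils hunk above contains the only new public API in this page of the diff: two recursive validators that walk a DataType tree and check each node's conversion class against its logical type, throwing ValidationException on a mismatch. A minimal usage sketch (hypothetical call site; DataTypes and DataTypeUtils imports assumed, and default conversion classes should pass both checks):

DataType dataType = DataTypes.ROW(
    DataTypes.FIELD("id", DataTypes.INT()),
    DataTypes.FIELD("name", DataTypes.STRING()));

// Visits the root and every child; throws ValidationException if a node's
// conversion class cannot be used to read input values...
DataTypeUtils.validateInputDataType(dataType);

// ...or to write output values.
DataTypeUtils.validateOutputDataType(dataType);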
