From 909e74783153eaa903794101f9b69ce4fbd20204 Mon Sep 17 00:00:00 2001 From: Timo Walther Date: Mon, 19 Oct 2020 14:18:46 +0200 Subject: [PATCH] [hotfix][table-common] Avoid unnecessary casting when creating type information in sources and sinks This is not a compatible change. But given that those interfaces are still relatively new and not many people have changed to the new sources/sinks, we should do this change now or never and avoid @SuppressWarnings in almost all implementations. --- .../flink/connector/jdbc/table/JdbcDynamicTableSink.java | 5 ++--- .../flink/connector/jdbc/table/JdbcDynamicTableSource.java | 7 ++----- .../table/examples/java/connectors/ChangelogCsvFormat.java | 3 +-- .../avro/registry/confluent/RegistryAvroFormatFactory.java | 2 +- .../org/apache/flink/formats/avro/AvroFormatFactory.java | 2 +- .../org/apache/flink/formats/csv/CsvFormatFactory.java | 3 +-- .../org/apache/flink/formats/json/JsonFormatFactory.java | 3 +-- .../flink/formats/json/canal/CanalJsonFormatFactory.java | 3 +-- .../formats/json/debezium/DebeziumJsonFormatFactory.java | 3 +-- .../formats/json/maxwell/MaxwellJsonFormatFactory.java | 3 +-- .../flink/table/connector/sink/DynamicTableSink.java | 2 +- .../flink/table/connector/source/DynamicTableSource.java | 2 +- .../table/planner/factories/TestValuesTableFactory.java | 5 ++--- 13 files changed, 16 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java index 3e09e1c66f4dc..d750101e6dead 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSink.java @@ -74,10 +74,9 @@ private void validatePrimaryKey(ChangelogMode 
requestedMode) { } @Override - @SuppressWarnings("unchecked") public SinkRuntimeProvider getSinkRuntimeProvider(Context context) { - final TypeInformation rowDataTypeInformation = (TypeInformation) context - .createTypeInformation(tableSchema.toRowDataType()); + final TypeInformation rowDataTypeInformation = + context.createTypeInformation(tableSchema.toRowDataType()); final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder(); builder.setJdbcOptions(jdbcOptions); diff --git a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java index a5c7d596d4eed..f7ef01e37fdf3 100644 --- a/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java +++ b/flink-connectors/flink-connector-jdbc/src/main/java/org/apache/flink/connector/jdbc/table/JdbcDynamicTableSource.java @@ -19,7 +19,6 @@ package org.apache.flink.connector.jdbc.table; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.connector.jdbc.dialect.JdbcDialect; import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions; import org.apache.flink.connector.jdbc.internal.options.JdbcOptions; @@ -33,7 +32,6 @@ import org.apache.flink.table.connector.source.ScanTableSource; import org.apache.flink.table.connector.source.TableFunctionProvider; import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown; -import org.apache.flink.table.data.RowData; import org.apache.flink.table.types.logical.RowType; import org.apache.flink.table.utils.TableSchemaUtils; import org.apache.flink.util.Preconditions; @@ -86,7 +84,6 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) { } @Override - @SuppressWarnings("unchecked") public 
ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder() .setDrivername(options.getDriverName()) @@ -114,8 +111,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon builder.setQuery(query); final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType(); builder.setRowConverter(dialect.getRowConverter(rowType)); - builder.setRowDataTypeInfo((TypeInformation) runtimeProviderContext - .createTypeInformation(physicalSchema.toRowDataType())); + builder.setRowDataTypeInfo( + runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType())); return InputFormatProvider.of(builder.build()); } diff --git a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java index c8c86899756b9..7388b345a5e87 100644 --- a/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java +++ b/flink-examples/flink-examples-table/src/main/java/org/apache/flink/table/examples/java/connectors/ChangelogCsvFormat.java @@ -44,10 +44,9 @@ public ChangelogCsvFormat(String columnDelimiter) { } @Override - @SuppressWarnings("unchecked") public DeserializationSchema createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) { // create type information for the DeserializationSchema - final TypeInformation producedTypeInfo = (TypeInformation) context.createTypeInformation(producedDataType); + final TypeInformation producedTypeInfo = context.createTypeInformation(producedDataType); // most of the code in DeserializationSchema will not work on internal data structures // create a converter for conversion at the end diff --git 
a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java index 9a134254e75e9..8b3c2bcafabfb 100644 --- a/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java +++ b/flink-formats/flink-avro-confluent-registry/src/main/java/org/apache/flink/formats/avro/registry/confluent/RegistryAvroFormatFactory.java @@ -73,7 +73,7 @@ public DeserializationSchema createRuntimeDecoder( DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); return new AvroRowDataDeserializationSchema( ConfluentRegistryAvroDeserializationSchema.forGeneric( AvroSchemaConverter.convertToSchema(rowType), diff --git a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java index d07d17f1a95fb..bb45beb7f7772 100644 --- a/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java +++ b/flink-formats/flink-avro/src/main/java/org/apache/flink/formats/avro/AvroFormatFactory.java @@ -62,7 +62,7 @@ public DeserializationSchema createRuntimeDecoder( DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo); } diff --git 
a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java index 9746ae76dc59d..c86a92fc03ac5 100644 --- a/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java +++ b/flink-formats/flink-csv/src/main/java/org/apache/flink/formats/csv/CsvFormatFactory.java @@ -62,7 +62,6 @@ public final class CsvFormatFactory implements public static final String IDENTIFIER = "csv"; - @SuppressWarnings("unchecked") @Override public DecodingFormat> createDecodingFormat( DynamicTableFactory.Context context, ReadableConfig formatOptions) { @@ -76,7 +75,7 @@ public DeserializationSchema createRuntimeDecoder( DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); final CsvRowDataDeserializationSchema.Builder schemaBuilder = new CsvRowDataDeserializationSchema.Builder( rowType, diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java index 57952b9209332..1fe5da128b1bd 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/JsonFormatFactory.java @@ -56,7 +56,6 @@ public class JsonFormatFactory implements public static final String IDENTIFIER = "json"; - @SuppressWarnings("unchecked") @Override public DecodingFormat> createDecodingFormat( DynamicTableFactory.Context context, @@ -75,7 +74,7 @@ public DeserializationSchema createRuntimeDecoder( DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation 
rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); return new JsonRowDataDeserializationSchema( rowType, rowDataTypeInfo, diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java index c5d649daa7951..0afc96c294d56 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/canal/CanalJsonFormatFactory.java @@ -67,7 +67,6 @@ public class CanalJsonFormatFactory implements DeserializationFormatFactory, Ser .noDefaultValue() .withDescription("Only read changelog rows which match the specific table (by comparing the \"table\" meta field in the record)."); - @SuppressWarnings("unchecked") @Override public DecodingFormat> createDecodingFormat( DynamicTableFactory.Context context, @@ -84,7 +83,7 @@ public DeserializationSchema createRuntimeDecoder( DynamicTableSource.Context context, DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); return CanalJsonDeserializationSchema .builder(rowType, rowDataTypeInfo) .setIgnoreParseErrors(ignoreParseErrors) diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java index d81f7a1bf797d..794b1a49efb67 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java +++ 
b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonFormatFactory.java @@ -64,7 +64,6 @@ public class DebeziumJsonFormatFactory implements DeserializationFormatFactory, public static final ConfigOption TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT; - @SuppressWarnings("unchecked") @Override public DecodingFormat> createDecodingFormat( DynamicTableFactory.Context context, @@ -80,7 +79,7 @@ public DeserializationSchema createRuntimeDecoder( DynamicTableSource.Context context, DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + context.createTypeInformation(producedDataType); return new DebeziumJsonDeserializationSchema( rowType, rowDataTypeInfo, diff --git a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java index e3df0434cc10a..f51e5e2ecd337 100644 --- a/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java +++ b/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/maxwell/MaxwellJsonFormatFactory.java @@ -54,7 +54,6 @@ public class MaxwellJsonFormatFactory implements DeserializationFormatFactory, S public static final ConfigOption TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT; - @SuppressWarnings("unchecked") @Override public DecodingFormat> createDecodingFormat( DynamicTableFactory.Context context, @@ -69,7 +68,7 @@ public DeserializationSchema createRuntimeDecoder( DynamicTableSource.Context context, DataType producedDataType) { final RowType rowType = (RowType) producedDataType.getLogicalType(); final TypeInformation rowDataTypeInfo = - (TypeInformation) context.createTypeInformation(producedDataType); + 
context.createTypeInformation(producedDataType); return new MaxwellJsonDeserializationSchema( rowType, rowDataTypeInfo, diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java index 3fce6c437b42e..192a1a057bf26 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java @@ -140,7 +140,7 @@ interface Context { * * @see TableSchema#toPhysicalRowDataType() */ - TypeInformation createTypeInformation(DataType consumedDataType); + TypeInformation createTypeInformation(DataType consumedDataType); /** * Creates a converter for mapping between Flink's internal data structures and objects specified diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java index 1e31630856d64..c91edd58c9815 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/source/DynamicTableSource.java @@ -94,7 +94,7 @@ interface Context { * * @see TableSchema#toPhysicalRowDataType() */ - TypeInformation createTypeInformation(DataType producedDataType); + TypeInformation createTypeInformation(DataType producedDataType); /** * Creates a converter for mapping between objects specified by the given {@link DataType} and diff --git a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java 
b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java index aa1db92234a8b..0b7e160b54833 100644 --- a/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java +++ b/flink-table/flink-table-planner-blink/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java @@ -599,11 +599,10 @@ public ChangelogMode getChangelogMode() { return changelogMode; } - @SuppressWarnings("unchecked") @Override public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) { - TypeSerializer serializer = (TypeSerializer) runtimeProviderContext - .createTypeInformation(producedDataType) + TypeSerializer serializer = runtimeProviderContext + .createTypeInformation(producedDataType) .createSerializer(new ExecutionConfig()); DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(producedDataType); converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));