[hotfix][table-common] Avoid unnecessary casting when creating type information in sources and sinks

This is not a compatible change, but given that these interfaces are still relatively new and
not many people have migrated to the new sources/sinks, we should make this change now or never
and avoid @SuppressWarnings in almost all implementations.
twalthr committed Oct 23, 2020
1 parent 7b04b29 commit 909e747
Showing 13 changed files with 16 additions and 27 deletions.
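The whole diff boils down to one signature change: Context#createTypeInformation now declares a type parameter, so implementations no longer cast its result. Below is a minimal, self-contained sketch of why that removes the casts; TypeInfo, OldContext, NewContext and Demo are toy stand-ins for illustration only, not Flink classes (the real methods are DynamicTableSink.Context#createTypeInformation and DynamicTableSource.Context#createTypeInformation, changed in the hunks further down).

    // Toy model of the API shape before and after this commit.
    final class TypeInfo<T> {}

    interface OldContext {
        // pre-commit shape: wildcard return type
        TypeInfo<?> createTypeInformation(String dataType);
    }

    interface NewContext {
        // post-commit shape: generic return type bound by the call site
        <T> TypeInfo<T> createTypeInformation(String dataType);
    }

    class Demo {
        @SuppressWarnings("unchecked")
        static TypeInfo<Long> before(OldContext ctx) {
            // the wildcard carries no T, so every caller needs an unchecked cast
            return (TypeInfo<Long>) ctx.createTypeInformation("BIGINT");
        }

        static TypeInfo<Long> after(NewContext ctx) {
            // T is inferred as Long from the target type; no cast, no @SuppressWarnings
            return ctx.createTypeInformation("BIGINT");
        }
    }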
@@ -74,10 +74,9 @@ private void validatePrimaryKey(ChangelogMode requestedMode) {
}

@Override
@SuppressWarnings("unchecked")
public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
final TypeInformation<RowData> rowDataTypeInformation = (TypeInformation<RowData>) context
.createTypeInformation(tableSchema.toRowDataType());
final TypeInformation<RowData> rowDataTypeInformation =
context.createTypeInformation(tableSchema.toRowDataType());
final JdbcDynamicOutputFormatBuilder builder = new JdbcDynamicOutputFormatBuilder();

builder.setJdbcOptions(jdbcOptions);
@@ -19,7 +19,6 @@
package org.apache.flink.connector.jdbc.table;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.connector.jdbc.dialect.JdbcDialect;
import org.apache.flink.connector.jdbc.internal.options.JdbcLookupOptions;
import org.apache.flink.connector.jdbc.internal.options.JdbcOptions;
@@ -33,7 +32,6 @@
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.TableFunctionProvider;
import org.apache.flink.table.connector.source.abilities.SupportsProjectionPushDown;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.utils.TableSchemaUtils;
import org.apache.flink.util.Preconditions;
@@ -86,7 +84,6 @@ public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
}

@Override
@SuppressWarnings("unchecked")
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
final JdbcRowDataInputFormat.Builder builder = JdbcRowDataInputFormat.builder()
.setDrivername(options.getDriverName())
@@ -114,8 +111,8 @@ public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderCon
builder.setQuery(query);
final RowType rowType = (RowType) physicalSchema.toRowDataType().getLogicalType();
builder.setRowConverter(dialect.getRowConverter(rowType));
builder.setRowDataTypeInfo((TypeInformation<RowData>) runtimeProviderContext
.createTypeInformation(physicalSchema.toRowDataType()));
builder.setRowDataTypeInfo(
runtimeProviderContext.createTypeInformation(physicalSchema.toRowDataType()));

return InputFormatProvider.of(builder.build());
}
@@ -44,10 +44,9 @@ public ChangelogCsvFormat(String columnDelimiter) {
}

@Override
@SuppressWarnings("unchecked")
public DeserializationSchema<RowData> createRuntimeDecoder(DynamicTableSource.Context context, DataType producedDataType) {
// create type information for the DeserializationSchema
final TypeInformation<RowData> producedTypeInfo = (TypeInformation<RowData>) context.createTypeInformation(producedDataType);
final TypeInformation<RowData> producedTypeInfo = context.createTypeInformation(producedDataType);

// most of the code in DeserializationSchema will not work on internal data structures
// create a converter for conversion at the end
@@ -73,7 +73,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return new AvroRowDataDeserializationSchema(
ConfluentRegistryAvroDeserializationSchema.forGeneric(
AvroSchemaConverter.convertToSchema(rowType),
@@ -62,7 +62,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return new AvroRowDataDeserializationSchema(rowType, rowDataTypeInfo);
}

@@ -62,7 +62,6 @@ public final class CsvFormatFactory implements

public static final String IDENTIFIER = "csv";

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context, ReadableConfig formatOptions) {
@@ -76,7 +75,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
final CsvRowDataDeserializationSchema.Builder schemaBuilder =
new CsvRowDataDeserializationSchema.Builder(
rowType,
@@ -56,7 +56,6 @@ public class JsonFormatFactory implements

public static final String IDENTIFIER = "json";

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -75,7 +74,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return new JsonRowDataDeserializationSchema(
rowType,
rowDataTypeInfo,
@@ -67,7 +67,6 @@ public class CanalJsonFormatFactory implements DeserializationFormatFactory, Ser
.noDefaultValue()
.withDescription("Only read changelog rows which match the specific table (by comparing the \"table\" meta field in the record).");

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -84,7 +83,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return CanalJsonDeserializationSchema
.builder(rowType, rowDataTypeInfo)
.setIgnoreParseErrors(ignoreParseErrors)
@@ -64,7 +64,6 @@ public class DebeziumJsonFormatFactory implements DeserializationFormatFactory,

public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -80,7 +79,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return new DebeziumJsonDeserializationSchema(
rowType,
rowDataTypeInfo,
@@ -54,7 +54,6 @@ public class MaxwellJsonFormatFactory implements DeserializationFormatFactory, S

public static final ConfigOption<String> TIMESTAMP_FORMAT = JsonOptions.TIMESTAMP_FORMAT;

@SuppressWarnings("unchecked")
@Override
public DecodingFormat<DeserializationSchema<RowData>> createDecodingFormat(
DynamicTableFactory.Context context,
@@ -69,7 +68,7 @@ public DeserializationSchema<RowData> createRuntimeDecoder(
DynamicTableSource.Context context, DataType producedDataType) {
final RowType rowType = (RowType) producedDataType.getLogicalType();
final TypeInformation<RowData> rowDataTypeInfo =
(TypeInformation<RowData>) context.createTypeInformation(producedDataType);
context.createTypeInformation(producedDataType);
return new MaxwellJsonDeserializationSchema(
rowType,
rowDataTypeInfo,
@@ -140,7 +140,7 @@ interface Context {
*
* @see TableSchema#toPhysicalRowDataType()
*/
TypeInformation<?> createTypeInformation(DataType consumedDataType);
<T> TypeInformation<T> createTypeInformation(DataType consumedDataType);

/**
* Creates a converter for mapping between Flink's internal data structures and objects specified
@@ -94,7 +94,7 @@ interface Context {
*
* @see TableSchema#toPhysicalRowDataType()
*/
TypeInformation<?> createTypeInformation(DataType producedDataType);
<T> TypeInformation<T> createTypeInformation(DataType producedDataType);

/**
* Creates a converter for mapping between objects specified by the given {@link DataType} and
@@ -599,11 +599,10 @@ public ChangelogMode getChangelogMode() {
return changelogMode;
}

@SuppressWarnings("unchecked")
@Override
public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {
TypeSerializer<RowData> serializer = (TypeSerializer<RowData>) runtimeProviderContext
.createTypeInformation(producedDataType)
TypeSerializer<RowData> serializer = runtimeProviderContext
.<RowData>createTypeInformation(producedDataType)
.createSerializer(new ExecutionConfig());
DataStructureConverter converter = runtimeProviderContext.createDataStructureConverter(producedDataType);
converter.open(RuntimeConverter.Context.create(TestValuesTableFactory.class.getClassLoader()));
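A note on the last hunk above: there the result of createTypeInformation is chained straight into createSerializer(...), so there is no assignment target from which the compiler could infer the type argument, which is presumably why the call pins it explicitly as .<RowData>createTypeInformation(producedDataType). Written without the type witness,

    TypeSerializer<RowData> serializer = runtimeProviderContext
            .createTypeInformation(producedDataType)
            .createSerializer(new ExecutionConfig());

would infer TypeInformation<Object>, and the resulting TypeSerializer<Object> would not be assignable to TypeSerializer<RowData>.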
