Skip to content

Commit

Permalink
[FLINK-18075][hotfix] Use DeserializationSchema instead of KeyedDeserializationSchema in DynamicTableSink
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed Jun 8, 2020
1 parent 50b6d9d commit 87d6a76
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -57,7 +56,7 @@ protected SinkFunction<Row> createKafkaProducer(
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
serializationSchema,
properties,
partitioner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
Expand Down Expand Up @@ -60,7 +59,7 @@ protected SinkFunction<RowData> createKafkaProducer(
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer011<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
serializationSchema,
properties,
partitioner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.types.Row;
Expand Down Expand Up @@ -52,7 +51,7 @@ protected SinkFunction<Row> createKafkaProducer(
Optional<FlinkKafkaPartitioner<Row>> partitioner) {
return new FlinkKafkaProducer<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
serializationSchema,
properties,
partitioner);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.internals.KeyedSerializationSchemaWrapper;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.connector.format.EncodingFormat;
import org.apache.flink.table.connector.sink.DynamicTableSink;
Expand Down Expand Up @@ -59,7 +58,7 @@ protected SinkFunction<RowData> createKafkaProducer(
Optional<FlinkKafkaPartitioner<RowData>> partitioner) {
return new FlinkKafkaProducer<>(
topic,
new KeyedSerializationSchemaWrapper<>(serializationSchema),
serializationSchema,
properties,
partitioner);
}
Expand Down

0 comments on commit 87d6a76

Please sign in to comment.