From 3102bcf6277ff057e75ce5f778db5f70f28c6a01 Mon Sep 17 00:00:00 2001 From: Dong Lin Date: Wed, 3 Mar 2021 21:19:55 +0800 Subject: [PATCH] [FLINK-20379][connector/kafka] Add a convenient method setValueOnlyDeserializer(DeserializationSchema) to KafkaSourceBuilder. --- .../kafka/source/KafkaSourceBuilder.java | 21 +++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java index 0588f0cbce06c..73ba67595dc6e 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSourceBuilder.java @@ -18,6 +18,7 @@ package org.apache.flink.connector.kafka.source; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.connector.source.Boundedness; import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer; import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer; @@ -54,7 +55,7 @@ * .setBootstrapServers(MY_BOOTSTRAP_SERVERS) * .setGroupId("myGroup") * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - * .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) + * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) * .build(); * } * @@ -76,7 +77,7 @@ * .setBootstrapServers(MY_BOOTSTRAP_SERVERS) * .setGroupId("myGroup") * .setTopics(Arrays.asList(TOPIC1, TOPIC2)) - * .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class)) + * .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class)) * .setUnbounded(OffsetsInitializer.latest()) * .build(); * } @@ -316,6 +317,22 @@ public KafkaSourceBuilder setDeserializer( return this; } + /** + * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link + * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given + * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The + * other information (e.g. key) in a ConsumerRecord will be ignored. + * + * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization. + * @return this KafkaSourceBuilder. + */ + public KafkaSourceBuilder setValueOnlyDeserializer( + DeserializationSchema deserializationSchema) { + this.deserializationSchema = + KafkaRecordDeserializationSchema.valueOnly(deserializationSchema); + return this; + } + /** * Sets the client id prefix of this KafkaSource. *