Skip to content

Commit

Permalink
[FLINK-20379][connector/kafka] Add a convenient method setValueOnlyDe…
Browse files Browse the repository at this point in the history
…serializer(DeserializationSchema) to KafkaSourceBuilder.
  • Loading branch information
lindong28 authored and becketqin committed Mar 11, 2021
1 parent 1b7939b commit 3102bcf
Showing 1 changed file with 19 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.connector.kafka.source;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
Expand Down Expand Up @@ -54,7 +55,7 @@
* .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
* .setGroupId("myGroup")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
* .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
* .build();
* }</pre>
*
Expand All @@ -76,7 +77,7 @@
* .setBootstrapServers(MY_BOOTSTRAP_SERVERS)
* .setGroupId("myGroup")
* .setTopics(Arrays.asList(TOPIC1, TOPIC2))
* .setDeserializer(KafkaRecordDeserializer.valueOnly(StringDeserializer.class))
* .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
* .setUnbounded(OffsetsInitializer.latest())
* .build();
* }</pre>
Expand Down Expand Up @@ -316,6 +317,22 @@ public KafkaSourceBuilder<OUT> setDeserializer(
return this;
}

/**
* Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
* org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given
* {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The
* other information (e.g. key) in a ConsumerRecord will be ignored.
*
* @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
* @return this KafkaSourceBuilder.
*/
public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
DeserializationSchema<OUT> deserializationSchema) {
this.deserializationSchema =
KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
return this;
}

/**
* Sets the client id prefix of this KafkaSource.
*
Expand Down

0 comments on commit 3102bcf

Please sign in to comment.