[FLINK-18075] Remove deprecation of Kafka producer ctors that take SerializationSchema

SerializationSchema is an important interface that is widespread and used in other
components such as the Table API. It is also the most common interface for reusable
serialization logic. Therefore we should support it long term in our connectors.
This commit removes the deprecation of the ctors that take this interface.
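For context, this is roughly what implementing the interface looks like; a minimal sketch, with the class name and character set chosen purely for illustration (not part of this commit):

    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.SerializationSchema;

    // Hypothetical example schema: turns each String record into the byte[]
    // that becomes the Kafka record value.
    public class Utf8StringSchema implements SerializationSchema<String> {

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }
    }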

Moreover, it adds the most general ctor, which takes all producer configuration
options along with a SerializationSchema. This makes it feature-equivalent with
KafkaSerializationSchema with respect to configuring the producer.
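As a rough illustration of what that enables (a sketch, not taken from the commit; the topic name, broker address, SimpleStringSchema choice, and the wrapper class are placeholders), the new general ctor could be called like this:

    import java.util.Properties;

    import org.apache.flink.api.common.serialization.SimpleStringSchema;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;

    public class ProducerConstructionExample {

        public static FlinkKafkaProducer<String> createProducer() {
            Properties producerConfig = new Properties();
            producerConfig.setProperty("bootstrap.servers", "localhost:9092");
            // Keep this below the broker's transaction.max.timeout.ms when using EXACTLY_ONCE.
            producerConfig.setProperty("transaction.timeout.ms", "600000");

            return new FlinkKafkaProducer<>(
                "my-topic",                                  // placeholder target topic
                new SimpleStringSchema(),                    // key-less SerializationSchema
                producerConfig,                              // full producer configuration
                new FlinkFixedPartitioner<>(),               // same default partitioner the simpler ctors use
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE,    // delivery semantic
                FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
        }
    }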
dawidwys committed Jun 8, 2020
1 parent 0358292 commit bebe503
Showing 1 changed file with 57 additions and 27 deletions.
@@ -288,16 +288,9 @@ public enum Semantic {
      *          ID of the Kafka topic.
      * @param serializationSchema
      *          User defined (keyless) serialization schema.
-     *
-     * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
      */
-    @Deprecated
     public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-        this(
-            topicId,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
-            getPropertiesFromBrokerList(brokerList),
-            Optional.of(new FlinkFixedPartitioner<IN>()));
+        this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
     }
 
     /**
@@ -318,16 +311,41 @@ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema
      *          User defined key-less serialization schema.
      * @param producerConfig
      *          Properties with the producer configuration.
-     *
-     * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
      */
-    @Deprecated
     public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+        this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>()));
+    }
+
+    /**
+     * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+     * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+     *
+     * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+     * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+     * partitions in a round-robin fashion.
+     *
+     * @param topicId
+     *          The topic to write data to
+     * @param serializationSchema
+     *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+     * @param producerConfig
+     *          Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+     * @param customPartitioner
+     *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+     *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+     */
+    public FlinkKafkaProducer(
+            String topicId,
+            SerializationSchema<IN> serializationSchema,
+            Properties producerConfig,
+            Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
         this(
             topicId,
-            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            serializationSchema,
             producerConfig,
-            Optional.of(new FlinkFixedPartitioner<IN>()));
+            customPartitioner.orElse(null),
+            Semantic.AT_LEAST_ONCE,
+            DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
     }
 
     /**
@@ -339,22 +357,34 @@ public FlinkKafkaProducer(String topicId, SerializationS
      * partitions in a round-robin fashion.
      *
      * @param topicId The topic to write data to
-     * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
-     * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-     * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-     *                          If a partitioner is not provided, records will be distributed to Kafka partitions
-     *                          in a round-robin fashion.
-     *
-     * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
+     * @param serializationSchema
+     *          A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+     * @param producerConfig
+     *          Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+     * @param customPartitioner
+     *          A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+     *          provided, records will be distributed to Kafka partitions in a round-robin fashion.
+     * @param semantic
+     *          Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+     * @param kafkaProducersPoolSize
+     *          Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
      */
-    @Deprecated
     public FlinkKafkaProducer(
-            String topicId,
-            SerializationSchema<IN> serializationSchema,
-            Properties producerConfig,
-            Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
-
-        this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+            String topicId,
+            SerializationSchema<IN> serializationSchema,
+            Properties producerConfig,
+            @Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+            FlinkKafkaProducer.Semantic semantic,
+            int kafkaProducersPoolSize) {
+        this(
+            topicId,
+            new KeyedSerializationSchemaWrapper<>(serializationSchema),
+            customPartitioner,
+            null,
+            producerConfig,
+            semantic,
+            kafkaProducersPoolSize
+        );
     }
 
     // ------------------- Key/Value serialization schema constructors ----------------------