diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
index 25b359cba9702..3bda1fec0a0e3 100644
--- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
+++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java
@@ -288,16 +288,9 @@ public enum Semantic {
 	 * 			ID of the Kafka topic.
 	 * @param serializationSchema
 	 * 			User defined (keyless) serialization schema.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
-		this(
-			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
-			getPropertiesFromBrokerList(brokerList),
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+		this(topicId, serializationSchema, getPropertiesFromBrokerList(brokerList));
 	}
 
 	/**
@@ -318,16 +311,41 @@ public FlinkKafkaProducer(String brokerList, String topicId, SerializationSchema<IN> serializationSchema) {
 	 * 			User defined key-less serialization schema.
 	 * @param producerConfig
 	 * 			Properties with the producer configuration.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
+		this(topicId, serializationSchema, producerConfig, Optional.of(new FlinkFixedPartitioner<>()));
+	}
+
+	/**
+	 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
+	 * the topic. It accepts a key-less {@link SerializationSchema} and possibly a custom {@link FlinkKafkaPartitioner}.
+	 *
+	 * <p>Since a key-less {@link SerializationSchema} is used, all records sent to Kafka will not have an
+	 * attached key. Therefore, if a partitioner is also not provided, records will be distributed to Kafka
+	 * partitions in a round-robin fashion.
+	 *
+	 * @param topicId
+	 * 			The topic to write data to
+	 * @param serializationSchema
+	 * 			A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 * 			Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner
+	 * 			A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 * 			provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 */
+	public FlinkKafkaProducer(
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
 		this(
 			topicId,
-			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			serializationSchema,
 			producerConfig,
-			Optional.of(new FlinkFixedPartitioner<IN>()));
+			customPartitioner.orElse(null),
+			Semantic.AT_LEAST_ONCE,
+			DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
 	}
 
 	/**
@@ -339,22 +357,34 @@ public FlinkKafkaProducer(String topicId, SerializationSchema<IN> serializationSchema, Properties producerConfig) {
 	 * partitions in a round-robin fashion.
 	 *
 	 * @param topicId The topic to write data to
-	 * @param serializationSchema A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
-	 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
-	 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
-	 *                          If a partitioner is not provided, records will be distributed to Kafka partitions
-	 *                          in a round-robin fashion.
-	 *
-	 * @deprecated use {@link #FlinkKafkaProducer(String, KafkaSerializationSchema, Properties, FlinkKafkaProducer.Semantic)}
+	 * @param serializationSchema
+	 * 			A key-less serializable serialization schema for turning user objects into a kafka-consumable byte[]
+	 * @param producerConfig
+	 * 			Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only required argument.
+	 * @param customPartitioner
+	 * 			A serializable partitioner for assigning messages to Kafka partitions. If a partitioner is not
+	 * 			provided, records will be distributed to Kafka partitions in a round-robin fashion.
+	 * @param semantic
+	 * 			Defines semantic that will be used by this producer (see {@link FlinkKafkaProducer.Semantic}).
+	 * @param kafkaProducersPoolSize
+	 * 			Overwrite default KafkaProducers pool size (see {@link FlinkKafkaProducer.Semantic#EXACTLY_ONCE}).
 	 */
-	@Deprecated
 	public FlinkKafkaProducer(
-		String topicId,
-		SerializationSchema<IN> serializationSchema,
-		Properties producerConfig,
-		Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
-
-		this(topicId, new KeyedSerializationSchemaWrapper<>(serializationSchema), producerConfig, customPartitioner);
+			String topicId,
+			SerializationSchema<IN> serializationSchema,
+			Properties producerConfig,
+			@Nullable FlinkKafkaPartitioner<IN> customPartitioner,
+			FlinkKafkaProducer.Semantic semantic,
+			int kafkaProducersPoolSize) {
+		this(
+			topicId,
+			new KeyedSerializationSchemaWrapper<>(serializationSchema),
+			customPartitioner,
+			null,
+			producerConfig,
+			semantic,
+			kafkaProducersPoolSize
+		);
 	}
 
 	// ------------------- Key/Value serialization schema constructors ----------------------