From 628b71dfabe72a99eb6d54994fc870abed1f0268 Mon Sep 17 00:00:00 2001
From: Timo Walther
Date: Fri, 27 Jul 2018 14:03:50 +0200
Subject: [PATCH] [FLINK-9979] [table] Support a FlinkKafkaPartitioner for
 Kafka table sink factory

Adds the possibility to specify a FlinkKafkaPartitioner for the Kafka
table sink factory. It provides shortcuts for the built-in "fixed" and
"round-robin" partitioning.

This closes #6440.
---
 .../kafka/Kafka010JsonTableSink.java          | 14 +++-
 .../connectors/kafka/Kafka010TableSink.java   |  7 +-
 .../kafka/Kafka010TableSourceSinkFactory.java |  2 +-
 .../kafka/Kafka010JsonTableSinkTest.java      |  5 +-
 .../Kafka010TableSourceSinkFactoryTest.java   |  2 +-
 .../connectors/kafka/Kafka011TableSink.java   |  6 +-
 .../kafka/Kafka011TableSourceSinkFactory.java |  2 +-
 .../Kafka011TableSourceSinkFactoryTest.java   |  2 +-
 .../kafka/Kafka08JsonTableSink.java           | 14 +++-
 .../connectors/kafka/Kafka08TableSink.java    |  7 +-
 .../kafka/Kafka08TableSourceSinkFactory.java  |  2 +-
 .../kafka/Kafka08JsonTableSinkTest.java       |  5 +-
 .../Kafka08TableSourceSinkFactoryTest.java    |  2 +-
 .../kafka/Kafka09JsonTableSink.java           | 14 +++-
 .../connectors/kafka/Kafka09TableSink.java    |  7 +-
 .../kafka/Kafka09TableSourceSinkFactory.java  |  2 +-
 .../kafka/Kafka09JsonTableSinkTest.java       |  5 +-
 .../Kafka09TableSourceSinkFactoryTest.java    |  2 +-
 .../connectors/kafka/KafkaTableSink.java      | 10 +--
 .../KafkaTableSourceSinkFactoryBase.java      | 33 ++++++--
 .../apache/flink/table/descriptors/Kafka.java | 78 +++++++++++++++++++
 .../table/descriptors/KafkaValidator.java     | 33 +++++++-
 .../kafka/KafkaTableSinkTestBase.java         |  5 +-
 .../KafkaTableSourceSinkFactoryTestBase.java  |  6 +-
 .../flink/table/descriptors/KafkaTest.java    |  7 +-
 25 files changed, 223 insertions(+), 49 deletions(-)

diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java index 2ad31420789a4..8471908a9cf9a 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -24,6 +24,7 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -73,16 +74,23 @@ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPart } @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer010<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(new FlinkFixedPartitioner<>())); } @Override protected Kafka010JsonTableSink createCopy() { - return new Kafka010JsonTableSink(topic, properties, partitioner); + return new Kafka010JsonTableSink( + topic, + properties, + partitioner.orElse(new FlinkFixedPartitioner<>())); } } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java index a8c655398243d..1d408b8ab5289 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSink.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -36,7 +37,7 @@ public Kafka010TableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { super( schema, @@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer010<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(null)); } } diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java index 0cf94995bdd49..ecf12b27a08e7 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactory.java @@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka010TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java index 339420cede3d3..9208f6583b877 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink( Properties properties, FlinkKafkaPartitioner<Row> partitioner) { - return new Kafka010JsonTableSink(topic, properties, partitioner); + return new Kafka010JsonTableSink( + topic, + properties, + partitioner); } @Override diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSourceSinkFactoryTest.java @@ 
-85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka010TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java index 22c6da13b05a8..8d81a5b59a1c3 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSink.java @@ -39,7 +39,7 @@ public Kafka011TableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { super( schema, @@ -54,11 +54,11 @@ protected SinkFunction<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer011<>( topic, new KeyedSerializationSchemaWrapper<>(serializationSchema), properties, - Optional.of(partitioner)); + partitioner); } } diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java index c26df42ed4ac4..e6f677fb56861 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactory.java @@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka011TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java index 996c50838603f..f4614761d21dc 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSourceSinkFactoryTest.java @@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka011TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 45588cdb14127..189a9fdf46bb9 --- 
a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -26,6 +26,7 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -92,17 +93,24 @@ public Kafka08JsonTableSink(String topic, Properties properties, KafkaPartitione } @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer08<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(new FlinkFixedPartitioner<>())); } @Override protected Kafka08JsonTableSink createCopy() { - return new Kafka08JsonTableSink(topic, properties, partitioner); + return new Kafka08JsonTableSink( + topic, + properties, + partitioner.orElse(new FlinkFixedPartitioner<>())); } } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java index c34de13efde33..146cfc907390d 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSink.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -36,7 +37,7 @@ public Kafka08TableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { super( schema, @@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer08<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(null)); } } diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java index 3e93b6fdeacc0..aeccd4f1ac360 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactory.java @@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka08TableSink( diff --git 
a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 32bd3b69c06ee..fc46ad4c6ee50 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink( Properties properties, FlinkKafkaPartitioner<Row> partitioner) { - return new Kafka08JsonTableSink(topic, properties, partitioner); + return new Kafka08JsonTableSink( + topic, + properties, + partitioner); } @Override diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java index b67501e449ec9..ff633ec0246d7 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSourceSinkFactoryTest.java @@ -85,7 +85,7 @@ protected KafkaTableSink getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka08TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index b3cc0aa77cad4..336345900615e 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -26,6 +26,7 @@ import org.apache.flink.table.descriptors.ConnectorDescriptor; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -92,16 +93,23 @@ public Kafka09JsonTableSink(String topic, Properties properties, KafkaPartitione } @Override - protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) { + protected FlinkKafkaProducerBase<Row> createKafkaProducer( + String topic, + Properties properties, + SerializationSchema<Row> serializationSchema, + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer09<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(new FlinkFixedPartitioner<>())); } @Override protected Kafka09JsonTableSink createCopy() { - return new Kafka09JsonTableSink(topic, properties, partitioner); + return new Kafka09JsonTableSink( + topic, + properties, + partitioner.orElse(new FlinkFixedPartitioner<>())); } } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java index 8c349d7a0b23c..6e38aad1a3939 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSink.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.TableSchema; import org.apache.flink.types.Row; +import java.util.Optional; import java.util.Properties; /** @@ -36,7 +37,7 @@ public Kafka09TableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { super( schema, @@ -51,11 +52,11 @@ protected FlinkKafkaProducerBase<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner) { + Optional<FlinkKafkaPartitioner<Row>> partitioner) { return new FlinkKafkaProducer09<>( topic, serializationSchema, properties, - partitioner); + partitioner.orElse(null)); } } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java index 9958b4ef3161e..19f51508b9364 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactory.java @@ -77,7 +77,7 @@ protected KafkaTableSink createKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka09TableSink( diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index 79f251b83023b..97b5c7d88a2c9 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -40,7 +40,10 @@ protected KafkaTableSink createTableSink( Properties properties, FlinkKafkaPartitioner<Row> partitioner) { - return new Kafka09JsonTableSink(topic, properties, partitioner); + return new Kafka09JsonTableSink( + topic, + properties, + partitioner); } @Override diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java index a6c8bd4b27980..d54c3945949b4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSourceSinkFactoryTest.java @@ -85,7 +85,7 @@ protected 
KafkaTableSink getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { return new Kafka09TableSink( diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index 7853bb702a5dd..a85d536eac99b 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -40,7 +40,7 @@ * A version-agnostic Kafka {@link AppendStreamTableSink}. * *
<p>
The version-specific Kafka consumers need to extend this class and - * override {@link #createKafkaProducer(String, Properties, SerializationSchema, FlinkKafkaPartitioner)}}. + * override {@link #createKafkaProducer(String, Properties, SerializationSchema, Optional)}. */ @Internal public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { @@ -60,7 +60,7 @@ public abstract class KafkaTableSink implements AppendStreamTableSink<Row> { protected Optional<SerializationSchema<Row>> serializationSchema; /** Partitioner to select Kafka partition for each item. */ - protected final FlinkKafkaPartitioner<Row> partitioner; + protected final Optional<FlinkKafkaPartitioner<Row>> partitioner; // legacy variables protected String[] fieldNames; @@ -70,7 +70,7 @@ protected KafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema) { this.schema = Optional.of(Preconditions.checkNotNull(schema, "Schema must not be null.")); this.topic = Preconditions.checkNotNull(topic, "Topic must not be null."); @@ -96,7 +96,7 @@ public KafkaTableSink( this.schema = Optional.empty(); this.topic = Preconditions.checkNotNull(topic, "topic"); this.properties = Preconditions.checkNotNull(properties, "properties"); - this.partitioner = Preconditions.checkNotNull(partitioner, "partitioner"); + this.partitioner = Optional.of(Preconditions.checkNotNull(partitioner, "partitioner")); this.serializationSchema = Optional.empty(); } @@ -113,7 +113,7 @@ protected abstract SinkFunction<Row> createKafkaProducer( String topic, Properties properties, SerializationSchema<Row> serializationSchema, - FlinkKafkaPartitioner<Row> partitioner); + Optional<FlinkKafkaPartitioner<Row>> partitioner); /** * Create serialization schema for converting table rows into bytes. diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java index 27b2e67ce0bfe..5634331adbb3c 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryBase.java @@ -38,6 +38,7 @@ import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; +import org.apache.flink.util.InstantiationUtil; import java.util.ArrayList; import java.util.Arrays; @@ -54,6 +55,11 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN; import static 
org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; @@ -105,6 +111,8 @@ public List<String> supportedProperties() { properties.add(CONNECTOR_STARTUP_MODE); properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_PARTITION); properties.add(CONNECTOR_SPECIFIC_OFFSETS + ".#." + CONNECTOR_SPECIFIC_OFFSETS_OFFSET); + properties.add(CONNECTOR_SINK_PARTITIONER); + properties.add(CONNECTOR_SINK_PARTITIONER_CLASS); // schema properties.add(SCHEMA() + ".#." + SCHEMA_TYPE()); @@ -170,7 +178,7 @@ public StreamTableSink<Row> createStreamTableSink(Map<String, String> properties schema, topic, getKafkaProperties(descriptorProperties), - getFlinkKafkaPartitioner(), + getFlinkKafkaPartitioner(descriptorProperties), getSerializationSchema(properties)); } @@ -228,7 +236,7 @@ protected abstract KafkaTableSink createKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema); // -------------------------------------------------------------------------------------------- @@ -314,9 +322,24 @@ private StartupOptions getStartupOptions( return options; } - private FlinkKafkaPartitioner<Row> getFlinkKafkaPartitioner() { - // we don't support custom partitioner so far - return new FlinkFixedPartitioner<>(); + @SuppressWarnings("unchecked") + private Optional<FlinkKafkaPartitioner<Row>> getFlinkKafkaPartitioner(DescriptorProperties descriptorProperties) { + return descriptorProperties + .getOptionalString(CONNECTOR_SINK_PARTITIONER) + .flatMap((String partitionerString) -> { + switch (partitionerString) { + case CONNECTOR_SINK_PARTITIONER_VALUE_FIXED: + return Optional.of(new FlinkFixedPartitioner<>()); + case CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN: + return Optional.empty(); + case CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM: + final Class<? extends FlinkKafkaPartitioner> partitionerClass = + descriptorProperties.getClass(CONNECTOR_SINK_PARTITIONER_CLASS, FlinkKafkaPartitioner.class); + return Optional.of(InstantiationUtil.instantiate(partitionerClass)); + default: + throw new TableException("Unsupported sink partitioner. 
Validator should have checked that."); + } + }); } private boolean checkForCustomFieldMapping(DescriptorProperties descriptorProperties, TableSchema schema) { diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java index 45359587c1cd0..e44341a991eed 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/Kafka.java @@ -20,6 +20,7 @@ import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase; import org.apache.flink.streaming.connectors.kafka.config.StartupMode; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.util.Preconditions; import java.util.ArrayList; @@ -34,6 +35,11 @@ import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_KEY; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_PROPERTIES_VALUE; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_CLASS; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_FIXED; +import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_OFFSET; import static org.apache.flink.table.descriptors.KafkaValidator.CONNECTOR_SPECIFIC_OFFSETS_PARTITION; @@ -51,6 +57,8 @@ public class Kafka extends ConnectorDescriptor { private StartupMode startupMode; private Map<Integer, Long> specificOffsets; private Map<String, String> kafkaProperties; + private String sinkPartitionerType; + private Class<? extends FlinkKafkaPartitioner> sinkPartitionerClass; /** * Connector descriptor for the Apache Kafka message queue. @@ -175,6 +183,69 @@ public Kafka startFromSpecificOffset(int partition, long specificOffset) { return this; } + /** + * Configures how to partition records from Flink's partitions into Kafka's partitions. + * + *
<p>This strategy ensures that each Flink partition ends up in one Kafka partition. + * + *
<p>Note: One Kafka partition can contain multiple Flink partitions. Examples: + * + *
<p>More Flink partitions than Kafka partitions. Some (or all) Kafka partitions contain + * the output of more than one Flink partition: + *
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ---------------->    1
+	 *         2    --------------/
+	 *         3    -------------/
+	 *         4    ------------/
+	 * </pre>
+ * + * + *
<p>Fewer Flink partitions than Kafka partitions: + *
+	 * <pre>
+	 *     Flink Sinks            Kafka Partitions
+	 *         1    ---------------->    1
+	 *         2    ---------------->    2
+	 *                                      3
+	 *                                      4
+	 *                                      5
+	 * </pre>
+ * + * @see org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner + */ + public Kafka sinkPartitionerFixed() { + sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_FIXED; + sinkPartitionerClass = null; + return this; + } + + /** + * Configures how to partition records from Flink's partitions into Kafka's partitions. + * + *
<p>This strategy ensures that records will be distributed to Kafka partitions in a + * round-robin fashion. + * + *
<p>Note: This strategy is useful to avoid an unbalanced partitioning. However, it will + * cause a lot of network connections between all the Flink instances and all the Kafka brokers. + */ + public Kafka sinkPartitionerRoundRobin() { + sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN; + sinkPartitionerClass = null; + return this; + } + + /** + * Configures how to partition records from Flink's partitions into Kafka's partitions. + * + *
<p>This strategy allows for a custom partitioner by providing an implementation + * of {@link FlinkKafkaPartitioner}. */ + public Kafka sinkPartitionerCustom(Class<? extends FlinkKafkaPartitioner> partitionerClass) { + sinkPartitionerType = CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM; + sinkPartitionerClass = Preconditions.checkNotNull(partitionerClass); + return this; + } + /** * Internal method for connector properties conversion. */ @@ -212,5 +283,12 @@ public void addConnectorProperties(DescriptorProperties properties) { .collect(Collectors.toList()) ); } + + if (sinkPartitionerType != null) { + properties.putString(CONNECTOR_SINK_PARTITIONER, sinkPartitionerType); + if (sinkPartitionerClass != null) { + properties.putClass(CONNECTOR_SINK_PARTITIONER_CLASS, sinkPartitionerClass); + } + } } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java index 3adc7c518a447..cad37f8f8cd26 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/table/descriptors/KafkaValidator.java @@ -48,12 +48,27 @@ public class KafkaValidator extends ConnectorDescriptorValidator { public static final String CONNECTOR_PROPERTIES = "connector.properties"; public static final String CONNECTOR_PROPERTIES_KEY = "key"; public static final String CONNECTOR_PROPERTIES_VALUE = "value"; + public static final String CONNECTOR_SINK_PARTITIONER = "connector.sink-partitioner"; + public static final String CONNECTOR_SINK_PARTITIONER_VALUE_FIXED = "fixed"; + public static final String CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN = "round-robin"; + public static final String CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM = "custom"; + public static final String CONNECTOR_SINK_PARTITIONER_CLASS = "connector.sink-partitioner-class"; @Override public void validate(DescriptorProperties properties) { super.validate(properties); properties.validateValue(CONNECTOR_TYPE(), CONNECTOR_TYPE_VALUE_KAFKA, false); + validateVersion(properties); + + validateStartupMode(properties); + + validateKafkaProperties(properties); + + validateSinkPartitioner(properties); + } + + private void validateVersion(DescriptorProperties properties) { final List<String> versions = Arrays.asList( CONNECTOR_VERSION_VALUE_08, CONNECTOR_VERSION_VALUE_09, @@ -61,7 +76,9 @@ public void validate(DescriptorProperties properties) { CONNECTOR_VERSION_VALUE_011); properties.validateEnumValues(CONNECTOR_VERSION(), false, versions); properties.validateString(CONNECTOR_TOPIC, false, 1, Integer.MAX_VALUE); + } + private void validateStartupMode(DescriptorProperties properties) { final Map<String, Consumer<String>> specificOffsetValidators = new HashMap<>(); specificOffsetValidators.put( CONNECTOR_SPECIFIC_OFFSETS_PARTITION, @@ -86,17 +103,29 @@ public void validate(DescriptorProperties properties) { CONNECTOR_STARTUP_MODE_VALUE_SPECIFIC_OFFSETS, prefix -> properties.validateFixedIndexedProperties(CONNECTOR_SPECIFIC_OFFSETS, false, specificOffsetValidators)); properties.validateEnum(CONNECTOR_STARTUP_MODE, true, startupModeValidation); + } + private void validateKafkaProperties(DescriptorProperties properties) { final Map<String, Consumer<String>> propertyValidators = new HashMap<>(); propertyValidators.put( CONNECTOR_PROPERTIES_KEY, - prefix 
-> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1, Integer.MAX_VALUE)); + prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_KEY, false, 1)); propertyValidators.put( CONNECTOR_PROPERTIES_VALUE, - prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0, Integer.MAX_VALUE)); + prefix -> properties.validateString(prefix + CONNECTOR_PROPERTIES_VALUE, false, 0)); properties.validateFixedIndexedProperties(CONNECTOR_PROPERTIES, true, propertyValidators); } + private void validateSinkPartitioner(DescriptorProperties properties) { + final Map<String, Consumer<String>> sinkPartitionerValidators = new HashMap<>(); + sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_FIXED, properties.noValidation()); + sinkPartitionerValidators.put(CONNECTOR_SINK_PARTITIONER_VALUE_ROUND_ROBIN, properties.noValidation()); + sinkPartitionerValidators.put( + CONNECTOR_SINK_PARTITIONER_VALUE_CUSTOM, + prefix -> properties.validateString(CONNECTOR_SINK_PARTITIONER_CLASS, false, 1)); + properties.validateEnum(CONNECTOR_SINK_PARTITIONER, true, sinkPartitionerValidators); + } + // utilities public static String normalizeStartupMode(StartupMode startupMode) { diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 946b6eb589508..b4bb89dc04883 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -30,6 +30,7 @@ import org.junit.Test; +import java.util.Optional; import java.util.Properties; import static org.junit.Assert.assertArrayEquals; @@ -59,7 +60,7 @@ public abstract class KafkaTableSinkTestBase { @SuppressWarnings("unchecked") @Test - public void testKafkaTableSink() throws Exception { + public void testKafkaTableSink() { DataStream dataStream = mock(DataStream.class); when(dataStream.addSink(any(SinkFunction.class))).thenReturn(mock(DataStreamSink.class)); @@ -74,7 +75,7 @@ public void testKafkaTableSink() { eq(TOPIC), eq(PROPERTIES), any(getSerializationSchemaClass()), - eq(PARTITIONER)); + eq(Optional.of(PARTITIONER))); } @Test diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java index 504bed16a74db..5e9144c1e57a4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceSinkFactoryTestBase.java @@ -153,6 +153,7 @@ public void testTableSource() { .version(getKafkaVersion()) .topic(TOPIC) .properties(KAFKA_PROPERTIES) + .sinkPartitionerRoundRobin() // test if accepted although not needed .startFromSpecificOffsets(OFFSETS)) .withFormat(new TestTableFormat()) .withSchema( @@ -194,7 +195,7 @@ public void testTableSink() { schema, TOPIC, KAFKA_PROPERTIES, - new FlinkFixedPartitioner<>(), // a custom partitioner is not support yet + Optional.of(new FlinkFixedPartitioner<>()), new TestSerializationSchema(schema.toRowType())); // construct table sink using 
descriptors and table sink factory @@ -204,6 +205,7 @@ public void testTableSink() { .version(getKafkaVersion()) .topic(TOPIC) .properties(KAFKA_PROPERTIES) + .sinkPartitionerFixed() .startFromSpecificOffsets(OFFSETS)) // test if they are accepted although not needed .withFormat(new TestTableFormat()) .withSchema( @@ -299,6 +301,6 @@ protected abstract KafkaTableSink getExpectedKafkaTableSink( TableSchema schema, String topic, Properties properties, - FlinkKafkaPartitioner<Row> partitioner, + Optional<FlinkKafkaPartitioner<Row>> partitioner, SerializationSchema<Row> serializationSchema); } diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java index f3d96f1c443aa..c67bc4dcf20cd 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/table/descriptors/KafkaTest.java @@ -18,6 +18,8 @@ package org.apache.flink.table.descriptors; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; + import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -59,7 +61,8 @@ public List<Descriptor> descriptors() { .version("0.11") .topic("MyTable") .startFromSpecificOffsets(offsets) - .properties(properties); + .properties(properties) + .sinkPartitionerCustom(FlinkFixedPartitioner.class); return Arrays.asList(earliestDesc, specificOffsetsDesc, specificOffsetsMapDesc); } @@ -102,6 +105,8 @@ public List<Map<String, String>> properties() { props3.put("connector.properties.0.value", "12"); props3.put("connector.properties.1.key", "kafka.stuff"); props3.put("connector.properties.1.value", "42"); + props3.put("connector.sink-partitioner", "custom"); + props3.put("connector.sink-partitioner-class", FlinkFixedPartitioner.class.getName()); return Arrays.asList(props1, props2, props3); }
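
Usage sketch: the snippet below shows how the descriptor methods and "connector.sink-partitioner*" property keys introduced by this patch fit together. Only the Kafka descriptor calls and property keys come from this change; the MyCustomPartitioner class and the kafkaProps variable are illustrative assumptions, not part of the patch.

    // Minimal sketch of the three partitioning modes added by this patch.
    Kafka kafka = new Kafka()
        .version("0.11")
        .topic("OutputTopic")
        .properties(kafkaProps);

    // "fixed": each Flink sink partition ends up in one Kafka partition
    // (translates to connector.sink-partitioner=fixed).
    kafka.sinkPartitionerFixed();

    // "round-robin": records are distributed to Kafka partitions round-robin
    // (translates to connector.sink-partitioner=round-robin).
    kafka.sinkPartitionerRoundRobin();

    // "custom": plug in any FlinkKafkaPartitioner implementation (translates to
    // connector.sink-partitioner=custom plus connector.sink-partitioner-class
    // set to the fully qualified class name; MyCustomPartitioner is hypothetical).
    kafka.sinkPartitionerCustom(MyCustomPartitioner.class);

Each sinkPartitioner* call overwrites the previous choice. Note that "round-robin" deliberately maps to Optional.empty() in getFlinkKafkaPartitioner above, so partitioning is left to the Kafka producer itself, at the cost of connections from every Flink instance to every broker.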