From 50fba9aa4e96632f7b32cf98d704683364196cbd Mon Sep 17 00:00:00 2001 From: Fabian Hueske Date: Tue, 7 Nov 2017 17:59:43 +0100 Subject: [PATCH] [FLINK-8014] [table] Add Kafka010JsonTableSink. - Refactor KafkaTableSink tests. --- .../kafka/Kafka010JsonTableSink.java | 73 +++++++++++++++++++ .../kafka/Kafka010JsonTableSinkTest.java | 53 ++++++++++++++ .../kafka/Kafka08JsonTableSink.java | 26 ++++++- .../kafka/Kafka08JsonTableSinkTest.java | 27 +++---- .../kafka/Kafka09JsonTableSink.java | 26 ++++++- .../kafka/Kafka09JsonTableSinkTest.java | 27 +++---- .../connectors/kafka/KafkaJsonTableSink.java | 5 +- .../connectors/kafka/KafkaTableSink.java | 10 ++- .../JsonRowSerializationSchema.java | 22 +++++- .../kafka/JsonRowSerializationSchemaTest.java | 46 ++++++++---- .../kafka/KafkaTableSinkTestBase.java | 30 ++++---- 11 files changed, 269 insertions(+), 76 deletions(-) create mode 100644 flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java create mode 100644 flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java new file mode 100644 index 0000000000000..431ace0bbd676 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format. + */ +public class Kafka010JsonTableSink extends KafkaJsonTableSink { + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * topic with fixed partition assignment. + * + *

Each parallel TableSink instance will write its rows to a single Kafka partition.

+ * + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka010JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10 + * topic with custom partition assignment. + * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + * @param partitioner Kafka partitioner + */ + public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner partitioner) { + super(topic, properties, partitioner); + } + + @Override + protected FlinkKafkaProducerBase createKafkaProducer(String topic, Properties properties, SerializationSchema serializationSchema, FlinkKafkaPartitioner partitioner) { + return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner); + } + + @Override + protected Kafka010JsonTableSink createCopy() { + return new Kafka010JsonTableSink(topic, properties, partitioner); + } +} diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java new file mode 100644 index 0000000000000..4d805d5f3ce07 --- /dev/null +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.kafka; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; +import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; +import org.apache.flink.types.Row; + +import java.util.Properties; + +/** + * Tests for the {@link Kafka010JsonTableSink}. + */ +public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase { + + @Override + protected KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner partitioner) { + + return new Kafka010JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; + } + + @Override + protected Class getProducerClass() { + return FlinkKafkaProducer010.class; + } + +} + diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index a887048a5d12f..39d5cb2c9e5d1 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -32,7 +33,27 @@ public class Kafka08JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} for Kafka 0.8. + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with fixed partition assignment. + * + *

Each parallel TableSink instance will write its rows to a single Kafka partition.

+ *
    + *
  • If the number of Kafka partitions is less than the number of sink instances, different + * sink instances will write to the same partition.
  • + *
  • If the number of Kafka partitions is higher than the number of sink instance, some + * Kafka partitions won't receive data.
  • + *
+ * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka08JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka @@ -43,7 +64,8 @@ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaParti } /** - * Creates {@link KafkaTableSink} for Kafka 0.8. + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index 890fc3abd7cce..d7bb683b10e2a 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase { protected KafkaTableSink createTableSink( String topic, Properties properties, - FlinkKafkaPartitioner partitioner, - final FlinkKafkaProducerBase kafkaProducer) { - - return new Kafka08JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase createKafkaProducer( - String topic, - Properties properties, - SerializationSchema serializationSchema, - FlinkKafkaPartitioner partitioner) { - - return kafkaProducer; - } - }; + FlinkKafkaPartitioner partitioner) { + + return new Kafka08JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; } @Override - @SuppressWarnings("unchecked") - protected SerializationSchema getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); + protected Class getProducerClass() { + return FlinkKafkaProducer08.class; } } diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index f65a02d717d1b..a4d266184ce40 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; @@ -32,7 +33,27 @@ public class Kafka09JsonTableSink extends KafkaJsonTableSink { /** - * Creates {@link KafkaTableSink} for Kafka 0.9 . + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with fixed partition assignment. + * + *

Each parallel TableSink instance will write its rows to a single Kafka partition.

+ *
    + *
  • If the number of Kafka partitions is less than the number of sink instances, different + * sink instances will write to the same partition.
  • + *
  • If the number of Kafka partitions is higher than the number of sink instance, some + * Kafka partitions won't receive data.
  • + *
+ * + * @param topic topic in Kafka to which table is written + * @param properties properties to connect to Kafka + */ + public Kafka09JsonTableSink(String topic, Properties properties) { + super(topic, properties, new FlinkFixedPartitioner<>()); + } + + /** + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka @@ -43,7 +64,8 @@ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaParti } /** - * Creates {@link KafkaTableSink} for Kafka 0.9 . + * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9 + * topic with custom partition assignment. * * @param topic topic in Kafka to which table is written * @param properties properties to connect to Kafka diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index c52b4ca9cc427..58f2b0555eb90 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase { protected KafkaTableSink createTableSink( String topic, Properties properties, - FlinkKafkaPartitioner partitioner, - final FlinkKafkaProducerBase kafkaProducer) { - - return new Kafka09JsonTableSink(topic, properties, partitioner) { - @Override - protected FlinkKafkaProducerBase createKafkaProducer( - String topic, - Properties properties, - SerializationSchema serializationSchema, - FlinkKafkaPartitioner partitioner) { - - return kafkaProducer; - } - }; + FlinkKafkaPartitioner partitioner) { + + return new Kafka09JsonTableSink(topic, properties, partitioner); + } + + @Override + protected Class> getSerializationSchemaClass() { + return JsonRowSerializationSchema.class; } @Override - @SuppressWarnings("unchecked") - protected SerializationSchema getSerializationSchema() { - return new JsonRowSerializationSchema(FIELD_NAMES); + protected Class getProducerClass() { + return FlinkKafkaProducer09.class; } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index f354dadb19bee..6665dbd9bed73 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.types.Row; @@ -42,7 +43,7 @@ public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartiti } @Override - protected SerializationSchema createSerializationSchema(String[] fieldNames) { - return new JsonRowSerializationSchema(fieldNames); + protected SerializationSchema createSerializationSchema(RowTypeInfo rowSchema) { + return new JsonRowSerializationSchema(rowSchema); } } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index cac71dc708af6..f42827e5f35ce 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -77,10 +77,10 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer( /** * Create serialization schema for converting table rows into bytes. * - * @param fieldNames Field names in table rows. + * @param rowSchema the schema of the row to serialize. * @return Instance of serialization schema */ - protected abstract SerializationSchema createSerializationSchema(String[] fieldNames); + protected abstract SerializationSchema createSerializationSchema(RowTypeInfo rowSchema); /** * Create a deep copy of this sink. @@ -92,6 +92,8 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer( @Override public void emitDataStream(DataStream dataStream) { FlinkKafkaProducerBase kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner); + // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled. + kafkaProducer.setFlushOnCheckpoint(true); dataStream.addSink(kafkaProducer); } @@ -116,7 +118,9 @@ public KafkaTableSink configure(String[] fieldNames, TypeInformation[] fieldT copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes"); Preconditions.checkArgument(fieldNames.length == fieldTypes.length, "Number of provided field names and types does not match."); - copy.serializationSchema = createSerializationSchema(fieldNames); + + RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames); + copy.serializationSchema = createSerializationSchema(rowSchema); return copy; } diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 5ece193a838b4..36d3137112a6f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -18,6 +18,9 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.CompositeType; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; @@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema { /** * Creates a JSON serialization schema for the given fields and types. * - * @param fieldNames Names of JSON fields to parse. + * @param rowSchema The schema of the rows to encode. */ - public JsonRowSerializationSchema(String[] fieldNames) { - this.fieldNames = Preconditions.checkNotNull(fieldNames); + public JsonRowSerializationSchema(RowTypeInfo rowSchema) { + + Preconditions.checkNotNull(rowSchema); + String[] fieldNames = rowSchema.getFieldNames(); + TypeInformation[] fieldTypes = rowSchema.getFieldTypes(); + + // check that no field is composite + for (int i = 0; i < fieldTypes.length; i++) { + if (fieldTypes[i] instanceof CompositeType) { + throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " + + "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString()); + } + } + + this.fieldNames = fieldNames; } @Override diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java index 43bde35bb6a05..70140a6c584e6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; import org.apache.flink.table.api.Types; @@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest { @Test public void testRowSerialization() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - TypeInformation[] fieldTypes = new TypeInformation[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }; + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} + ); + Row row = new Row(3); row.setField(0, 1); row.setField(1, true); row.setField(2, "str"); - Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row); + Row resultRow = serializeAndDeserialize(rowSchema, row); assertEqualRows(row, resultRow); } @Test public void testSerializationOfTwoRows() throws IOException { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; - TypeInformation row = Types.ROW( - fieldNames, - new TypeInformation[] { Types.INT(), Types.BOOLEAN(), Types.STRING() } + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} ); + Row row1 = new Row(3); row1.setField(0, 1); row1.setField(1, true); row1.setField(2, "str"); - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row); + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema); byte[] bytes = serializationSchema.serialize(row1); assertEqualRows(row1, deserializationSchema.deserialize(bytes)); @@ -79,19 +83,33 @@ public void testInputValidation() { new JsonRowSerializationSchema(null); } + @Test(expected = IllegalArgumentException.class) + public void testRejectNestedSchema() { + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())}, + new String[] {"f1", "f2", "f3"} + ); + + new JsonRowSerializationSchema(rowSchema); + } + @Test(expected = IllegalStateException.class) public void testSerializeRowWithInvalidNumberOfFields() { - String[] fieldNames = new String[] {"f1", "f2", "f3"}; + RowTypeInfo rowSchema = new RowTypeInfo( + new TypeInformation[]{Types.INT(), Types.BOOLEAN(), Types.STRING()}, + new String[] {"f1", "f2", "f3"} + ); + Row row = new Row(1); row.setField(0, 1); - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); serializationSchema.serialize(row); } - private Row serializeAndDeserialize(String[] fieldNames, TypeInformation[] fieldTypes, Row row) throws IOException { - JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames); - JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes)); + private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException { + JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema); + JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema); byte[] bytes = serializationSchema.serialize(row); return deserializationSchema.deserialize(bytes); diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index 313815245f428..ac5259e6919a5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -23,7 +23,6 @@ import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; @@ -46,32 +45,27 @@ public abstract class KafkaTableSinkTestBase { private static final String TOPIC = "testTopic"; - protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; + private static final String[] FIELD_NAMES = new String[] {"field1", "field2"}; private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() }; private static final FlinkKafkaPartitioner PARTITIONER = new CustomPartitioner(); private static final Properties PROPERTIES = createSinkProperties(); - @SuppressWarnings("unchecked") - private final FlinkKafkaProducerBase producer = new FlinkKafkaProducerBase( - TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) { - - @Override - protected void flush() {} - }; - @Test @SuppressWarnings("unchecked") + @Test public void testKafkaTableSink() throws Exception { DataStream dataStream = mock(DataStream.class); KafkaTableSink kafkaTableSink = spy(createTableSink()); kafkaTableSink.emitDataStream(dataStream); - verify(dataStream).addSink(eq(producer)); + // verify correct producer class + verify(dataStream).addSink(any(getProducerClass())); + // verify correctly configured producer verify(kafkaTableSink).createKafkaProducer( eq(TOPIC), eq(PROPERTIES), - any(getSerializationSchema().getClass()), + any(getSerializationSchemaClass()), eq(PARTITIONER)); } @@ -86,13 +80,17 @@ public void testConfiguration() { assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType()); } - protected abstract KafkaTableSink createTableSink(String topic, Properties properties, - FlinkKafkaPartitioner partitioner, FlinkKafkaProducerBase kafkaProducer); + protected abstract KafkaTableSink createTableSink( + String topic, + Properties properties, + FlinkKafkaPartitioner partitioner); + + protected abstract Class> getSerializationSchemaClass(); - protected abstract SerializationSchema getSerializationSchema(); + protected abstract Class getProducerClass(); private KafkaTableSink createTableSink() { - return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer); + return createTableSink(TOPIC, PROPERTIES, PARTITIONER); } private static Properties createSinkProperties() {