diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
new file mode 100644
index 0000000000000..431ace0bbd676
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSink.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Kafka 0.10 {@link KafkaTableSink} that serializes data in JSON format.
+ */
+public class Kafka010JsonTableSink extends KafkaJsonTableSink {
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka010JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.10
+ * topic with custom partition assignment.
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ * @param partitioner Kafka partitioner
+ */
+ public Kafka010JsonTableSink(String topic, Properties properties, FlinkKafkaPartitioner<Row> partitioner) {
+ super(topic, properties, partitioner);
+ }
+
+ @Override
+ protected FlinkKafkaProducerBase<Row> createKafkaProducer(String topic, Properties properties, SerializationSchema<Row> serializationSchema, FlinkKafkaPartitioner<Row> partitioner) {
+ return new FlinkKafkaProducer010<>(topic, serializationSchema, properties, partitioner);
+ }
+
+ @Override
+ protected Kafka010JsonTableSink createCopy() {
+ return new Kafka010JsonTableSink(topic, properties, partitioner);
+ }
+}
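A minimal usage sketch of the new two-argument constructor (broker address, topic name, and the surrounding class are hypothetical, not part of this change):

    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kafka.Kafka010JsonTableSink;
    import org.apache.flink.table.api.Table;

    public class SinkExample {
        // Writes a table as JSON to a hypothetical "results" topic. With the
        // FlinkFixedPartitioner default, each parallel sink instance sticks
        // to a single Kafka partition, as the Javadoc above describes.
        public static void writeAsJson(Table table) {
            Properties props = new Properties();
            props.setProperty("bootstrap.servers", "localhost:9092"); // hypothetical broker
            table.writeToSink(new Kafka010JsonTableSink("results", props));
        }
    }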
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
new file mode 100644
index 0000000000000..4d805d5f3ce07
--- /dev/null
+++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSinkTest.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kafka;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
+import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
+import org.apache.flink.types.Row;
+
+import java.util.Properties;
+
+/**
+ * Tests for the {@link Kafka010JsonTableSink}.
+ */
+public class Kafka010JsonTableSinkTest extends KafkaTableSinkTestBase {
+
+ @Override
+ protected KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka010JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
+ }
+
+ @Override
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer010.class;
+ }
+
+}
+
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
index a887048a5d12f..39d5cb2c9e5d1 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@
public class Kafka08JsonTableSink extends KafkaJsonTableSink {
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8.
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka08JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public Kafka08JsonTableSink(String topic, Properties properties, FlinkKafkaParti
}
/**
- * Creates {@link KafkaTableSink} for Kafka 0.8.
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.8
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
index 890fc3abd7cce..d7bb683b10e2a 100644
--- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka08JsonTableSinkTest extends KafkaTableSinkTestBase {
protected KafkaTableSink createTableSink(
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka08JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic,
- Properties properties,
- SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
-
- return kafkaProducer;
- }
- };
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka08JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
}
@Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer08.class;
}
}
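The three-argument constructor exercised above accepts any FlinkKafkaPartitioner. A sketch of a custom partitioner (hypothetical class name and routing policy):

    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
    import org.apache.flink.types.Row;

    public class FirstPartitionPartitioner extends FlinkKafkaPartitioner<Row> {

        // Called per record with the partitions available for the target topic;
        // this policy pins all rows to the topic's first partition.
        @Override
        public int partition(Row record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            return partitions[0];
        }
    }

Such a partitioner would be passed as new Kafka08JsonTableSink(topic, properties, new FirstPartitionPartitioner()).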
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
index f65a02d717d1b..a4d266184ce40 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner;
@@ -32,7 +33,27 @@
public class Kafka09JsonTableSink extends KafkaJsonTableSink {
/**
- * Creates {@link KafkaTableSink} for Kafka 0.9 .
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with fixed partition assignment.
+ *
+ * <p>Each parallel TableSink instance will write its rows to a single Kafka partition.
+ * <ul>
+ * <li>If the number of Kafka partitions is less than the number of sink instances, different
+ * sink instances will write to the same partition.</li>
+ * <li>If the number of Kafka partitions is higher than the number of sink instances, some
+ * Kafka partitions won't receive data.</li>
+ * </ul>
+ *
+ * @param topic topic in Kafka to which table is written
+ * @param properties properties to connect to Kafka
+ */
+ public Kafka09JsonTableSink(String topic, Properties properties) {
+ super(topic, properties, new FlinkFixedPartitioner<>());
+ }
+
+ /**
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
@@ -43,7 +64,8 @@ public Kafka09JsonTableSink(String topic, Properties properties, FlinkKafkaParti
}
/**
- * Creates {@link KafkaTableSink} for Kafka 0.9 .
+ * Creates {@link KafkaTableSink} to write table rows as JSON-encoded records to a Kafka 0.9
+ * topic with custom partition assignment.
*
* @param topic topic in Kafka to which table is written
* @param properties properties to connect to Kafka
diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
index c52b4ca9cc427..58f2b0555eb90 100644
--- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
+++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java
@@ -34,26 +34,19 @@ public class Kafka09JsonTableSinkTest extends KafkaTableSinkTestBase {
protected KafkaTableSink createTableSink(
String topic,
Properties properties,
- FlinkKafkaPartitioner<Row> partitioner,
- final FlinkKafkaProducerBase<Row> kafkaProducer) {
-
- return new Kafka09JsonTableSink(topic, properties, partitioner) {
- @Override
- protected FlinkKafkaProducerBase<Row> createKafkaProducer(
- String topic,
- Properties properties,
- SerializationSchema<Row> serializationSchema,
- FlinkKafkaPartitioner<Row> partitioner) {
-
- return kafkaProducer;
- }
- };
+ FlinkKafkaPartitioner<Row> partitioner) {
+
+ return new Kafka09JsonTableSink(topic, properties, partitioner);
+ }
+
+ @Override
+ protected Class<? extends SerializationSchema<Row>> getSerializationSchemaClass() {
+ return JsonRowSerializationSchema.class;
}
@Override
- @SuppressWarnings("unchecked")
- protected SerializationSchema<Row> getSerializationSchema() {
- return new JsonRowSerializationSchema(FIELD_NAMES);
+ protected Class<? extends FlinkKafkaProducerBase> getProducerClass() {
+ return FlinkKafkaProducer09.class;
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
index f354dadb19bee..6665dbd9bed73 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java
@@ -19,6 +19,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.types.Row;
@@ -42,7 +43,7 @@ public KafkaJsonTableSink(String topic, Properties properties, FlinkKafkaPartiti
}
@Override
- protected SerializationSchema<Row> createSerializationSchema(String[] fieldNames) {
- return new JsonRowSerializationSchema(fieldNames);
+ protected SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema) {
+ return new JsonRowSerializationSchema(rowSchema);
}
}
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
index cac71dc708af6..f42827e5f35ce 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java
@@ -77,10 +77,10 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer(
/**
* Create serialization schema for converting table rows into bytes.
*
- * @param fieldNames Field names in table rows.
+ * @param rowSchema the schema of the row to serialize.
* @return Instance of serialization schema
*/
- protected abstract SerializationSchema<Row> createSerializationSchema(String[] fieldNames);
+ protected abstract SerializationSchema<Row> createSerializationSchema(RowTypeInfo rowSchema);
/**
* Create a deep copy of this sink.
@@ -92,6 +92,8 @@ protected abstract FlinkKafkaProducerBase createKafkaProducer(
@Override
public void emitDataStream(DataStream<Row> dataStream) {
FlinkKafkaProducerBase<Row> kafkaProducer = createKafkaProducer(topic, properties, serializationSchema, partitioner);
+ // always enable flush on checkpoint to achieve at-least-once if query runs with checkpointing enabled.
+ kafkaProducer.setFlushOnCheckpoint(true);
dataStream.addSink(kafkaProducer);
}
@@ -116,7 +118,9 @@ public KafkaTableSink configure(String[] fieldNames, TypeInformation<?>[] fieldT
copy.fieldTypes = Preconditions.checkNotNull(fieldTypes, "fieldTypes");
Preconditions.checkArgument(fieldNames.length == fieldTypes.length,
"Number of provided field names and types does not match.");
- copy.serializationSchema = createSerializationSchema(fieldNames);
+
+ RowTypeInfo rowSchema = new RowTypeInfo(fieldTypes, fieldNames);
+ copy.serializationSchema = createSerializationSchema(rowSchema);
return copy;
}
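To illustrate the changed contract, a sketch of what configure() now hands to createSerializationSchema() (field names and types are hypothetical):

    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.types.Row;

    public class ConfigureSketch {
        public static SerializationSchema<Row> buildSchema() {
            // configure() now bundles field names and types into one RowTypeInfo,
            // so schema implementations can validate types, not just names.
            String[] fieldNames = new String[] {"id", "name"};
            TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] {Types.INT(), Types.STRING()};
            return new JsonRowSerializationSchema(new RowTypeInfo(fieldTypes, fieldNames));
        }
    }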
diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
index 5ece193a838b4..36d3137112a6f 100644
--- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
+++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java
@@ -18,6 +18,9 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.CompositeType;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;
@@ -43,10 +46,23 @@ public class JsonRowSerializationSchema implements SerializationSchema<Row> {
/**
* Creates a JSON serialization schema for the given fields and types.
*
- * @param fieldNames Names of JSON fields to parse.
+ * @param rowSchema The schema of the rows to encode.
*/
- public JsonRowSerializationSchema(String[] fieldNames) {
- this.fieldNames = Preconditions.checkNotNull(fieldNames);
+ public JsonRowSerializationSchema(RowTypeInfo rowSchema) {
+
+ Preconditions.checkNotNull(rowSchema);
+ String[] fieldNames = rowSchema.getFieldNames();
+ TypeInformation<?>[] fieldTypes = rowSchema.getFieldTypes();
+
+ // check that no field is composite
+ for (int i = 0; i < fieldTypes.length; i++) {
+ if (fieldTypes[i] instanceof CompositeType) {
+ throw new IllegalArgumentException("JsonRowSerializationSchema cannot encode rows with nested schema, " +
+ "but field '" + fieldNames[i] + "' is nested: " + fieldTypes[i].toString());
+ }
+ }
+
+ this.fieldNames = fieldNames;
}
@Override
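A serialization sketch against the new constructor (field names and values are hypothetical; the JSON layout is illustrative):

    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.RowTypeInfo;
    import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
    import org.apache.flink.table.api.Types;
    import org.apache.flink.types.Row;

    public class SerializeSketch {
        public static byte[] encode() {
            RowTypeInfo rowSchema = new RowTypeInfo(
                new TypeInformation<?>[] {Types.LONG(), Types.STRING()},
                new String[] {"id", "name"});

            JsonRowSerializationSchema schema = new JsonRowSerializationSchema(rowSchema);

            Row row = new Row(2);
            row.setField(0, 42L);
            row.setField(1, "bob");
            return schema.serialize(row); // roughly: {"id":42,"name":"bob"}
        }
    }

A composite field type such as Types.ROW(...) would instead make the constructor throw IllegalArgumentException, per the check added above.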
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
index 43bde35bb6a05..70140a6c584e6 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/JsonRowSerializationSchemaTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.kafka;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema;
import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema;
import org.apache.flink.table.api.Types;
@@ -36,31 +37,34 @@ public class JsonRowSerializationSchemaTest {
@Test
public void testRowSerialization() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- TypeInformation<?>[] fieldTypes = new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() };
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation<?>[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
+ );
+
Row row = new Row(3);
row.setField(0, 1);
row.setField(1, true);
row.setField(2, "str");
- Row resultRow = serializeAndDeserialize(fieldNames, fieldTypes, row);
+ Row resultRow = serializeAndDeserialize(rowSchema, row);
assertEqualRows(row, resultRow);
}
@Test
public void testSerializationOfTwoRows() throws IOException {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
- TypeInformation<Row> row = Types.ROW(
- fieldNames,
- new TypeInformation<?>[] { Types.INT(), Types.BOOLEAN(), Types.STRING() }
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation<?>[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
);
+
Row row1 = new Row(3);
row1.setField(0, 1);
row1.setField(1, true);
row1.setField(2, "str");
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(row);
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+ JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
byte[] bytes = serializationSchema.serialize(row1);
assertEqualRows(row1, deserializationSchema.deserialize(bytes));
@@ -79,19 +83,33 @@ public void testInputValidation() {
new JsonRowSerializationSchema(null);
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testRejectNestedSchema() {
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation<?>[]{Types.INT(), Types.BOOLEAN(), Types.ROW(Types.INT(), Types.DOUBLE())},
+ new String[] {"f1", "f2", "f3"}
+ );
+
+ new JsonRowSerializationSchema(rowSchema);
+ }
+
@Test(expected = IllegalStateException.class)
public void testSerializeRowWithInvalidNumberOfFields() {
- String[] fieldNames = new String[] {"f1", "f2", "f3"};
+ RowTypeInfo rowSchema = new RowTypeInfo(
+ new TypeInformation<?>[]{Types.INT(), Types.BOOLEAN(), Types.STRING()},
+ new String[] {"f1", "f2", "f3"}
+ );
+
Row row = new Row(1);
row.setField(0, 1);
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
serializationSchema.serialize(row);
}
- private Row serializeAndDeserialize(String[] fieldNames, TypeInformation<?>[] fieldTypes, Row row) throws IOException {
- JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(fieldNames);
- JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(Types.ROW(fieldNames, fieldTypes));
+ private Row serializeAndDeserialize(RowTypeInfo rowSchema, Row row) throws IOException {
+ JsonRowSerializationSchema serializationSchema = new JsonRowSerializationSchema(rowSchema);
+ JsonRowDeserializationSchema deserializationSchema = new JsonRowDeserializationSchema(rowSchema);
byte[] bytes = serializationSchema.serialize(row);
return deserializationSchema.deserialize(bytes);
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
index 313815245f428..ac5259e6919a5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java
@@ -23,7 +23,6 @@
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
-import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;
import org.apache.flink.table.api.Types;
import org.apache.flink.types.Row;
@@ -46,32 +45,27 @@
public abstract class KafkaTableSinkTestBase {
private static final String TOPIC = "testTopic";
- protected static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
+ private static final String[] FIELD_NAMES = new String[] {"field1", "field2"};
private static final TypeInformation[] FIELD_TYPES = new TypeInformation[] { Types.INT(), Types.STRING() };
private static final FlinkKafkaPartitioner<Row> PARTITIONER = new CustomPartitioner();
private static final Properties PROPERTIES = createSinkProperties();
- @SuppressWarnings("unchecked")
- private final FlinkKafkaProducerBase producer = new FlinkKafkaProducerBase(
- TOPIC, new KeyedSerializationSchemaWrapper(getSerializationSchema()), PROPERTIES, PARTITIONER) {
-
- @Override
- protected void flush() {}
- };
- @Test
@SuppressWarnings("unchecked")
+ @Test
public void testKafkaTableSink() throws Exception {
DataStream dataStream = mock(DataStream.class);
KafkaTableSink kafkaTableSink = spy(createTableSink());
kafkaTableSink.emitDataStream(dataStream);
- verify(dataStream).addSink(eq(producer));
+ // verify correct producer class
+ verify(dataStream).addSink(any(getProducerClass()));
+ // verify correctly configured producer
verify(kafkaTableSink).createKafkaProducer(
eq(TOPIC),
eq(PROPERTIES),
- any(getSerializationSchema().getClass()),
+ any(getSerializationSchemaClass()),
eq(PARTITIONER));
}
@@ -86,13 +80,17 @@ public void testConfiguration() {
assertEquals(new RowTypeInfo(FIELD_TYPES), newKafkaTableSink.getOutputType());
}
- protected abstract KafkaTableSink createTableSink(String topic, Properties properties,
- FlinkKafkaPartitioner<Row> partitioner, FlinkKafkaProducerBase<Row> kafkaProducer);
+ protected abstract KafkaTableSink createTableSink(
+ String topic,
+ Properties properties,
+ FlinkKafkaPartitioner<Row> partitioner);
+
+ protected abstract Class<? extends SerializationSchema<Row>> getSerializationSchemaClass();
- protected abstract SerializationSchema<Row> getSerializationSchema();
+ protected abstract Class<? extends FlinkKafkaProducerBase> getProducerClass();
private KafkaTableSink createTableSink() {
- return createTableSink(TOPIC, PROPERTIES, PARTITIONER, producer);
+ return createTableSink(TOPIC, PROPERTIES, PARTITIONER);
}
private static Properties createSinkProperties() {