From fe931d075f031e9494fd26dbeed4bb1024bd52cf Mon Sep 17 00:00:00 2001
From: Stephan Ewen
Date: Thu, 2 Nov 2017 18:07:25 +0100
Subject: [PATCH] [FLINK-7972] [core] Move SerializationSchema to 'flink-core'

Moves the SerializationSchema and its related classes from
flink-streaming-java to flink-core. This helps API-level projects that depend
on those classes: they no longer pull in a dependency on runtime classes, and
they are no longer tied to a specific Scala version.

---
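Note (reviewer aid, not part of the applied commit): for user code the visible
change is the package name, which moves from
'org.apache.flink.streaming.util.serialization' to
'org.apache.flink.api.common.serialization'. The old classes stay in
flink-streaming-java (see the shrunken files in the diff stat below), so
existing imports should keep compiling. Below is a minimal sketch of a custom
schema written only against the relocated flink-core interfaces; the class
name 'Utf8StringSchema' is illustrative and not part of this patch:

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.serialization.SerializationSchema;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    /**
     * Round-trips Strings as UTF-8 bytes using only the relocated
     * flink-core interfaces (no flink-streaming-java types needed).
     */
    public class Utf8StringSchema implements DeserializationSchema<String>, SerializationSchema<String> {

        private static final long serialVersionUID = 1L;

        @Override
        public String deserialize(byte[] message) throws IOException {
            return new String(message, StandardCharsets.UTF_8);
        }

        @Override
        public boolean isEndOfStream(String nextElement) {
            // This schema never marks an element as end-of-stream.
            return false;
        }

        @Override
        public byte[] serialize(String element) {
            return element.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }

A schema like this can be compiled against flink-core alone, which is the
point of the move.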
 docs/quickstart/run_example_quickstart.md | 2 +-
 .../kafka/FlinkKafkaConsumer010.java | 2 +-
 .../kafka/FlinkKafkaProducer010.java | 2 +-
 .../kafka/Kafka010AvroTableSource.java | 2 +-
 .../kafka/Kafka010JsonTableSource.java | 2 +-
 .../connectors/kafka/Kafka010TableSource.java | 2 +-
 .../kafka/Kafka010AvroTableSourceTest.java | 4 +-
 .../connectors/kafka/Kafka010ITCase.java | 2 +-
 .../kafka/Kafka010JsonTableSourceTest.java | 2 +-
 .../kafka/internal/Kafka010FetcherTest.java | 2 +-
 .../kafka/FlinkKafkaConsumer011.java | 2 +-
 .../kafka/FlinkKafkaProducer011.java | 3 +-
 .../kafka/Kafka011AvroTableSource.java | 2 +-
 .../kafka/Kafka011JsonTableSource.java | 2 +-
 .../connectors/kafka/Kafka011TableSource.java | 2 +-
 .../kafka/FlinkKafkaProducer011ITCase.java | 2 +-
 .../kafka/Kafka011AvroTableSourceTest.java | 4 +-
 .../connectors/kafka/Kafka011ITCase.java | 2 +-
 .../kafka/Kafka011JsonTableSourceTest.java | 2 +-
 .../kafka/FlinkKafkaConsumer08.java | 2 +-
 .../kafka/FlinkKafkaConsumer081.java | 2 +-
 .../kafka/FlinkKafkaConsumer082.java | 2 +-
 .../connectors/kafka/FlinkKafkaProducer.java | 2 +-
 .../kafka/FlinkKafkaProducer08.java | 2 +-
 .../kafka/Kafka08AvroTableSource.java | 2 +-
 .../kafka/Kafka08JsonTableSink.java | 2 +-
 .../kafka/Kafka08JsonTableSource.java | 2 +-
 .../connectors/kafka/Kafka08TableSource.java | 2 +-
 .../kafka/Kafka08AvroTableSourceTest.java | 4 +-
 .../kafka/Kafka08JsonTableSinkTest.java | 2 +-
 .../kafka/Kafka08JsonTableSourceTest.java | 2 +-
 .../connectors/kafka/KafkaConsumer08Test.java | 4 +-
 .../connectors/kafka/KafkaProducerTest.java | 2 +-
 .../kafka/FlinkKafkaConsumer09.java | 2 +-
 .../kafka/FlinkKafkaProducer09.java | 2 +-
 .../kafka/Kafka09AvroTableSource.java | 2 +-
 .../kafka/Kafka09JsonTableSink.java | 2 +-
 .../kafka/Kafka09JsonTableSource.java | 2 +-
 .../connectors/kafka/Kafka09TableSource.java | 2 +-
 .../kafka/Kafka09AvroTableSourceTest.java | 4 +-
 .../kafka/Kafka09JsonTableSinkTest.java | 2 +-
 .../kafka/Kafka09JsonTableSourceTest.java | 2 +-
 .../connectors/kafka/KafkaProducerTest.java | 2 +-
 .../kafka/internal/Kafka09FetcherTest.java | 2 +-
 .../kafka/KafkaAvroTableSource.java | 6 +-
 .../connectors/kafka/KafkaJsonTableSink.java | 2 +-
 .../kafka/KafkaJsonTableSource.java | 2 +-
 .../connectors/kafka/KafkaTableSink.java | 2 +-
 .../connectors/kafka/KafkaTableSource.java | 2 +-
 .../AvroRowDeserializationSchema.java | 1 +
 .../AvroRowSerializationSchema.java | 1 +
 .../JSONDeserializationSchema.java | 2 +
 .../JsonRowDeserializationSchema.java | 1 +
 .../JsonRowSerializationSchema.java | 1 +
 .../KeyedDeserializationSchemaWrapper.java | 1 +
 .../KeyedSerializationSchemaWrapper.java | 2 +
 .../kafka/FlinkKafkaProducerBaseTest.java | 2 +-
 .../kafka/KafkaConsumerTestBase.java | 6 +-
 .../kafka/KafkaProducerTestBase.java | 4 +-
 .../kafka/KafkaShortRetentionTestBase.java | 2 +-
 .../kafka/KafkaTableSinkTestBase.java | 2 +-
 .../kafka/KafkaTableSourceTestBase.java | 2 +-
 .../kafka/KafkaTestEnvironment.java | 3 +-
 .../kafka/testutils/DataGenerators.java | 4 +-
 .../connectors/rabbitmq/RMQSink.java | 2 +-
 .../connectors/rabbitmq/RMQSource.java | 2 +-
 .../connectors/rabbitmq/RMQSinkTest.java | 2 +-
 .../connectors/rabbitmq/RMQSourceTest.java | 2 +-
 flink-core/pom.xml | 15 ++
 .../AbstractDeserializationSchema.java | 70 ++++++++++
 .../serialization/DeserializationSchema.java | 56 ++++++++
 .../serialization/SerializationSchema.java | 42 ++++++
 .../serialization/SimpleStringSchema.java | 107 ++++++++++++++
 .../TypeInformationSerializationSchema.java | 131 ++++++++++++++++++
 .../AbstractDeserializationSchemaTest.java | 3 +-
 .../serialization/SimpleStringSchemaTest.java | 2 +-
 ...ypeInformationSerializationSchemaTest.java | 3 +-
 .../examples/kafka/Kafka010Example.java | 2 +-
 .../examples/kafka/Kafka010Example.scala | 2 +-
 flink-streaming-java/pom.xml | 6 -
 .../streaming/api/datastream/DataStream.java | 2 +-
 .../api/functions/sink/SocketClientSink.java | 2 +-
 .../AbstractDeserializationSchema.java | 43 ++----
 .../serialization/DeserializationSchema.java | 10 +-
 .../serialization/SerializationSchema.java | 7 +-
 .../serialization/SimpleStringSchema.java | 73 ++--------
 .../TypeInformationSerializationSchema.java | 96 ++-----------
 .../functions/sink/SocketClientSinkTest.java | 2 +-
 .../streaming/api/scala/DataStream.scala | 2 +-
 .../api/scala/OutputFormatTestPrograms.scala | 2 +-
 90 files changed, 555 insertions(+), 278 deletions(-)
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
 create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
 rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/util => flink-core/src/test/java/org/apache/flink/api/common/serialization}/AbstractDeserializationSchemaTest.java (96%)
 rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/util => flink-core/src/test/java/org/apache/flink/api/common}/serialization/SimpleStringSchemaTest.java (97%)
 rename {flink-streaming-java/src/test/java/org/apache/flink/streaming/util => flink-core/src/test/java/org/apache/flink/api/common/serialization}/TypeInformationSerializationSchemaTest.java (96%)

diff --git a/docs/quickstart/run_example_quickstart.md b/docs/quickstart/run_example_quickstart.md
index cf73799ffe606..d5c48c96bbc98 100644
--- a/docs/quickstart/run_example_quickstart.md
+++ b/docs/quickstart/run_example_quickstart.md
@@ -326,7 +326,7 @@ result
 The related classes also need to be imported:
 {% highlight java %}
 import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer08;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.api.common.functions.MapFunction;
 {% endhighlight %}
diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
index 3a6a13b59cc66..f56947729aadf 100644
--- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java
+++
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer010.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java index 857526823692d..184a2e714536d 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer010.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.functions.sink.SinkFunction; @@ -28,7 +29,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.kafka.clients.producer.ProducerRecord; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java index 01e6329ee0730..fbc58ea0bdd08 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java 
b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java index e263cf24ba332..bbdb32ffa846e 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java index 5475c9f25043c..bc675eb6cd124 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka010TableSource.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java index e977d494fe861..f5f8af138d8fd 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010AvroTableSourceTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; import org.apache.flink.types.Row; /** diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java index 3aa0d1aab11f8..c2b3dfa1056bd 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010ITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.ExecutionConfig; import 
org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -37,7 +38,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.junit.Test; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java index 9a0ab04f16951..0f6c53138796b 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka010JsonTableSourceTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java index f895e2f35e5da..45ceadc1863fc 100644 --- a/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.10/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka010FetcherTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java index 8d165c3d80b71..6f75828692e8f 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java +++ 
b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer011.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java index c27c620f3dd01..873ef08f71205 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.time.Time; @@ -42,7 +43,6 @@ import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.NetUtils; @@ -59,6 +59,7 @@ import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.errors.InvalidTxnStateException; import org.apache.kafka.common.serialization.ByteArraySerializer; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java index 81d34969a865b..af3b5afc38248 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java index 1aff6704e8677..71158f6a9e40e 100644 --- 
a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java index 576a4212c5e99..dbf980b7c2b2c 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka011TableSource.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java index 295451f9bb71a..922344d18b947 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer011ITCase.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeutils.base.IntSerializer; import org.apache.flink.runtime.state.OperatorStateHandle; @@ -27,7 +28,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.shaded.guava18.com.google.common.collect.Iterables; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java index bde0761d4842c..e348aa6bfb33e 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java +++ 
b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011AvroTableSourceTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; import org.apache.flink.types.Row; /** diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java index 6d259fafc36ae..99d5b56d94bc1 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011ITCase.java @@ -19,6 +19,7 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.typeutils.GenericTypeInfo; @@ -37,7 +38,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.junit.BeforeClass; import org.junit.Test; diff --git a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java index 6870eccb23a34..fc2957e7d829e 100644 --- a/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.11/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka011JsonTableSourceTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java index f41a4e3f5dedf..0a70f61e2716e 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer08.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import 
org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java index 4102bf8107fdf..c65ccc15fd41b 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer081.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java index 7ba510308fc37..e5374fbc0710a 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer082.java @@ -17,7 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java index 434286e5e89d0..1911c8ded964f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java @@ -17,11 +17,11 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import 
org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java index a14768b351796..2fce9f9655557 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer08.java @@ -17,13 +17,13 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java index 998f4d4e29711..8f45881ab7e8c 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java index 79406d86dbf00..a887048a5d12f 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSink.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; 
import org.apache.flink.types.Row; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java index aab9ea8583ada..b3b37c6c3a331 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java index b2b949b425fbf..8270b789417af 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka08TableSource.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java index 348f0f2e09475..22daafe8e1aa7 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08AvroTableSourceTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; import org.apache.flink.types.Row; /** diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java index ac92c8a1876db..890fc3abd7cce 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java +++ 
b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSinkTest.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.types.Row; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java index 01a0123f1c9fc..7e3349c5a3be7 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka08JsonTableSourceTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java index 9e4d3b779c88c..8627ccbbcb957 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumer08Test.java @@ -19,11 +19,11 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.kafka.internals.Kafka08PartitionDiscoverer; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.NetUtils; import org.apache.kafka.clients.consumer.ConsumerConfig; diff --git a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index fc8678fa37b11..304351cd14300 100644 --- a/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-connectors/flink-connector-kafka-0.8/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -18,12 +18,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import 
org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; import org.apache.kafka.clients.producer.Callback; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java index 0cf40e67ae155..65be712a35607 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumer09.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; @@ -27,7 +28,6 @@ import org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition; import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicsDescriptor; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.util.PropertiesUtil; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java index 6b9768ed1c69a..946f7e95c7215 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer09.java @@ -17,13 +17,13 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkFixedPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java 
b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java index a90a8d86426fe..808be012512f2 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java index b2227cd67d3d1..f65a02d717d1b 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSink.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaDelegatePartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.partitioner.KafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.types.Row; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java index 2f057d729062d..a699d65f56dd4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSource.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java index 4d1166cf63f40..1d2c02898cb55 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/main/java/org/apache/flink/streaming/connectors/kafka/Kafka09TableSource.java @@ -18,8 +18,8 @@ package 
org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.StreamTableSource; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java index 27445e15f1772..0ab8a4be61cb4 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09AvroTableSourceTest.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; import org.apache.flink.types.Row; /** diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java index c8fb4cd4d0cad..c52b4ca9cc427 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSinkTest.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.types.Row; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java index ed3fafba5ef72..aeee175977507 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/Kafka09JsonTableSourceTest.java @@ -18,7 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java 
b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java index 6b6c43f72144b..37f3fe8a30b4e 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTest.java @@ -18,12 +18,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.FakeStandardProducerConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.TestLogger; import org.apache.kafka.clients.producer.Callback; diff --git a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java index 33ec17e2e0455..e4e276a966971 100644 --- a/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java +++ b/flink-connectors/flink-connector-kafka-0.9/src/test/java/org/apache/flink/streaming/connectors/kafka/internal/Kafka09FetcherTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka.internal; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.core.testutils.MultiShotLatch; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; @@ -29,7 +30,6 @@ import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java index 0cc9801ed67ba..8cea36ca5aa69 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaAvroTableSource.java @@ -18,13 +18,13 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.AvroTypeInfo; import org.apache.flink.api.java.typeutils.GenericTypeInfo; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import 
org.apache.flink.streaming.util.serialization.AvroRowDeserializationSchema; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; +import org.apache.flink.formats.avro.AvroRowDeserializationSchema; +import org.apache.flink.formats.avro.typeutils.AvroTypeInfo; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.DefinedFieldMapping; import org.apache.flink.table.sources.StreamTableSource; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java index 51fd9523397a3..f354dadb19bee 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSink.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.JsonRowSerializationSchema; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.types.Row; import java.util.Properties; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java index a91cc25702037..9a6525ce88f73 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaJsonTableSource.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.java.typeutils.RowTypeInfo; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.JsonRowDeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.sources.DefinedFieldMapping; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java index a94936c354574..cac71dc708af6 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSink.java @@ -18,11 +18,11 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.SerializationSchema; 
import org.apache.flink.table.sinks.AppendStreamTableSink; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java index 0bd04e47b6c3b..3291f7dc068ac 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSource.java @@ -18,10 +18,10 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.api.ValidationException; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java index c7f1d82433342..0d36f4c9e0be0 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowDeserializationSchema.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java index 09acc6a44e32f..6f03b12e9280a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/AvroRowSerializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java index 4523da7a261b3..f60a0b71d9865 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java +++ 
b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JSONDeserializationSchema.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.AbstractDeserializationSchema; + import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java index d777c8dd4af53..100f9608f6214 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowDeserializationSchema.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java index 13a3677e315e4..5ece193a838b4 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/JsonRowSerializationSchema.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.types.Row; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java index e128abab3fb6a..93b4f68147ca8 100644 --- a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedDeserializationSchemaWrapper.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import java.io.IOException; diff --git a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java index 0a181d1e1a2db..70ae89799f9d9 100644 --- 
a/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java +++ b/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/util/serialization/KeyedSerializationSchemaWrapper.java @@ -17,6 +17,8 @@ package org.apache.flink.streaming.util.serialization; +import org.apache.flink.api.common.serialization.SerializationSchema; + /** * A simple wrapper for using the SerializationSchema with the KeyedDeserializationSchema * interface. diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java index 6b4b6ffe055de..d462953d5db4e 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducerBaseTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CheckedThread; import org.apache.flink.core.testutils.MultiShotLatch; @@ -31,7 +32,6 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.KafkaProducer; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java index e9a0331247ed0..0a5608a8d4c27 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaConsumerTestBase.java @@ -23,6 +23,9 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.DeserializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -60,14 +63,11 @@ import org.apache.flink.streaming.connectors.kafka.testutils.ThrottledMapper; import org.apache.flink.streaming.connectors.kafka.testutils.Tuple2FlinkPartitioner; import org.apache.flink.streaming.connectors.kafka.testutils.ValidatingExactlyOnceSink; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.streaming.util.serialization.TypeInformationKeyValueSerializationSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.testutils.junit.RetryOnException; import org.apache.flink.testutils.junit.RetryRule; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java index f81fcf129f201..7ba3c95a404ff 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaProducerTestBase.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -41,8 +43,6 @@ import org.apache.flink.streaming.connectors.kafka.testutils.IntegerSource; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.apache.flink.test.util.SuccessException; import org.apache.flink.test.util.TestUtils; import org.apache.flink.util.Preconditions; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java index 30f6dc276dd7d..de72985f6b986 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaShortRetentionTestBase.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.kafka; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.configuration.ConfigConstants; @@ -32,7 +33,6 @@ import org.apache.flink.streaming.util.TestStreamEnvironment; import 
org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; import org.apache.flink.util.InstantiationUtil; import org.junit.AfterClass; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java index dcf316713b9ec..313815245f428 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSinkTestBase.java @@ -18,12 +18,12 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.RowTypeInfo; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.table.api.Types; import org.apache.flink.types.Row; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java index 218401cbdeebc..7a882f48d5f5a 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTableSourceTestBase.java @@ -18,9 +18,9 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.Types; import org.apache.flink.table.sources.RowtimeAttributeDescriptor; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java index 21171f881e370..68514741aa10f 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaTestEnvironment.java @@ -17,17 +17,18 @@ package org.apache.flink.streaming.connectors.kafka; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.networking.NetworkFailuresProxy; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema; import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchemaWrapper; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchema; import kafka.server.KafkaServer; + import org.apache.kafka.clients.consumer.ConsumerRecord; import java.util.ArrayList; diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java index b204ea991050b..e432a6589dad5 100644 --- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java +++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/testutils/DataGenerators.java @@ -21,6 +21,8 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.functions.RichFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.streaming.api.datastream.DataStream; @@ -36,8 +38,6 @@ import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import java.util.Collection; import java.util.Properties; diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java index b3c6f8c203526..c1118ed6b0e54 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSink.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.connectors.rabbitmq; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; diff --git a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java index 
5018bcf83a21d..d4541533f477c 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java +++ b/flink-connectors/flink-connector-rabbitmq/src/main/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSource.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import org.apache.flink.configuration.Configuration; @@ -25,7 +26,6 @@ import org.apache.flink.streaming.api.functions.source.MultipleIdsMessageAcknowledgingSourceBase; import org.apache.flink.streaming.api.operators.StreamingRuntimeContext; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.flink.util.Preconditions; import com.rabbitmq.client.Channel; diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java index 93f884b3c2ce1..53b834d2ead45 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSinkTest.java @@ -17,10 +17,10 @@ package org.apache.flink.streaming.connectors.rabbitmq; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.SinkContextUtil; import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; diff --git a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java index 0996355986d16..bbf893f46bb30 100644 --- a/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java +++ b/flink-connectors/flink-connector-rabbitmq/src/test/java/org/apache/flink/streaming/connectors/rabbitmq/RMQSourceTest.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.connectors.rabbitmq; import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.api.common.serialization.DeserializationSchema; import org.apache.flink.api.common.state.OperatorStateStore; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; @@ -32,7 +33,6 @@ import org.apache.flink.streaming.connectors.rabbitmq.common.RMQConnectionConfig; import org.apache.flink.streaming.runtime.tasks.OperatorStateHandles; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; -import org.apache.flink.streaming.util.serialization.DeserializationSchema; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; diff --git a/flink-core/pom.xml b/flink-core/pom.xml index 
5b97273175f2a..ae3f56eb926c9 100644 --- a/flink-core/pom.xml +++ b/flink-core/pom.xml @@ -107,6 +107,9 @@ under the License. test + + joda-time joda-time @@ -118,6 +121,13 @@ under the License. joda-convert test + + + org.apache.flink + flink-shaded-jackson + test + + @@ -208,6 +218,11 @@ under the License. org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY + + + org.apache.flink.api.common.serialization.DeserializationSchema + org.apache.flink.api.common.serialization.SerializationSchema diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java new file mode 100644 index 0000000000000..871b7b106c180 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.java.typeutils.TypeExtractor; + +import java.io.IOException; + +/** + * The deserialization schema describes how to turn the byte messages delivered by certain + * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are + * processed by Flink. + * + *
<p>
This base variant of the deserialization schema produces the type information + * automatically by extracting it from the generic class arguments. + * + * @param The type created by the deserialization schema. + */ +@PublicEvolving +public abstract class AbstractDeserializationSchema implements DeserializationSchema { + + private static final long serialVersionUID = 1L; + + /** + * De-serializes the byte message. + * + * @param message The message, as a byte array. + * @return The de-serialized message as an object. + */ + @Override + public abstract T deserialize(byte[] message) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + *
<p>
This default implementation returns always false, meaning the stream is interpreted + * to be unbounded. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + @Override + public boolean isEndOfStream(T nextElement) { + return false; + } + + @Override + public TypeInformation getProducedType() { + return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java new file mode 100644 index 0000000000000..9de474391e0aa --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.java.typeutils.ResultTypeQueryable; + +import java.io.IOException; +import java.io.Serializable; + +/** + * The deserialization schema describes how to turn the byte messages delivered by certain + * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are + * processed by Flink. + * + *
<p>
Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which + * takes care of producing the return type information automatically. + * + * @param The type created by the deserialization schema. + */ +@Public +public interface DeserializationSchema extends Serializable, ResultTypeQueryable { + + /** + * Deserializes the byte message. + * + * @param message The message, as a byte array. + * + * @return The deserialized message as an object (null if the message cannot be deserialized). + */ + T deserialize(byte[] message) throws IOException; + + /** + * Method to decide whether the element signals the end of the stream. If + * true is returned the element won't be emitted. + * + * @param nextElement The element to test for the end-of-stream signal. + * @return True, if the element signals end of stream, false otherwise. + */ + boolean isEndOfStream(T nextElement); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java new file mode 100644 index 0000000000000..3a4eaeb467de5 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; + +import java.io.Serializable; + +/** + * The serialization schema describes how to turn a data object into a different serialized + * representation. Most data sinks (for example Apache Kafka) require the data to be handed + * to them in a specific format (for example as byte strings). + * + * @param The type to be serialized. + */ +@Public +public interface SerializationSchema extends Serializable { + + /** + * Serializes the incoming element to a specified type. + * + * @param element + * The incoming element to be serialized + * @return The serialized element. + */ + byte[] serialize(T element); +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java new file mode 100644 index 0000000000000..3130a10f2d896 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java @@ -0,0 +1,107 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
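For illustration only (not part of the patch; the PersonSchema and Person names are hypothetical): a user-defined schema written against the relocated interfaces would typically extend AbstractDeserializationSchema, which derives the produced TypeInformation from the generic class argument, and implement SerializationSchema for the write path.

// Hypothetical user code against the relocated flink-core interfaces.
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/** Parses "id,name" lines into a simple POJO; the produced type is extracted from the generic argument. */
public class PersonSchema extends AbstractDeserializationSchema<PersonSchema.Person>
        implements SerializationSchema<PersonSchema.Person> {

    /** Minimal POJO used only for this example. */
    public static class Person {
        public long id;
        public String name;
    }

    @Override
    public Person deserialize(byte[] message) throws IOException {
        String[] fields = new String(message, StandardCharsets.UTF_8).split(",", 2);
        Person p = new Person();
        p.id = Long.parseLong(fields[0]);
        p.name = fields[1];
        return p;
    }

    @Override
    public byte[] serialize(Person element) {
        return (element.id + "," + element.name).getBytes(StandardCharsets.UTF_8);
    }
}

The same class can serve as both the source-side and the sink-side schema, since the two interfaces declare no conflicting methods.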
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; + +import java.io.IOException; +import java.io.ObjectOutputStream; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Very simple serialization schema for strings. + * + *
<p>
By default, the serializer uses "UTF-8" for string/byte conversion. + */ +@PublicEvolving +public class SimpleStringSchema implements DeserializationSchema, SerializationSchema { + + private static final long serialVersionUID = 1L; + + /** The charset to use to convert between strings and bytes. + * The field is transient because we serialize a different delegate object instead */ + private transient Charset charset; + + /** + * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding. + */ + public SimpleStringSchema() { + this(StandardCharsets.UTF_8); + } + + /** + * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes. + * + * @param charset The charset to use to convert between strings and bytes. + */ + public SimpleStringSchema(Charset charset) { + this.charset = checkNotNull(charset); + } + + /** + * Gets the charset used by this schema for serialization. + * @return The charset used by this schema for serialization. + */ + public Charset getCharset() { + return charset; + } + + // ------------------------------------------------------------------------ + // Kafka Serialization + // ------------------------------------------------------------------------ + + @Override + public String deserialize(byte[] message) { + return new String(message, charset); + } + + @Override + public boolean isEndOfStream(String nextElement) { + return false; + } + + @Override + public byte[] serialize(String element) { + return element.getBytes(charset); + } + + @Override + public TypeInformation getProducedType() { + return BasicTypeInfo.STRING_TYPE_INFO; + } + + // ------------------------------------------------------------------------ + // Java Serialization + // ------------------------------------------------------------------------ + + private void writeObject (ObjectOutputStream out) throws IOException { + out.defaultWriteObject(); + out.writeUTF(charset.name()); + } + + private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { + in.defaultReadObject(); + String charsetName = in.readUTF(); + this.charset = Charset.forName(charsetName); + } +} diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java new file mode 100644 index 0000000000000..217a889b86879 --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
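A minimal round-trip with the relocated SimpleStringSchema, shown here as a hypothetical standalone snippet rather than anything taken from the patch:

import org.apache.flink.api.common.serialization.SimpleStringSchema;

import java.nio.charset.StandardCharsets;

public class SimpleStringSchemaExample {

    public static void main(String[] args) {
        // UTF-8 is the default; any other charset can be passed explicitly.
        SimpleStringSchema schema = new SimpleStringSchema(StandardCharsets.UTF_8);

        byte[] bytes = schema.serialize("hello flink");
        String back = schema.deserialize(bytes);

        // The schema keeps its charset across Java serialization (see writeObject/readObject above),
        // which matters when it is shipped to the cluster as part of a source or sink.
        System.out.println(back + " (" + schema.getCharset() + ")"); // hello flink (UTF-8)
    }
}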
+ */ + +package org.apache.flink.api.common.serialization; + +import org.apache.flink.annotation.Public; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; + +import java.io.IOException; + +/** + * A serialization and deserialization schema that uses Flink's serialization stack to + * transform typed from and to byte arrays. + * + * @param The type to be serialized. + */ +@Public +public class TypeInformationSerializationSchema implements DeserializationSchema, SerializationSchema { + + private static final long serialVersionUID = -5359448468131559102L; + + /** The serializer for the actual de-/serialization. */ + private final TypeSerializer serializer; + + /** The reusable output serialization buffer. */ + private transient DataOutputSerializer dos; + + /** The reusable input deserialization buffer. */ + private transient DataInputDeserializer dis; + + /** + * The type information, to be returned by {@link #getProducedType()}. It is transient, because + * it is not serializable. Note that this means that the type information is not available at + * runtime, but only prior to the first serialization / deserialization. + */ + private transient TypeInformation typeInfo; + + // ------------------------------------------------------------------------ + + /** + * Creates a new de-/serialization schema for the given type. + * + * @param typeInfo The type information for the type de-/serialized by this schema. + * @param ec The execution config, which is used to parametrize the type serializers. + */ + public TypeInformationSerializationSchema(TypeInformation typeInfo, ExecutionConfig ec) { + this.typeInfo = typeInfo; + this.serializer = typeInfo.createSerializer(ec); + } + + // ------------------------------------------------------------------------ + + @Override + public T deserialize(byte[] message) { + if (dis != null) { + dis.setBuffer(message, 0, message.length); + } else { + dis = new DataInputDeserializer(message, 0, message.length); + } + + try { + return serializer.deserialize(dis); + } + catch (IOException e) { + throw new RuntimeException("Unable to deserialize message", e); + } + } + + /** + * This schema never considers an element to signal end-of-stream, so this method returns always false. + * @param nextElement The element to test for the end-of-stream signal. + * @return Returns false. 
+ */ + @Override + public boolean isEndOfStream(T nextElement) { + return false; + } + + @Override + public byte[] serialize(T element) { + if (dos == null) { + dos = new DataOutputSerializer(16); + } + + try { + serializer.serialize(element, dos); + } + catch (IOException e) { + throw new RuntimeException("Unable to serialize record", e); + } + + byte[] ret = dos.getByteArray(); + if (ret.length != dos.length()) { + byte[] n = new byte[dos.length()]; + System.arraycopy(ret, 0, n, 0, dos.length()); + ret = n; + } + dos.clear(); + return ret; + } + + @Override + public TypeInformation getProducedType() { + if (typeInfo != null) { + return typeInfo; + } + else { + throw new IllegalStateException( + "The type information is not available after this class has been serialized and distributed."); + } + } +} diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java similarity index 96% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java index 818a43bc18887..ec241b49af71e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java @@ -16,13 +16,12 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.api.common.serialization; import org.apache.flink.api.common.functions.InvalidTypesException; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java similarity index 97% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java index 6081ed1727e1a..482ff13a1e039 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java @@ -16,7 +16,7 @@ * limitations under the License. 
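Similarly, a sketch of how the relocated TypeInformationSerializationSchema might be used; the Tuple2 record type is an assumption made for this example:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;

public class TypeInfoSchemaExample {

    public static void main(String[] args) {
        TypeInformation<Tuple2<Integer, String>> typeInfo =
                TypeInformation.of(new TypeHint<Tuple2<Integer, String>>() {});

        TypeInformationSerializationSchema<Tuple2<Integer, String>> schema =
                new TypeInformationSerializationSchema<>(typeInfo, new ExecutionConfig());

        // Round-trip through Flink's own serialization stack.
        byte[] bytes = schema.serialize(Tuple2.of(42, "answer"));
        Tuple2<Integer, String> back = schema.deserialize(bytes);
        System.out.println(back); // (42,answer)

        // Note: the TypeInformation field is transient, so getProducedType() is only
        // available before the schema itself is serialized and shipped to the cluster.
    }
}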
*/ -package org.apache.flink.streaming.util.serialization; +package org.apache.flink.api.common.serialization; import org.apache.flink.core.testutils.CommonTestUtils; diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java similarity index 96% rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java index 317f2e3da5720..ef5f4b0a3e34f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java @@ -16,13 +16,12 @@ * limitations under the License. */ -package org.apache.flink.streaming.util; +package org.apache.flink.api.common.serialization; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema; import org.junit.Test; diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java index b5abbc50a5774..3fbd2b462ccea 100644 --- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java @@ -19,12 +19,12 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010; -import org.apache.flink.streaming.util.serialization.SimpleStringSchema; /** diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala index 2a52811940346..9f4fdc4c29489 100644 --- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala +++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala @@ -19,10 +19,10 @@ package org.apache.flink.streaming.scala.examples.kafka import org.apache.flink.api.common.restartstrategy.RestartStrategies +import org.apache.flink.api.common.serialization.SimpleStringSchema import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala._ import 
org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010} -import org.apache.flink.streaming.util.serialization.SimpleStringSchema /** * An example that shows how to read from and write to Kafka. This will read String messages diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml index 5b15b5a9b5b24..2683546274e2d 100644 --- a/flink-streaming-java/pom.xml +++ b/flink-streaming-java/pom.xml @@ -75,12 +75,6 @@ under the License. - - org.apache.flink - flink-shaded-jackson - test - - org.apache.flink flink-test-utils-junit diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java index d0769c69783e6..2274968a343a5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java @@ -31,6 +31,7 @@ import org.apache.flink.api.common.io.OutputFormat; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.operators.ResourceSpec; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo; import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -91,7 +92,6 @@ import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner; import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner; import org.apache.flink.streaming.util.keys.KeySelectorUtil; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.Preconditions; import java.util.ArrayList; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java index 214d5c2eca92c..80e0dbe6770b1 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java @@ -18,8 +18,8 @@ package org.apache.flink.streaming.api.functions.sink; import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.util.serialization.SerializationSchema; import org.apache.flink.util.SerializableObject; import org.slf4j.Logger; diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java index 02ea0041f0d73..7d30f9133b1a7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java @@ -18,10 +18,7 @@ package org.apache.flink.streaming.util.serialization; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.typeutils.TypeExtractor; - -import java.io.IOException; +import org.apache.flink.annotation.PublicEvolving; /** * The deserialization 
schema describes how to turn the byte messages delivered by certain @@ -32,37 +29,15 @@ * automatically by extracting it from the generic class arguments. * * @param The type created by the deserialization schema. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.AbstractDeserializationSchema} instead. */ -public abstract class AbstractDeserializationSchema implements DeserializationSchema { +@Deprecated +@PublicEvolving +@SuppressWarnings("deprecation") +public abstract class AbstractDeserializationSchema + extends org.apache.flink.api.common.serialization.AbstractDeserializationSchema + implements DeserializationSchema { private static final long serialVersionUID = 1L; - - /** - * De-serializes the byte message. - * - * @param message The message, as a byte array. - * @return The de-serialized message as an object. - */ - @Override - public abstract T deserialize(byte[] message) throws IOException; - - /** - * Method to decide whether the element signals the end of the stream. If - * true is returned the element won't be emitted. - * - *
<p>
This default implementation returns always false, meaning the stream is interpreted - * to be unbounded. - * - * @param nextElement The element to test for the end-of-stream signal. - * @return True, if the element signals end of stream, false otherwise. - */ - @Override - public boolean isEndOfStream(T nextElement) { - return false; - } - - @Override - public TypeInformation getProducedType() { - return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null); - } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java index 15ecb2cbff88d..cbaa004da6945 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java @@ -32,9 +32,15 @@ * takes care of producing the return type information automatically. * * @param The type created by the deserialization schema. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.DeserializationSchema} instead. */ @Public -public interface DeserializationSchema extends Serializable, ResultTypeQueryable { +@Deprecated +public interface DeserializationSchema extends + org.apache.flink.api.common.serialization.DeserializationSchema, + Serializable, + ResultTypeQueryable { /** * Deserializes the byte message. @@ -43,6 +49,7 @@ public interface DeserializationSchema extends Serializable, ResultTypeQuerya * * @return The deserialized message as an object (null if the message cannot be deserialized). */ + @Override T deserialize(byte[] message) throws IOException; /** @@ -52,5 +59,6 @@ public interface DeserializationSchema extends Serializable, ResultTypeQuerya * @param nextElement The element to test for the end-of-stream signal. * @return True, if the element signals end of stream, false otherwise. */ + @Override boolean isEndOfStream(T nextElement); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java index 986cfb3039fda..c7c1de06fb90e 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java @@ -27,9 +27,13 @@ * to them in a specific format (for example as byte strings). * * @param The type to be serialized. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.SerializationSchema} instead. */ @Public -public interface SerializationSchema extends Serializable { +@Deprecated +public interface SerializationSchema + extends org.apache.flink.api.common.serialization.SerializationSchema, Serializable { /** * Serializes the incoming element to a specified type. @@ -38,5 +42,6 @@ public interface SerializationSchema extends Serializable { * The incoming element to be serialized * @return The serialized element. 
*/ + @Override byte[] serialize(T element); } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java index 27ba9e913331f..01ce30dd48c03 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java @@ -18,35 +18,27 @@ package org.apache.flink.streaming.util.serialization; import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.typeinfo.BasicTypeInfo; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import java.io.IOException; -import java.io.ObjectOutputStream; import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; - -import static org.apache.flink.util.Preconditions.checkNotNull; /** * Very simple serialization schema for strings. * *
<p>
By default, the serializer uses "UTF-8" for string/byte conversion. + * + * @deprecated Use {@link org.apache.flink.api.common.serialization.SimpleStringSchema} instead. */ @PublicEvolving -public class SimpleStringSchema implements DeserializationSchema, SerializationSchema { +@Deprecated +@SuppressWarnings("deprecation") +public class SimpleStringSchema + extends org.apache.flink.api.common.serialization.SimpleStringSchema + implements SerializationSchema, DeserializationSchema { private static final long serialVersionUID = 1L; - /** The charset to use to convert between strings and bytes. - * The field is transient because we serialize a different delegate object instead */ - private transient Charset charset; - - /** - * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding. - */ public SimpleStringSchema() { - this(StandardCharsets.UTF_8); + super(); } /** @@ -55,53 +47,6 @@ public SimpleStringSchema() { * @param charset The charset to use to convert between strings and bytes. */ public SimpleStringSchema(Charset charset) { - this.charset = checkNotNull(charset); - } - - /** - * Gets the charset used by this schema for serialization. - * @return The charset used by this schema for serialization. - */ - public Charset getCharset() { - return charset; - } - - // ------------------------------------------------------------------------ - // Kafka Serialization - // ------------------------------------------------------------------------ - - @Override - public String deserialize(byte[] message) { - return new String(message, charset); - } - - @Override - public boolean isEndOfStream(String nextElement) { - return false; - } - - @Override - public byte[] serialize(String element) { - return element.getBytes(charset); - } - - @Override - public TypeInformation getProducedType() { - return BasicTypeInfo.STRING_TYPE_INFO; - } - - // ------------------------------------------------------------------------ - // Java Serialization - // ------------------------------------------------------------------------ - - private void writeObject (ObjectOutputStream out) throws IOException { - out.defaultWriteObject(); - out.writeUTF(charset.name()); - } - - private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException { - in.defaultReadObject(); - String charsetName = in.readUTF(); - this.charset = Charset.forName(charsetName); + super(charset); } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java index 1c50dc2987f7c..b771fe0b96ca5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java @@ -21,41 +21,24 @@ import org.apache.flink.annotation.Public; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.core.memory.DataInputDeserializer; -import org.apache.flink.core.memory.DataOutputSerializer; - -import java.io.IOException; /** * A serialization and deserialization schema that uses Flink's serialization stack to * transform typed from and to byte arrays. * * @param The type to be serialized. 
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 1c50dc2987f7c..b771fe0b96ca5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -21,41 +21,24 @@
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-
-import java.io.IOException;
 
 /**
  * A serialization and deserialization schema that uses Flink's serialization stack to
  * transform typed from and to byte arrays.
  *
  * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.TypeInformationSerializationSchema} instead.
  */
 @Public
-public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> {
+@Deprecated
+@SuppressWarnings("deprecation")
+public class TypeInformationSerializationSchema<T>
+		extends org.apache.flink.api.common.serialization.TypeInformationSerializationSchema<T>
+		implements DeserializationSchema<T>, SerializationSchema<T> {
 
 	private static final long serialVersionUID = -5359448468131559102L;
 
-	/** The serializer for the actual de-/serialization. */
-	private final TypeSerializer<T> serializer;
-
-	/** The reusable output serialization buffer. */
-	private transient DataOutputSerializer dos;
-
-	/** The reusable input deserialization buffer. */
-	private transient DataInputDeserializer dis;
-
-	/**
-	 * The type information, to be returned by {@link #getProducedType()}. It is transient, because
-	 * it is not serializable. Note that this means that the type information is not available at
-	 * runtime, but only prior to the first serialization / deserialization.
-	 */
-	private transient TypeInformation<T> typeInfo;
-
-	// ------------------------------------------------------------------------
-
 	/**
	 * Creates a new de-/serialization schema for the given type.
 	 *
@@ -63,69 +46,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
 	 * @param ec The execution config, which is used to parametrize the type serializers.
 	 */
 	public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
-		this.typeInfo = typeInfo;
-		this.serializer = typeInfo.createSerializer(ec);
-	}
-
-	// ------------------------------------------------------------------------
-
-	@Override
-	public T deserialize(byte[] message) {
-		if (dis != null) {
-			dis.setBuffer(message, 0, message.length);
-		} else {
-			dis = new DataInputDeserializer(message, 0, message.length);
-		}
-
-		try {
-			return serializer.deserialize(dis);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Unable to deserialize message", e);
-		}
-	}
-
-	/**
-	 * This schema never considers an element to signal end-of-stream, so this method returns always false.
-	 * @param nextElement The element to test for the end-of-stream signal.
-	 * @return Returns false.
-	 */
-	@Override
-	public boolean isEndOfStream(T nextElement) {
-		return false;
-	}
-
-	@Override
-	public byte[] serialize(T element) {
-		if (dos == null) {
-			dos = new DataOutputSerializer(16);
-		}
-
-		try {
-			serializer.serialize(element, dos);
-		}
-		catch (IOException e) {
-			throw new RuntimeException("Unable to serialize record", e);
-		}
-
-		byte[] ret = dos.getByteArray();
-		if (ret.length != dos.length()) {
-			byte[] n = new byte[dos.length()];
-			System.arraycopy(ret, 0, n, 0, dos.length());
-			ret = n;
-		}
-		dos.clear();
-		return ret;
-	}
-
-	@Override
-	public TypeInformation<T> getProducedType() {
-		if (typeInfo != null) {
-			return typeInfo;
-		}
-		else {
-			throw new IllegalStateException(
-				"The type information is not available after this class has been serialized and distributed.");
-		}
+		super(typeInfo, ec);
 	}
 }
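The same pattern applies here: the deprecated class keeps only its constructor and delegates via super(typeInfo, ec). A minimal sketch (not part of the patch) of using the relocated class directly; names are illustrative, and BasicTypeInfo.LONG_TYPE_INFO is just one convenient TypeInformation to construct it with:

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.TypeInformationSerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;

import java.io.IOException;

// Hypothetical example class, not part of the Flink code base.
public class TypeInformationSchemaExample {

	public static void main(String[] args) throws IOException {
		// Round-trips a value through Flink's own type serialization stack.
		TypeInformationSerializationSchema<Long> schema =
				new TypeInformationSerializationSchema<>(BasicTypeInfo.LONG_TYPE_INFO, new ExecutionConfig());

		byte[] bytes = schema.serialize(42L);
		Long value = schema.deserialize(bytes);

		System.out.println(value); // prints 42
	}
}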
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 6cdce11d40f07..b3c4ee93ede60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.streaming.api.functions.sink;
 
+import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
 import org.apache.flink.util.TestLogger;
 
 import org.apache.commons.io.IOUtils;
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index b5a7cd687d112..ef2e741ab53e4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
 import org.apache.flink.api.common.io.OutputFormat
 import org.apache.flink.api.common.operators.ResourceSpec
+import org.apache.flink.api.common.serialization.SerializationSchema
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.functions.KeySelector
 import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
@@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator
 import org.apache.flink.streaming.api.windowing.assigners._
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
 import org.apache.flink.util.Collector
 
 import scala.collection.JavaConverters._
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index 3b47429f22727..991f241f7f5d5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -18,8 +18,8 @@
 
 package org.apache.flink.streaming.api.scala
 
+import org.apache.flink.api.common.serialization.SimpleStringSchema
 import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
 
 import scala.language.existentials
 
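The import moves in the test and Scala API files above are what the relocation buys downstream code: anything that only needs a (de)serialization schema can now compile against flink-core types alone, without depending on flink-streaming-java or a Scala-versioned artifact. A hypothetical, self-contained sketch of such API-level code (not part of the patch; names are illustrative):

import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.serialization.SimpleStringSchema;

// Hypothetical downstream utility; after this patch it only needs the flink-core
// serialization interfaces, not the streaming-java package they used to live in.
public class CoreOnlySerializationExample {

	// Accepts any schema; SimpleStringSchema is one concrete choice.
	static byte[] encode(String value, SerializationSchema<String> schema) {
		return schema.serialize(value);
	}

	public static void main(String[] args) {
		byte[] bytes = encode("payload", new SimpleStringSchema());
		System.out.println(bytes.length); // 7 bytes for "payload" in UTF-8
	}
}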