@@ -208,6 +218,11 @@ under the License.
org.apache.flink.configuration.ConfigConstants#ENABLE_QUARANTINE_MONITOR
org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_INITIAL_KEY
org.apache.flink.configuration.ConfigConstants#NETWORK_REQUEST_BACKOFF_MAX_KEY
+
+
+ org.apache.flink.api.common.serialization.DeserializationSchema
+ org.apache.flink.api.common.serialization.SerializationSchema
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
new file mode 100644
index 0000000000000..871b7b106c180
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchema.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+
+import java.io.IOException;
+
+/**
+ * The deserialization schema describes how to turn the byte messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ *
+ * This base variant of the deserialization schema produces the type information
+ * automatically by extracting it from the generic class arguments.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@PublicEvolving
+public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
+
+ private static final long serialVersionUID = 1L;
+
+ /**
+ * De-serializes the byte message.
+ *
+ * @param message The message, as a byte array.
+ * @return The de-serialized message as an object.
+ */
+ @Override
+ public abstract T deserialize(byte[] message) throws IOException;
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted.
+ *
+ * This default implementation returns always false, meaning the stream is interpreted
+ * to be unbounded.
+ *
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return True, if the element signals end of stream, false otherwise.
+ */
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return false;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
new file mode 100644
index 0000000000000..9de474391e0aa
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/DeserializationSchema.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+
+import java.io.IOException;
+import java.io.Serializable;
+
+/**
+ * The deserialization schema describes how to turn the byte messages delivered by certain
+ * data sources (for example Apache Kafka) into data types (Java/Scala objects) that are
+ * processed by Flink.
+ *
+ * Note: In most cases, one should start from {@link AbstractDeserializationSchema}, which
+ * takes care of producing the return type information automatically.
+ *
+ * @param <T> The type created by the deserialization schema.
+ */
+@Public
+public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+
+ /**
+ * Deserializes the byte message.
+ *
+ * @param message The message, as a byte array.
+ *
+ * @return The deserialized message as an object (null if the message cannot be deserialized).
+ */
+ T deserialize(byte[] message) throws IOException;
+
+ /**
+ * Method to decide whether the element signals the end of the stream. If
+ * true is returned the element won't be emitted.
+ *
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return True, if the element signals end of stream, false otherwise.
+ */
+ boolean isEndOfStream(T nextElement);
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
new file mode 100644
index 0000000000000..3a4eaeb467de5
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SerializationSchema.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+
+import java.io.Serializable;
+
+/**
+ * The serialization schema describes how to turn a data object into a different serialized
+ * representation. Most data sinks (for example Apache Kafka) require the data to be handed
+ * to them in a specific format (for example as byte strings).
+ *
+ * @param <T> The type to be serialized.
+ */
+@Public
+public interface SerializationSchema<T> extends Serializable {
+
+ /**
+ * Serializes the incoming element to a specified type.
+ *
+ * @param element
+ * The incoming element to be serialized
+ * @return The serialized element.
+ */
+ byte[] serialize(T element);
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
new file mode 100644
index 0000000000000..3130a10f2d896
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/SimpleStringSchema.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
+import java.io.IOException;
+import java.io.ObjectOutputStream;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Very simple serialization schema for strings.
+ *
+ * By default, the serializer uses "UTF-8" for string/byte conversion.
+ */
+@PublicEvolving
+public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
+
+ private static final long serialVersionUID = 1L;
+
+ /** The charset to use to convert between strings and bytes.
+ * The field is transient because we serialize a different delegate object instead */
+ private transient Charset charset;
+
+ /**
+ * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
+ */
+ public SimpleStringSchema() {
+ this(StandardCharsets.UTF_8);
+ }
+
+ /**
+ * Creates a new SimpleStringSchema that uses the given charset to convert between strings and bytes.
+ *
+ * @param charset The charset to use to convert between strings and bytes.
+ */
+ public SimpleStringSchema(Charset charset) {
+ this.charset = checkNotNull(charset);
+ }
+
+ /**
+ * Gets the charset used by this schema for serialization.
+ * @return The charset used by this schema for serialization.
+ */
+ public Charset getCharset() {
+ return charset;
+ }
+
+ // ------------------------------------------------------------------------
+ // Kafka Serialization
+ // ------------------------------------------------------------------------
+
+ @Override
+ public String deserialize(byte[] message) {
+ return new String(message, charset);
+ }
+
+ @Override
+ public boolean isEndOfStream(String nextElement) {
+ return false;
+ }
+
+ @Override
+ public byte[] serialize(String element) {
+ return element.getBytes(charset);
+ }
+
+ @Override
+ public TypeInformation<String> getProducedType() {
+ return BasicTypeInfo.STRING_TYPE_INFO;
+ }
+
+ // ------------------------------------------------------------------------
+ // Java Serialization
+ // ------------------------------------------------------------------------
+
+ private void writeObject (ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+ out.writeUTF(charset.name());
+ }
+
+ private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ String charsetName = in.readUTF();
+ this.charset = Charset.forName(charsetName);
+ }
+}
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
new file mode 100644
index 0000000000000..217a889b86879
--- /dev/null
+++ b/flink-core/src/main/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchema.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.api.common.serialization;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeutils.TypeSerializer;
+import org.apache.flink.core.memory.DataInputDeserializer;
+import org.apache.flink.core.memory.DataOutputSerializer;
+
+import java.io.IOException;
+
+/**
+ * A serialization and deserialization schema that uses Flink's serialization stack to
+ * transform types from and to byte arrays.
+ *
+ * @param <T> The type to be serialized.
+ */
+@Public
+public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> {
+
+ private static final long serialVersionUID = -5359448468131559102L;
+
+ /** The serializer for the actual de-/serialization. */
+ private final TypeSerializer<T> serializer;
+
+ /** The reusable output serialization buffer. */
+ private transient DataOutputSerializer dos;
+
+ /** The reusable input deserialization buffer. */
+ private transient DataInputDeserializer dis;
+
+ /**
+ * The type information, to be returned by {@link #getProducedType()}. It is transient, because
+ * it is not serializable. Note that this means that the type information is not available at
+ * runtime, but only prior to the first serialization / deserialization.
+ */
+ private transient TypeInformation<T> typeInfo;
+
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new de-/serialization schema for the given type.
+ *
+ * @param typeInfo The type information for the type de-/serialized by this schema.
+ * @param ec The execution config, which is used to parametrize the type serializers.
+ */
+ public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
+ this.typeInfo = typeInfo;
+ this.serializer = typeInfo.createSerializer(ec);
+ }
+
+ // ------------------------------------------------------------------------
+
+ @Override
+ public T deserialize(byte[] message) {
+ if (dis != null) {
+ dis.setBuffer(message, 0, message.length);
+ } else {
+ dis = new DataInputDeserializer(message, 0, message.length);
+ }
+
+ try {
+ return serializer.deserialize(dis);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to deserialize message", e);
+ }
+ }
+
+ /**
+ * This schema never considers an element to signal end-of-stream, so this method returns always false.
+ * @param nextElement The element to test for the end-of-stream signal.
+ * @return Returns false.
+ */
+ @Override
+ public boolean isEndOfStream(T nextElement) {
+ return false;
+ }
+
+ @Override
+ public byte[] serialize(T element) {
+ if (dos == null) {
+ dos = new DataOutputSerializer(16);
+ }
+
+ try {
+ serializer.serialize(element, dos);
+ }
+ catch (IOException e) {
+ throw new RuntimeException("Unable to serialize record", e);
+ }
+
+ byte[] ret = dos.getByteArray();
+ if (ret.length != dos.length()) {
+ byte[] n = new byte[dos.length()];
+ System.arraycopy(ret, 0, n, 0, dos.length());
+ ret = n;
+ }
+ dos.clear();
+ return ret;
+ }
+
+ @Override
+ public TypeInformation<T> getProducedType() {
+ if (typeInfo != null) {
+ return typeInfo;
+ }
+ else {
+ throw new IllegalStateException(
+ "The type information is not available after this class has been serialized and distributed.");
+ }
+ }
+}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
similarity index 96%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
index 818a43bc18887..ec241b49af71e 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/AbstractDeserializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/AbstractDeserializationSchemaTest.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.util;
+package org.apache.flink.api.common.serialization;
import org.apache.flink.api.common.functions.InvalidTypesException;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.util.serialization.AbstractDeserializationSchema;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.util.JSONPObject;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
similarity index 97%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
index 6081ed1727e1a..482ff13a1e039 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/serialization/SimpleStringSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/SimpleStringSchemaTest.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.util.serialization;
+package org.apache.flink.api.common.serialization;
import org.apache.flink.core.testutils.CommonTestUtils;
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
similarity index 96%
rename from flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
rename to flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
index 317f2e3da5720..ef5f4b0a3e34f 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/TypeInformationSerializationSchemaTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/serialization/TypeInformationSerializationSchemaTest.java
@@ -16,13 +16,12 @@
* limitations under the License.
*/
-package org.apache.flink.streaming.util;
+package org.apache.flink.api.common.serialization;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.core.testutils.CommonTestUtils;
-import org.apache.flink.streaming.util.serialization.TypeInformationSerializationSchema;
import org.junit.Test;
diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
index b5abbc50a5774..3fbd2b462ccea 100644
--- a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
+++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/kafka/Kafka010Example.java
@@ -19,12 +19,12 @@
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer010;
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
/**
diff --git a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
index 2a52811940346..9f4fdc4c29489 100644
--- a/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
+++ b/flink-examples/flink-examples-streaming/src/main/scala/org/apache/flink/streaming/scala/examples/kafka/Kafka010Example.scala
@@ -19,10 +19,10 @@
package org.apache.flink.streaming.scala.examples.kafka
import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer010, FlinkKafkaProducer010}
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
/**
* An example that shows how to read from and write to Kafka. This will read String messages
diff --git a/flink-streaming-java/pom.xml b/flink-streaming-java/pom.xml
index 5b15b5a9b5b24..2683546274e2d 100644
--- a/flink-streaming-java/pom.xml
+++ b/flink-streaming-java/pom.xml
@@ -75,12 +75,6 @@ under the License.
-		<dependency>
-			<groupId>org.apache.flink</groupId>
-			<artifactId>flink-shaded-jackson</artifactId>
-			<scope>test</scope>
-		</dependency>
-
org.apache.flink
flink-test-utils-junit
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
index d0769c69783e6..2274968a343a5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/DataStream.java
@@ -31,6 +31,7 @@
import org.apache.flink.api.common.io.OutputFormat;
import org.apache.flink.api.common.operators.Keys;
import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.BasicArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.PrimitiveArrayTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
@@ -91,7 +92,6 @@
import org.apache.flink.streaming.runtime.partitioner.ShufflePartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.util.keys.KeySelectorUtil;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.Preconditions;
import java.util.ArrayList;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
index 214d5c2eca92c..80e0dbe6770b1 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/sink/SocketClientSink.java
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.api.functions.sink;
import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.SerializableObject;
import org.slf4j.Logger;
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
index 02ea0041f0d73..7d30f9133b1a7 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/AbstractDeserializationSchema.java
@@ -18,10 +18,7 @@
package org.apache.flink.streaming.util.serialization;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-
-import java.io.IOException;
+import org.apache.flink.annotation.PublicEvolving;
/**
* The deserialization schema describes how to turn the byte messages delivered by certain
@@ -32,37 +29,15 @@
* automatically by extracting it from the generic class arguments.
*
 * @param <T> The type created by the deserialization schema.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.AbstractDeserializationSchema} instead.
*/
-public abstract class AbstractDeserializationSchema<T> implements DeserializationSchema<T> {
+@Deprecated
+@PublicEvolving
+@SuppressWarnings("deprecation")
+public abstract class AbstractDeserializationSchema<T>
+ extends org.apache.flink.api.common.serialization.AbstractDeserializationSchema<T>
+ implements DeserializationSchema<T> {
private static final long serialVersionUID = 1L;
-
- /**
- * De-serializes the byte message.
- *
- * @param message The message, as a byte array.
- * @return The de-serialized message as an object.
- */
- @Override
- public abstract T deserialize(byte[] message) throws IOException;
-
- /**
- * Method to decide whether the element signals the end of the stream. If
- * true is returned the element won't be emitted.
- *
- * This default implementation returns always false, meaning the stream is interpreted
- * to be unbounded.
- *
- * @param nextElement The element to test for the end-of-stream signal.
- * @return True, if the element signals end of stream, false otherwise.
- */
- @Override
- public boolean isEndOfStream(T nextElement) {
- return false;
- }
-
- @Override
- public TypeInformation getProducedType() {
- return TypeExtractor.createTypeInfo(AbstractDeserializationSchema.class, getClass(), 0, null, null);
- }
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
index 15ecb2cbff88d..cbaa004da6945 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/DeserializationSchema.java
@@ -32,9 +32,15 @@
* takes care of producing the return type information automatically.
*
 * @param <T> The type created by the deserialization schema.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.DeserializationSchema} instead.
*/
@Public
-public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
+@Deprecated
+public interface DeserializationSchema<T> extends
+ org.apache.flink.api.common.serialization.DeserializationSchema<T>,
+ Serializable,
+ ResultTypeQueryable<T> {
/**
* Deserializes the byte message.
@@ -43,6 +49,7 @@ public interface DeserializationSchema extends Serializable, ResultTypeQuerya
*
* @return The deserialized message as an object (null if the message cannot be deserialized).
*/
+ @Override
T deserialize(byte[] message) throws IOException;
/**
@@ -52,5 +59,6 @@ public interface DeserializationSchema extends Serializable, ResultTypeQuerya
* @param nextElement The element to test for the end-of-stream signal.
* @return True, if the element signals end of stream, false otherwise.
*/
+ @Override
boolean isEndOfStream(T nextElement);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
index 986cfb3039fda..c7c1de06fb90e 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SerializationSchema.java
@@ -27,9 +27,13 @@
* to them in a specific format (for example as byte strings).
*
 * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.SerializationSchema} instead.
*/
@Public
-public interface SerializationSchema<T> extends Serializable {
+@Deprecated
+public interface SerializationSchema<T>
+ extends org.apache.flink.api.common.serialization.SerializationSchema<T>, Serializable {
/**
* Serializes the incoming element to a specified type.
@@ -38,5 +42,6 @@ public interface SerializationSchema extends Serializable {
* The incoming element to be serialized
* @return The serialized element.
*/
+ @Override
byte[] serialize(T element);
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
index 27ba9e913331f..01ce30dd48c03 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/SimpleStringSchema.java
@@ -18,35 +18,27 @@
package org.apache.flink.streaming.util.serialization;
import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import java.io.IOException;
-import java.io.ObjectOutputStream;
import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-
-import static org.apache.flink.util.Preconditions.checkNotNull;
/**
* Very simple serialization schema for strings.
*
* By default, the serializer uses "UTF-8" for string/byte conversion.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.SimpleStringSchema} instead.
*/
@PublicEvolving
-public class SimpleStringSchema implements DeserializationSchema<String>, SerializationSchema<String> {
+@Deprecated
+@SuppressWarnings("deprecation")
+public class SimpleStringSchema
+ extends org.apache.flink.api.common.serialization.SimpleStringSchema
+ implements SerializationSchema<String>, DeserializationSchema<String> {
private static final long serialVersionUID = 1L;
- /** The charset to use to convert between strings and bytes.
- * The field is transient because we serialize a different delegate object instead */
- private transient Charset charset;
-
- /**
- * Creates a new SimpleStringSchema that uses "UTF-8" as the encoding.
- */
public SimpleStringSchema() {
- this(StandardCharsets.UTF_8);
+ super();
}
/**
@@ -55,53 +47,6 @@ public SimpleStringSchema() {
* @param charset The charset to use to convert between strings and bytes.
*/
public SimpleStringSchema(Charset charset) {
- this.charset = checkNotNull(charset);
- }
-
- /**
- * Gets the charset used by this schema for serialization.
- * @return The charset used by this schema for serialization.
- */
- public Charset getCharset() {
- return charset;
- }
-
- // ------------------------------------------------------------------------
- // Kafka Serialization
- // ------------------------------------------------------------------------
-
- @Override
- public String deserialize(byte[] message) {
- return new String(message, charset);
- }
-
- @Override
- public boolean isEndOfStream(String nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(String element) {
- return element.getBytes(charset);
- }
-
- @Override
- public TypeInformation<String> getProducedType() {
- return BasicTypeInfo.STRING_TYPE_INFO;
- }
-
- // ------------------------------------------------------------------------
- // Java Serialization
- // ------------------------------------------------------------------------
-
- private void writeObject (ObjectOutputStream out) throws IOException {
- out.defaultWriteObject();
- out.writeUTF(charset.name());
- }
-
- private void readObject(java.io.ObjectInputStream in) throws IOException, ClassNotFoundException {
- in.defaultReadObject();
- String charsetName = in.readUTF();
- this.charset = Charset.forName(charsetName);
+ super(charset);
}
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
index 1c50dc2987f7c..b771fe0b96ca5 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/util/serialization/TypeInformationSerializationSchema.java
@@ -21,41 +21,24 @@
import org.apache.flink.annotation.Public;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.common.typeutils.TypeSerializer;
-import org.apache.flink.core.memory.DataInputDeserializer;
-import org.apache.flink.core.memory.DataOutputSerializer;
-
-import java.io.IOException;
/**
* A serialization and deserialization schema that uses Flink's serialization stack to
* transform typed from and to byte arrays.
*
 * @param <T> The type to be serialized.
+ *
+ * @deprecated Use {@link org.apache.flink.api.common.serialization.TypeInformationSerializationSchema} instead.
*/
@Public
-public class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T> {
+@Deprecated
+@SuppressWarnings("deprecation")
+public class TypeInformationSerializationSchema<T>
+ extends org.apache.flink.api.common.serialization.TypeInformationSerializationSchema<T>
+ implements DeserializationSchema<T>, SerializationSchema<T> {
private static final long serialVersionUID = -5359448468131559102L;
- /** The serializer for the actual de-/serialization. */
- private final TypeSerializer<T> serializer;
-
- /** The reusable output serialization buffer. */
- private transient DataOutputSerializer dos;
-
- /** The reusable input deserialization buffer. */
- private transient DataInputDeserializer dis;
-
- /**
- * The type information, to be returned by {@link #getProducedType()}. It is transient, because
- * it is not serializable. Note that this means that the type information is not available at
- * runtime, but only prior to the first serialization / deserialization.
- */
- private transient TypeInformation<T> typeInfo;
-
- // ------------------------------------------------------------------------
-
/**
* Creates a new de-/serialization schema for the given type.
*
@@ -63,69 +46,6 @@ public class TypeInformationSerializationSchema<T> implements DeserializationSch
* @param ec The execution config, which is used to parametrize the type serializers.
*/
 public TypeInformationSerializationSchema(TypeInformation<T> typeInfo, ExecutionConfig ec) {
- this.typeInfo = typeInfo;
- this.serializer = typeInfo.createSerializer(ec);
- }
-
- // ------------------------------------------------------------------------
-
- @Override
- public T deserialize(byte[] message) {
- if (dis != null) {
- dis.setBuffer(message, 0, message.length);
- } else {
- dis = new DataInputDeserializer(message, 0, message.length);
- }
-
- try {
- return serializer.deserialize(dis);
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to deserialize message", e);
- }
- }
-
- /**
- * This schema never considers an element to signal end-of-stream, so this method returns always false.
- * @param nextElement The element to test for the end-of-stream signal.
- * @return Returns false.
- */
- @Override
- public boolean isEndOfStream(T nextElement) {
- return false;
- }
-
- @Override
- public byte[] serialize(T element) {
- if (dos == null) {
- dos = new DataOutputSerializer(16);
- }
-
- try {
- serializer.serialize(element, dos);
- }
- catch (IOException e) {
- throw new RuntimeException("Unable to serialize record", e);
- }
-
- byte[] ret = dos.getByteArray();
- if (ret.length != dos.length()) {
- byte[] n = new byte[dos.length()];
- System.arraycopy(ret, 0, n, 0, dos.length());
- ret = n;
- }
- dos.clear();
- return ret;
- }
-
- @Override
- public TypeInformation<T> getProducedType() {
- if (typeInfo != null) {
- return typeInfo;
- }
- else {
- throw new IllegalStateException(
- "The type information is not available after this class has been serialized and distributed.");
- }
+ super(typeInfo, ec);
}
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
index 6cdce11d40f07..b3c4ee93ede60 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/functions/sink/SocketClientSinkTest.java
@@ -18,9 +18,9 @@
package org.apache.flink.streaming.api.functions.sink;
+import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
-import org.apache.flink.streaming.util.serialization.SerializationSchema;
import org.apache.flink.util.TestLogger;
import org.apache.commons.io.IOUtils;
diff --git a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
index b5a7cd687d112..ef2e741ab53e4 100644
--- a/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
+++ b/flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/DataStream.scala
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.functions.{FilterFunction, FlatMapFunction, MapFunction, Partitioner}
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.common.operators.ResourceSpec
+import org.apache.flink.api.common.serialization.SerializationSchema
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.{Tuple => JavaTuple}
@@ -38,7 +39,6 @@ import org.apache.flink.streaming.api.operators.OneInputStreamOperator
import org.apache.flink.streaming.api.windowing.assigners._
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.{GlobalWindow, TimeWindow, Window}
-import org.apache.flink.streaming.util.serialization.SerializationSchema
import org.apache.flink.util.Collector
import scala.collection.JavaConverters._
diff --git a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
index 3b47429f22727..991f241f7f5d5 100644
--- a/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
+++ b/flink-streaming-scala/src/test/scala/org/apache/flink/streaming/api/scala/OutputFormatTestPrograms.scala
@@ -18,8 +18,8 @@
package org.apache.flink.streaming.api.scala
+import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.FileSystem
-import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import scala.language.existentials