From 7bc78cbf97d341ebfed32fdfe20f21e4d146a869 Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Fri, 30 Jan 2015 16:43:31 +0100 Subject: [PATCH] [FLINK-1463] Fix stateful/stateless Serializers and Comparators Before, Serializers would announce whether they are stateful or not and rely on RuntimeStatefulSerializerFactory to do the duplication. Comparators, on the other hand, had a duplicate method that the user was required to call. This commit removes the statful/stateless property from Serializers but instead introduces a duplicate() method, similar to Comparators, that can return the same instance. The two serializer factories are merged into one that always calls duplicate() before returning a serializer. --- .../compiler/postpass/JavaApiPostPass.java | 11 +- .../api/common/typeutils/TypeSerializer.java | 16 +- .../typeutils/base/BooleanSerializer.java | 5 - .../base/BooleanValueSerializer.java | 5 - .../common/typeutils/base/ByteSerializer.java | 5 - .../typeutils/base/ByteValueSerializer.java | 5 - .../common/typeutils/base/CharSerializer.java | 5 - .../typeutils/base/CharValueSerializer.java | 5 - .../common/typeutils/base/DateSerializer.java | 5 - .../typeutils/base/DoubleSerializer.java | 5 - .../typeutils/base/DoubleValueSerializer.java | 5 - .../common/typeutils/base/EnumSerializer.java | 4 +- .../typeutils/base/FloatSerializer.java | 5 - .../typeutils/base/FloatValueSerializer.java | 5 - .../base/GenericArraySerializer.java | 12 +- .../common/typeutils/base/IntSerializer.java | 5 - .../typeutils/base/IntValueSerializer.java | 5 - .../common/typeutils/base/LongSerializer.java | 5 - .../typeutils/base/LongValueSerializer.java | 5 - .../typeutils/base/ShortSerializer.java | 5 - .../typeutils/base/ShortValueSerializer.java | 5 - .../typeutils/base/StringSerializer.java | 5 - .../typeutils/base/StringValueSerializer.java | 5 - .../base/TypeSerializerSingleton.java | 7 +- .../common/typeutils/base/VoidSerializer.java | 5 - .../BooleanPrimitiveArraySerializer.java | 5 - .../array/BytePrimitiveArraySerializer.java | 5 - .../array/CharPrimitiveArraySerializer.java | 5 - .../array/DoublePrimitiveArraySerializer.java | 5 - .../array/FloatPrimitiveArraySerializer.java | 5 - .../array/IntPrimitiveArraySerializer.java | 5 - .../array/LongPrimitiveArraySerializer.java | 5 - .../array/ShortPrimitiveArraySerializer.java | 5 - .../base/array/StringArraySerializer.java | 5 - .../typeutils/record/RecordSerializer.java | 5 +- .../typeutils/runtime/AvroSerializer.java | 4 +- .../runtime/CopyableValueSerializer.java | 4 +- .../runtime/GenericTypeComparator.java | 14 +- .../typeutils/runtime/KryoSerializer.java | 20 ++- .../typeutils/runtime/PojoSerializer.java | 33 ++-- ...ory.java => RuntimeSerializerFactory.java} | 28 ++-- .../RuntimeStatefulSerializerFactory.java | 143 ------------------ .../runtime/TupleComparatorBase.java | 28 +--- .../typeutils/runtime/TupleSerializer.java | 20 +++ .../runtime/TupleSerializerBase.java | 19 +-- .../typeutils/runtime/ValueSerializer.java | 4 +- .../typeutils/runtime/WritableSerializer.java | 4 +- .../java/io/CollectionInputFormatTest.java | 4 +- .../operators/sort/LargeRecordHandler.java | 30 +--- .../operators/drivers/TestTaskContext.java | 6 +- .../sort/ExternalSortLargeRecordsITCase.java | 10 +- .../sort/MassiveStringSortingITCase.java | 7 +- .../sort/MassiveStringValueSortingITCase.java | 7 +- .../testutils/types/IntListSerializer.java | 4 +- .../testutils/types/IntPairSerializer.java | 8 +- .../testutils/types/StringPairSerializer.java | 4 +- .../scala/typeutils/CaseClassSerializer.scala | 16 +- .../scala/typeutils/EitherSerializer.scala | 2 +- .../scala/typeutils/NothingSerializer.scala | 2 +- .../scala/typeutils/OptionSerializer.scala | 2 +- .../typeutils/TraversableSerializer.scala | 18 ++- .../api/scala/typeutils/TrySerializer.scala | 2 +- .../function/source/FileSourceFunction.java | 10 +- .../streamrecord/StreamRecordSerializer.java | 4 +- .../VertexWithAdjacencyListSerializer.java | 5 - .../VertexWithRankAndDanglingSerializer.java | 5 - .../types/VertexWithRankSerializer.java | 5 - .../misc/MassiveCaseClassSortingITCase.scala | 4 +- 68 files changed, 182 insertions(+), 494 deletions(-) rename flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/{RuntimeStatelessSerializerFactory.java => RuntimeSerializerFactory.java} (82%) delete mode 100644 flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java diff --git a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java index ae5b6a3aff251..208ff2e41e9f4 100644 --- a/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java +++ b/flink-compiler/src/main/java/org/apache/flink/compiler/postpass/JavaApiPostPass.java @@ -43,8 +43,7 @@ import org.apache.flink.api.java.typeutils.PojoTypeInfo; import org.apache.flink.api.java.typeutils.runtime.RuntimeComparatorFactory; import org.apache.flink.api.java.typeutils.runtime.RuntimePairComparatorFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.compiler.CompilerException; import org.apache.flink.compiler.CompilerPostPassException; import org.apache.flink.compiler.plan.BulkIterationPlanNode; @@ -278,12 +277,8 @@ private static TypeInformation getTypeInfoFromSource(SourcePlanNode node) private static TypeSerializerFactory createSerializer(TypeInformation typeInfo) { TypeSerializer serializer = typeInfo.createSerializer(); - - if (serializer.isStateful()) { - return new RuntimeStatefulSerializerFactory(serializer, typeInfo.getTypeClass()); - } else { - return new RuntimeStatelessSerializerFactory(serializer, typeInfo.getTypeClass()); - } + + return new RuntimeSerializerFactory(serializer, typeInfo.getTypeClass()); } @SuppressWarnings("unchecked") diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java index 5e32c86cf403e..329e8267455e8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializer.java @@ -50,17 +50,15 @@ public abstract class TypeSerializer implements Serializable { */ public abstract boolean isImmutableType(); - /** - * Gets whether the serializer is stateful. Statefulness means in this context that some of the serializer's - * methods have objects with state and are thus not inherently thread-safe. A stateful serializer might be used by - * multiple threads concurrently. For a stateful one, different instances will be used by different threads. - * - * @return True, if the serializer is stateful, false if it is stateless; + * Creates a deep copy of this serializer if it is necessary, i.e. if it is stateful. This + * can return itself if the serializer is not stateful. + * + * We need this because Serializers might be used in several threads. Stateless serializers + * are inherently thread-safe while stateful serializers might not be thread-safe. */ - public abstract boolean isStateful(); - - + public abstract TypeSerializer duplicate(); + // -------------------------------------------------------------------------------------------- // Instantiation & Cloning // -------------------------------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java index ecfb3c2ef1a3a..a844ac87d341f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanSerializer.java @@ -36,11 +36,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Boolean createInstance() { return FALSE; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java index 47950551c08b5..3aae95d62fd6e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BooleanValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public BooleanValue createInstance() { return new BooleanValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java index 32f3edd805914..92b36858654d2 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Byte createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java index 24cc98b497695..e523d5eb4e094 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ByteValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public ByteValue createInstance() { return new ByteValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java index c46d3a02318a5..181db56af99f1 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Character createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java index 71a8ef4aafb9a..690509c0fcb1a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/CharValueSerializer.java @@ -36,11 +36,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public CharValue createInstance() { return new CharValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java index 4bd2ea8e7799c..6aa11eb8312d6 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DateSerializer.java @@ -36,11 +36,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public Date createInstance() { return new Date(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java index 8e09f7cf3ead5..24af95cd02054 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Double createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java index f4c7f3782dedc..34434f193d73f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/DoubleValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public DoubleValue createInstance() { return new DoubleValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java index 7ecf82a30693c..643e4fa138722 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/EnumSerializer.java @@ -46,8 +46,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public EnumSerializer duplicate() { + return this; } @Override diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java index b1a46b0a3fafc..c82378384e547 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Float createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java index 6ebb268735a72..15d00b5446c7c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/FloatValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public FloatValue createInstance() { return new FloatValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java index b8612a2fb54f9..668766145bfc0 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/GenericArraySerializer.java @@ -50,15 +50,21 @@ public GenericArraySerializer(Class componentClass, TypeSerializer compone this.componentSerializer = componentSerializer; this.EMPTY = create(0); } - + @Override public boolean isImmutableType() { return false; } @Override - public boolean isStateful() { - return this.componentSerializer.isStateful(); + public GenericArraySerializer duplicate() { + TypeSerializer duplicateComponentSerializer = this.componentSerializer.duplicate(); + if (duplicateComponentSerializer == this.componentSerializer) { + // is not stateful, return ourselves + return this; + } else { + return new GenericArraySerializer(componentClass, duplicateComponentSerializer); + } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java index 2937b2ad8d915..778f044fb27bd 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Integer createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java index ec1f345361194..c2d1b60f2889c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/IntValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public IntValue createInstance() { return new IntValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java index 6b25596ced541..6d8b758515064 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Long createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java index 95caf04f76b3c..37dec406ca08e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/LongValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public LongValue createInstance() { return new LongValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java index c6e7870cb81c4..44e5e3e46d55d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Short createInstance() { return ZERO; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java index ab58987021427..1dbe4a506c099 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/ShortValueSerializer.java @@ -37,11 +37,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public ShortValue createInstance() { return new ShortValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java index 71221a20f8cf8..7b26600d2b538 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringSerializer.java @@ -38,11 +38,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public String createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java index c5d5b55a0b11b..7628cabfb4a96 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/StringValueSerializer.java @@ -39,11 +39,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public StringValue createInstance() { return new StringValue(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java index 979d5ab81edd9..e076e5b1f93c7 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/TypeSerializerSingleton.java @@ -25,7 +25,12 @@ public abstract class TypeSerializerSingleton extends TypeSerializer{ private static final long serialVersionUID = 8766687317209282373L; // -------------------------------------------------------------------------------------------- - + + @Override + public TypeSerializerSingleton duplicate() { + return this; + } + @Override public int hashCode() { return super.hashCode(); diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java index 33bb901284448..272ffbdeaafe8 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/VoidSerializer.java @@ -34,11 +34,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public Void createInstance() { return null; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java index e9941a88c753c..4a493ac69d159 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BooleanPrimitiveArraySerializer.java @@ -40,11 +40,6 @@ public final class BooleanPrimitiveArraySerializer extends TypeSerializerSinglet public boolean isImmutableType() { return false; } - - @Override - public boolean isStateful() { - return false; - } @Override public boolean[] createInstance() { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java index aaf867f2d08c7..fb4d506438c24 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/BytePrimitiveArraySerializer.java @@ -40,11 +40,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public byte[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java index 64632bd3d7673..8e3c4eae69fb3 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/CharPrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public char[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java index 846ae748b24e5..10e25c2c74fdf 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/DoublePrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public double[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java index 8f42ac8b6f92a..d57af0021a84a 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/FloatPrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public float[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java index 2ab056cb321c9..eaff28750a218 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/IntPrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public int[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java index 5d34dfe37d483..55a22c209b299 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/LongPrimitiveArraySerializer.java @@ -41,11 +41,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public long[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java index 2f3703338b0f3..08275b0b2697f 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/ShortPrimitiveArraySerializer.java @@ -40,11 +40,6 @@ public final class ShortPrimitiveArraySerializer extends TypeSerializerSingleton public boolean isImmutableType() { return false; } - - @Override - public boolean isStateful() { - return false; - } @Override public short[] createInstance() { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java index d5ab030e12c67..ad172a80db19c 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/array/StringArraySerializer.java @@ -43,11 +43,6 @@ public boolean isImmutableType() { return true; } - @Override - public boolean isStateful() { - return false; - } - @Override public String[] createInstance() { return EMPTY; diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java index 7b72e894b9d9f..11b21d62f87ae 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/record/RecordSerializer.java @@ -57,8 +57,9 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public RecordSerializer duplicate() { + // does not hold state, so just return ourselves + return this; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java index cc72fa3e647fb..2758bd694b7b1 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/AvroSerializer.java @@ -79,8 +79,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return true; + public AvroSerializer duplicate() { + return new AvroSerializer(type, typeToInstantiate); } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java index 8710f2de53e5c..193d495896e6a 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/CopyableValueSerializer.java @@ -47,8 +47,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public CopyableValueSerializer duplicate() { + return this; } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java index 7caa770a325ef..039cef728456b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/GenericTypeComparator.java @@ -24,7 +24,6 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.core.memory.MemorySegment; @@ -42,9 +41,7 @@ public class GenericTypeComparator> extends TypeComparat private final Class type; - private final TypeSerializerFactory serializerFactory; - - private transient TypeSerializer serializer; + private TypeSerializer serializer; private transient T reference; @@ -61,15 +58,11 @@ public GenericTypeComparator(boolean ascending, TypeSerializer serializer, Cl this.ascending = ascending; this.serializer = serializer; this.type = type; - - this.serializerFactory = this.serializer.isStateful() - ? new RuntimeStatefulSerializerFactory(this.serializer, this.type) - : new RuntimeStatelessSerializerFactory(this.serializer, this.type); } private GenericTypeComparator(GenericTypeComparator toClone) { this.ascending = toClone.ascending; - this.serializerFactory = toClone.serializerFactory; + this.serializer = toClone.serializer.duplicate(); this.type = toClone.type; } @@ -104,9 +97,6 @@ public int compare(T first, T second) { @Override public int compareSerialized(final DataInputView firstSource, final DataInputView secondSource) throws IOException { - if (this.serializer == null) { - this.serializer = this.serializerFactory.getSerializer(); - } if (this.reference == null) { this.reference = this.serializer.createInstance(); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java index d9ecda78f1594..c55cd71b0b123 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/KryoSerializer.java @@ -69,7 +69,7 @@ public class KryoSerializer extends TypeSerializer { private final Class type; // ------------------------------------------------------------------------ - // The fields below are lazily initialized after de-serialization + // The fields below are lazily initialized after duplication or deserialization. private transient Kryo kryo; private transient T copyInstance; @@ -107,6 +107,20 @@ public KryoSerializer(Class type){ } + /** + * Copy-constructor that does not copy transient fields. They will be initialized once required. + */ + protected KryoSerializer(KryoSerializer toCopy) { + registeredSerializers = toCopy.registeredSerializers; + registeredSerializersClasses = toCopy.registeredSerializersClasses; + registeredTypes = toCopy.registeredTypes; + + type = toCopy.type; + if(type == null){ + throw new NullPointerException("Type class cannot be null."); + } + } + // ------------------------------------------------------------------------ @Override @@ -115,8 +129,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return true; + public KryoSerializer duplicate() { + return new KryoSerializer(this); } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java index 1e58b9ddcf214..15e8537c480e6 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializer.java @@ -42,9 +42,6 @@ public final class PojoSerializer extends TypeSerializer { private final int numFields; - private final boolean stateful; - - @SuppressWarnings("unchecked") public PojoSerializer(Class clazz, TypeSerializer[] fieldSerializers, Field[] fields) { this.clazz = clazz; @@ -55,15 +52,6 @@ public PojoSerializer(Class clazz, TypeSerializer[] fieldSerializers, Fiel for (int i = 0; i < numFields; i++) { this.fields[i].setAccessible(true); } - - boolean stateful = false; - for (TypeSerializer ser : fieldSerializers) { - if (ser.isStateful()) { - stateful = true; - break; - } - } - this.stateful = stateful; } private void writeObject(ObjectOutputStream out) @@ -109,10 +97,25 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return this.stateful; + public PojoSerializer duplicate() { + boolean stateful = false; + TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + + for (int i = 0; i < fieldSerializers.length; i++) { + duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); + if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + // at least one of them is stateful + stateful = true; + } + } + + if (stateful) { + return new PojoSerializer(clazz, duplicateFieldSerializers, fields); + } else { + return this; + } } - + @Override public T createInstance() { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java similarity index 82% rename from flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java rename to flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java index 041b824f7c427..96aff737efb91 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatelessSerializerFactory.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeSerializerFactory.java @@ -24,7 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.util.InstantiationUtil; -public final class RuntimeStatelessSerializerFactory implements TypeSerializerFactory, java.io.Serializable { +public final class RuntimeSerializerFactory implements TypeSerializerFactory, java.io.Serializable { private static final long serialVersionUID = 1L; @@ -36,20 +36,18 @@ public final class RuntimeStatelessSerializerFactory implements TypeSerialize private TypeSerializer serializer; - private Class clazz; + private boolean firstSerializer = true; + private Class clazz; - public RuntimeStatelessSerializerFactory() {} + // Because we read the class from the TaskConfig and instantiate ourselves + public RuntimeSerializerFactory() {} - public RuntimeStatelessSerializerFactory(TypeSerializer serializer, Class clazz) { + public RuntimeSerializerFactory(TypeSerializer serializer, Class clazz) { if (serializer == null || clazz == null) { throw new NullPointerException(); } - - if (serializer.isStateful()) { - throw new IllegalArgumentException("Cannot use the stateless serializer factory with a stateful serializer."); - } - + this.clazz = clazz; this.serializer = serializer; } @@ -76,6 +74,7 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl) throw try { this.clazz = (Class) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); this.serializer = (TypeSerializer) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_SER, cl); + firstSerializer = true; } catch (ClassNotFoundException e) { throw e; @@ -88,7 +87,12 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl) throw @Override public TypeSerializer getSerializer() { if (this.serializer != null) { - return this.serializer; + if (firstSerializer) { + firstSerializer = false; + return this.serializer; + } else { + return this.serializer.duplicate(); + } } else { throw new RuntimeException("SerializerFactory has not been initialized from configuration."); } @@ -108,8 +112,8 @@ public int hashCode() { @Override public boolean equals(Object obj) { - if (obj != null && obj instanceof RuntimeStatelessSerializerFactory) { - RuntimeStatelessSerializerFactory other = (RuntimeStatelessSerializerFactory) obj; + if (obj != null && obj instanceof RuntimeSerializerFactory) { + RuntimeSerializerFactory other = (RuntimeSerializerFactory) obj; return this.clazz == other.clazz && this.serializer.equals(other.serializer); diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java deleted file mode 100644 index dcc31bf5c4fbf..0000000000000 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/RuntimeStatefulSerializerFactory.java +++ /dev/null @@ -1,143 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - - -package org.apache.flink.api.java.typeutils.runtime; - -import java.io.IOException; - -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.util.InstantiationUtil; - -public final class RuntimeStatefulSerializerFactory implements TypeSerializerFactory, java.io.Serializable { - - private static final long serialVersionUID = 1L; - - private static final String CONFIG_KEY_SER = "SER_DATA"; - - private static final String CONFIG_KEY_CLASS = "CLASS_DATA"; - - private byte[] serializerData; - - private TypeSerializer serializer; // only for equality comparisons - - private transient ClassLoader loader; - - private Class clazz; - - public RuntimeStatefulSerializerFactory() {} - - public RuntimeStatefulSerializerFactory(TypeSerializer serializer, Class clazz) { - this.clazz = clazz; - this.loader = serializer.getClass().getClassLoader(); - - try { - this.serializerData = InstantiationUtil.serializeObject(serializer); - } catch (IOException e) { - throw new RuntimeException("Cannt serialize the Serializer.", e); - } - } - - public void setClassLoader(ClassLoader loader) { - this.loader = loader; - } - - @Override - public void writeParametersToConfig(Configuration config) { - try { - InstantiationUtil.writeObjectToConfig(clazz, config, CONFIG_KEY_CLASS); - config.setBytes(CONFIG_KEY_SER, this.serializerData); - } - catch (Exception e) { - throw new RuntimeException("Could not serialize serializer into the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { - if (config == null || cl == null) { - throw new NullPointerException(); - } - - this.serializerData = config.getBytes(CONFIG_KEY_SER, null); - if (this.serializerData == null) { - throw new RuntimeException("Could not find deserializer in the configuration."); - } - - this.loader = cl; - - try { - this.clazz = (Class) InstantiationUtil.readObjectFromConfig(config, CONFIG_KEY_CLASS, cl); - } - catch (ClassNotFoundException e) { - throw e; - } - catch (Exception e) { - throw new RuntimeException("Could not load deserializer from the configuration.", e); - } - } - - @SuppressWarnings("unchecked") - @Override - public TypeSerializer getSerializer() { - if (serializerData != null) { - try { - return (TypeSerializer) InstantiationUtil.deserializeObject(this.serializerData, this.loader); - } catch (Exception e) { - throw new RuntimeException("Repeated instantiation of serializer failed.", e); - } - } else { - throw new RuntimeException("SerializerFactory has not been initialized from configuration."); - } - } - - @Override - public Class getDataType() { - return this.clazz; - } - - // -------------------------------------------------------------------------------------------- - - @Override - public int hashCode() { - return clazz.hashCode(); - } - - @Override - public boolean equals(Object obj) { - if (obj != null && obj instanceof RuntimeStatefulSerializerFactory) { - @SuppressWarnings("unchecked") - RuntimeStatefulSerializerFactory other = (RuntimeStatefulSerializerFactory) obj; - - if (this.serializer == null) { - this.serializer = getSerializer(); - } - if (other.serializer == null) { - other.serializer = other.getSerializer(); - } - - return this.clazz == other.clazz && - this.serializer.equals(other.serializer); - } else { - return false; - } - } -} diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java index abcf89c2a0b57..28169e515d31c 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleComparatorBase.java @@ -23,7 +23,6 @@ import org.apache.flink.api.common.typeutils.CompositeTypeComparator; import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerFactory; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.types.KeyFieldOutOfBoundsException; @@ -41,10 +40,6 @@ public abstract class TupleComparatorBase extends CompositeTypeComparator @SuppressWarnings("rawtypes") protected TypeComparator[] comparators; - /** serializer factories to duplicate non thread-safe serializers */ - protected TypeSerializerFactory[] serializerFactories; - - protected int[] normalizedKeyLengths; protected int numLeadingNormalizableKeys; @@ -56,7 +51,7 @@ public abstract class TupleComparatorBase extends CompositeTypeComparator /** serializers to deserialize the first n fields for comparison */ @SuppressWarnings("rawtypes") - protected transient TypeSerializer[] serializers; + protected TypeSerializer[] serializers; // cache for the deserialized field objects protected transient Object[] deserializedFields1; @@ -70,14 +65,6 @@ public TupleComparatorBase(int[] keyPositions, TypeComparator[] comparators, this.comparators = (TypeComparator[]) comparators; this.serializers = (TypeSerializer[]) serializers; - // set the serializer factories. - this.serializerFactories = new TypeSerializerFactory[this.serializers.length]; - for (int i = 0; i < serializers.length; i++) { - this.serializerFactories[i] = this.serializers[i].isStateful() ? - new RuntimeStatefulSerializerFactory(this.serializers[i], Object.class) : - new RuntimeStatelessSerializerFactory(this.serializers[i], Object.class); - } - // set up auxiliary fields for normalized key support this.normalizedKeyLengths = new int[keyPositions.length]; int nKeys = 0; @@ -129,7 +116,11 @@ protected TupleComparatorBase(TupleComparatorBase toClone) { protected void privateDuplicate(TupleComparatorBase toClone) { // copy fields and serializer factories this.keyPositions = toClone.keyPositions; - this.serializerFactories = toClone.serializerFactories; + + this.serializers = new TypeSerializer[toClone.serializers.length]; + for (int i = 0; i < toClone.serializers.length; i++) { + this.serializers[i] = toClone.serializers[i].duplicate(); + } this.comparators = new TypeComparator[toClone.comparators.length]; for (int i = 0; i < toClone.comparators.length; i++) { @@ -261,13 +252,6 @@ public T readWithKeyDenormalization(T reuse, DataInputView source) throws IOExce // -------------------------------------------------------------------------------------------- protected final void instantiateDeserializationUtils() { - if (this.serializers == null) { - this.serializers = new TypeSerializer[this.serializerFactories.length]; - for (int i = 0; i < this.serializers.length; i++) { - this.serializers[i] = this.serializerFactories[i].getSerializer(); - } - } - this.deserializedFields1 = new Object[this.serializers.length]; this.deserializedFields2 = new Object[this.serializers.length]; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java index 9564c0101b4f4..6e5925dfdba6b 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializer.java @@ -35,6 +35,26 @@ public TupleSerializer(Class tupleClass, TypeSerializer[] fieldSerializers super(tupleClass, fieldSerializers); } + @Override + public TupleSerializer duplicate() { + boolean stateful = false; + TypeSerializer[] duplicateFieldSerializers = new TypeSerializer[fieldSerializers.length]; + + for (int i = 0; i < fieldSerializers.length; i++) { + duplicateFieldSerializers[i] = fieldSerializers[i].duplicate(); + if (duplicateFieldSerializers[i] != fieldSerializers[i]) { + // at least one of them is stateful + stateful = true; + } + } + + if (stateful) { + return new TupleSerializer(tupleClass, duplicateFieldSerializers); + } else { + return this; + } + } + @Override public T createInstance() { try { diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java index a30eda333c17a..f041736dc2d08 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/TupleSerializerBase.java @@ -32,27 +32,15 @@ public abstract class TupleSerializerBase extends TypeSerializer { protected final Class tupleClass; - protected final TypeSerializer[] fieldSerializers; + protected TypeSerializer[] fieldSerializers; protected final int arity; - protected final boolean stateful; - - @SuppressWarnings("unchecked") public TupleSerializerBase(Class tupleClass, TypeSerializer[] fieldSerializers) { this.tupleClass = tupleClass; this.fieldSerializers = (TypeSerializer[]) fieldSerializers; this.arity = fieldSerializers.length; - - boolean stateful = false; - for (TypeSerializer ser : fieldSerializers) { - if (ser.isStateful()) { - stateful = true; - break; - } - } - this.stateful = stateful; } public Class getTupleClass() { @@ -64,11 +52,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return this.stateful; - } - @Override public int getLength() { return -1; diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java index d6c35cbffa8cc..ad1b0f010ec09 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/ValueSerializer.java @@ -62,8 +62,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return true; + public ValueSerializer duplicate() { + return new ValueSerializer(type); } @Override diff --git a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java index c89733e0321d0..777122e025290 100644 --- a/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java +++ b/flink-java/src/main/java/org/apache/flink/api/java/typeutils/runtime/WritableSerializer.java @@ -98,8 +98,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return true; + public WritableSerializer duplicate() { + return new WritableSerializer(typeClass); } // -------------------------------------------------------------------------------------------- diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java index 64dae22c420ae..118e707cccfab 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CollectionInputFormatTest.java @@ -278,8 +278,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public TestSerializer duplicate() { + return this; } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java index f494ca7369f53..0080d63a88c50 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/sort/LargeRecordHandler.java @@ -32,8 +32,7 @@ import org.apache.flink.api.common.typeutils.base.LongSerializer; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.api.java.typeutils.runtime.TupleComparator; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; import org.apache.flink.core.memory.MemorySegment; @@ -161,19 +160,8 @@ public long addRecord(T record) throws IOException { keySerializer = new TupleSerializer((Class) Tuple.getTupleClass(numKeyFields+1), tupleSers); keyComparator = new TupleComparator(keyPos, keyComps, keySers); - // create the serializer factory for the tuple serializer - if (keySerializer.isStateful()) { - ClassLoader cl = getClassLoader(tupleSers); - - RuntimeStatefulSerializerFactory factory = - new RuntimeStatefulSerializerFactory(keySerializer, keySerializer.getTupleClass()); - factory.setClassLoader(cl); - keySerializerFactory = factory; - } - else { - keySerializerFactory = new RuntimeStatelessSerializerFactory(keySerializer, keySerializer.getTupleClass()); - } - + keySerializerFactory = new RuntimeSerializerFactory(keySerializer, keySerializer.getTupleClass()); + keyTuple = keySerializer.createInstance(); } @@ -399,18 +387,6 @@ private static TypeSerializer createSerializer(Object key, int pos) { } } - private static ClassLoader getClassLoader(Object[] objects) { - final ClassLoader appCl = LargeRecordHandler.class.getClassLoader(); - - for (Object o : objects) { - if (o != null && o.getClass().getClassLoader() != appCl) { - return o.getClass().getClassLoader(); - } - } - - return appCl; - } - private static final class FetchingIterator implements MutableObjectIterator { private final TypeSerializer serializer; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java index b614709734a7d..02bffecd29a18 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/drivers/TestTaskContext.java @@ -23,7 +23,7 @@ import org.apache.flink.api.common.typeutils.TypeComparator; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; @@ -89,13 +89,13 @@ public void setInput2(MutableObjectIterator input, TypeSerializerFactory< @SuppressWarnings("unchecked") public void setInput1(MutableObjectIterator input, TypeSerializer serializer) { this.input1 = input; - this.serializer1 = new RuntimeStatefulSerializerFactory(serializer, (Class) serializer.createInstance().getClass()); + this.serializer1 = new RuntimeSerializerFactory(serializer, (Class) serializer.createInstance().getClass()); } @SuppressWarnings("unchecked") public void setInput2(MutableObjectIterator input, TypeSerializer serializer) { this.input2 = input; - this.serializer2 = new RuntimeStatefulSerializerFactory(serializer, (Class) serializer.createInstance().getClass()); + this.serializer2 = new RuntimeSerializerFactory(serializer, (Class) serializer.createInstance().getClass()); } public void setComparator1(TypeComparator comparator) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java index 6a0c5bf7fb141..7403ab03c102e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/ExternalSortLargeRecordsITCase.java @@ -25,7 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.ValueTypeInfo; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.io.disk.iomanager.IOManager; @@ -125,7 +125,7 @@ public Tuple2 next() { Sorter> sorter = new UnilateralSortMerger>( this.memoryManager, this.ioManager, source, this.parentTask, - new RuntimeStatefulSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 1, 128, 0.7f); // check order @@ -194,7 +194,7 @@ public Tuple2 next() { Sorter> sorter = new UnilateralSortMerger>( this.memoryManager, this.ioManager, source, this.parentTask, - new RuntimeStatefulSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 1, 128, 0.7f); // check order @@ -278,7 +278,7 @@ else if (num % MEDIUM_REC_INTERVAL == 0) { Sorter> sorter = new UnilateralSortMerger>( this.memoryManager, this.ioManager, source, this.parentTask, - new RuntimeStatefulSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 1, 128, 0.7f); // check order @@ -348,7 +348,7 @@ public Tuple2 next() { Sorter> sorter = new UnilateralSortMerger>( this.memoryManager, this.ioManager, source, this.parentTask, - new RuntimeStatefulSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 1, 128, 0.7f); // check order diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java index a711d8088a657..084da41f89d3d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringSortingITCase.java @@ -33,12 +33,11 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeInfoParser; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.util.MutableObjectIterator; import org.junit.Assert; @@ -91,7 +90,7 @@ public void testStringSorting() { MutableObjectIterator inputIterator = new StringReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory(serializer, String.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory(serializer, String.class), comparator, 1.0, 4, 0.8f); MutableObjectIterator sortedData = sorter.getIterator(); @@ -183,7 +182,7 @@ public void testStringTuplesSorting() { MutableObjectIterator> inputIterator = new StringTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java index f4ceed3f3d5f8..9e925e8aa6bca 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/sort/MassiveStringValueSortingITCase.java @@ -33,12 +33,11 @@ import org.apache.flink.api.java.typeutils.TypeInfoParser; import org.apache.flink.api.java.typeutils.runtime.CopyableValueComparator; import org.apache.flink.api.java.typeutils.runtime.CopyableValueSerializer; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.memorymanager.DefaultMemoryManager; import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.operators.sort.UnilateralSortMerger; import org.apache.flink.runtime.operators.testutils.DummyInvokable; import org.apache.flink.types.StringValue; import org.apache.flink.util.MutableObjectIterator; @@ -91,7 +90,7 @@ public void testStringValueSorting() { MutableObjectIterator inputIterator = new StringValueReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory(serializer, StringValue.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory(serializer, StringValue.class), comparator, 1.0, 4, 0.8f); MutableObjectIterator sortedData = sorter.getIterator(); @@ -185,7 +184,7 @@ public void testStringValueTuplesSorting() { MutableObjectIterator> inputIterator = new StringValueTupleReaderMutableObjectIterator(reader); sorter = new UnilateralSortMerger>(mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f); + new RuntimeSerializerFactory>(serializer, (Class>) (Class) Tuple2.class), comparator, 1.0, 4, 0.8f); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java index 2134bcd134825..69dfeb9ac9971 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntListSerializer.java @@ -35,8 +35,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public IntListSerializer duplicate() { + return this; } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java index 361585d2fd8ff..c2571cc6beb32 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/IntPairSerializer.java @@ -38,8 +38,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public IntPairSerializer duplicate() { + return this; } @Override @@ -105,12 +105,12 @@ public IntPairSerializer getSerializer() { public Class getDataType() { return IntPair.class; } - + @Override public int hashCode() { return 42; } - + public boolean equals(Object obj) { return obj.getClass() == IntPairSerializerFactory.class; }; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java index a38633cc40c4d..388e8bd744cfb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/testutils/types/StringPairSerializer.java @@ -35,8 +35,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public StringPairSerializer duplicate() { + return this; } @Override diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala index 6169af3e73963..2a76c379fa425 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/CaseClassSerializer.scala @@ -17,6 +17,7 @@ */ package org.apache.flink.api.scala.typeutils +import org.apache.commons.lang.SerializationUtils import org.apache.flink.api.common.typeutils.TypeSerializer import org.apache.flink.api.java.typeutils.runtime.TupleSerializerBase import org.apache.flink.core.memory.{DataOutputView, DataInputView} @@ -28,12 +29,23 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} abstract class CaseClassSerializer[T <: Product]( clazz: Class[T], scalaFieldSerializers: Array[TypeSerializer[_]]) - extends TupleSerializerBase[T](clazz, scalaFieldSerializers) { + extends TupleSerializerBase[T](clazz, scalaFieldSerializers) with Cloneable { @transient var fields : Array[AnyRef] = _ @transient var instanceCreationFailed : Boolean = false + override def duplicate = { + val result = this.clone().asInstanceOf[CaseClassSerializer[T]] + + // set transient fields to null and make copy of serializers + result.fields = null + result.instanceCreationFailed = false + result.fieldSerializers = fieldSerializers.map(_.duplicate()) + + result + } + def createInstance: T = { if (instanceCreationFailed) { null.asInstanceOf[T] @@ -56,8 +68,6 @@ abstract class CaseClassSerializer[T <: Product]( } } - override def isStateful() = true - def copy(from: T, reuse: T): T = { copy(from) } diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala index cb72486fc6422..65628a0b5ca0e 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/EitherSerializer.scala @@ -28,7 +28,7 @@ class EitherSerializer[A, B, T <: Either[A, B]]( val rightSerializer: TypeSerializer[B]) extends TypeSerializer[T] { - override def isStateful: Boolean = false + override def duplicate: EitherSerializer[A,B,T] = this override def createInstance: T = { Left(null).asInstanceOf[T] diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala index 8685cc576a78c..147a060a384f9 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/NothingSerializer.scala @@ -27,7 +27,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} */ class NothingSerializer extends TypeSerializer[Any] { - override def isStateful: Boolean = false + override def duplicate: NothingSerializer = this override def createInstance: Any = { Integer.valueOf(-1) diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala index 4f8f632274b37..488710d685c25 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/OptionSerializer.scala @@ -26,7 +26,7 @@ import org.apache.flink.core.memory.{DataOutputView, DataInputView} class OptionSerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Option[A]] { - override def isStateful: Boolean = false + override def duplicate: OptionSerializer[A] = this override def createInstance: Option[A] = { None diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala index fa519d97e1644..38fd14b043e4e 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TraversableSerializer.scala @@ -28,13 +28,25 @@ import scala.collection.generic.CanBuildFrom * Serializer for Scala Collections. */ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( - val elementSerializer: TypeSerializer[E]) - extends TypeSerializer[T] { + var elementSerializer: TypeSerializer[E]) + extends TypeSerializer[T] with Cloneable { def getCbf: CanBuildFrom[T, E, T] @transient var cbf: CanBuildFrom[T, E, T] = getCbf + override def duplicate = { + val duplicateElementSerializer = elementSerializer.duplicate() + if (duplicateElementSerializer == elementSerializer) { + // is not stateful, so return ourselves + this + } else { + val result = this.clone().asInstanceOf[TraversableSerializer[T, E]] + result.elementSerializer = elementSerializer.duplicate() + result + } + } + private def readObject(in: ObjectInputStream): Unit = { in.defaultReadObject() cbf = getCbf @@ -85,8 +97,6 @@ abstract class TraversableSerializer[T <: TraversableOnce[E], E]( } } - override def isStateful: Boolean = false - override def deserialize(source: DataInputView): T = { val len = source.readInt() val builder = cbf() diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala index e94c944903a89..1f565f205b88f 100644 --- a/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/typeutils/TrySerializer.scala @@ -29,7 +29,7 @@ import scala.util.{Success, Try, Failure} class TrySerializer[A](val elemSerializer: TypeSerializer[A]) extends TypeSerializer[Try[A]] { - override def isStateful: Boolean = false + override def duplicate: TrySerializer[A] = this val throwableSerializer = new KryoSerializer[Throwable](classOf[Throwable]) diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java index 5dfe4b29b99cf..a5ef3a73785ab 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/function/source/FileSourceFunction.java @@ -24,8 +24,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.common.typeutils.TypeSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatefulSerializerFactory; -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory; +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; @@ -49,12 +48,7 @@ public FileSourceFunction(InputFormat format, TypeInformation private static TypeSerializerFactory createSerializer(TypeInformation typeInfo) { TypeSerializer serializer = typeInfo.createSerializer(); - if (serializer.isStateful()) { - return new RuntimeStatefulSerializerFactory(serializer, typeInfo.getTypeClass()); - } else { - return new RuntimeStatelessSerializerFactory(serializer, - typeInfo.getTypeClass()); - } + return new RuntimeSerializerFactory(serializer, typeInfo.getTypeClass()); } @Override diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java index 85faa9e2f3cc2..98f12ec65f0f1 100755 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/streamrecord/StreamRecordSerializer.java @@ -47,8 +47,8 @@ public boolean isImmutableType() { } @Override - public boolean isStateful() { - return false; + public StreamRecordSerializer duplicate() { + return this; } @Override diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java index 822b4f23a60ec..751ced3991e36 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithAdjacencyListSerializer.java @@ -34,11 +34,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public VertexWithAdjacencyList createInstance() { return new VertexWithAdjacencyList(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java index e972cd1b82901..8ff023372e00c 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankAndDanglingSerializer.java @@ -34,11 +34,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public VertexWithRankAndDangling createInstance() { return new VertexWithRankAndDangling(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java index 928d4f41f9783..2c3abcd741092 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java +++ b/flink-tests/src/test/java/org/apache/flink/test/iterative/nephele/customdanglingpagerank/types/VertexWithRankSerializer.java @@ -34,11 +34,6 @@ public boolean isImmutableType() { return false; } - @Override - public boolean isStateful() { - return false; - } - @Override public VertexWithRank createInstance() { return new VertexWithRank(); diff --git a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala index 6e5296b3025b8..4f8816fd6ec96 100644 --- a/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala +++ b/flink-tests/src/test/scala/org/apache/flink/api/scala/misc/MassiveCaseClassSortingITCase.scala @@ -31,7 +31,7 @@ import org.apache.flink.runtime.memorymanager.DefaultMemoryManager import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.common.typeutils.CompositeType import org.apache.flink.runtime.operators.sort.UnilateralSortMerger -import org.apache.flink.api.java.typeutils.runtime.RuntimeStatelessSerializerFactory +import org.apache.flink.api.java.typeutils.runtime.RuntimeSerializerFactory import org.junit.Assert._ import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable @@ -89,7 +89,7 @@ class MassiveCaseClassSortingITCase { sorter = new UnilateralSortMerger[StringTuple](mm, ioMan, inputIterator, new DummyInvokable(), - new RuntimeStatelessSerializerFactory[StringTuple](serializer, classOf[StringTuple]), + new RuntimeSerializerFactory[StringTuple](serializer, classOf[StringTuple]), comparator, 1.0, 4, 0.8f) val sortedData = sorter.getIterator