diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java index f84ff15a6ee91..e8af0f9cdaf6c 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializer.java @@ -19,15 +19,11 @@ package org.apache.flink.streaming.runtime.streamrecord; import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot; +import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil; import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.runtime.jobgraph.OperatorID; @@ -283,56 +279,75 @@ public int hashCode() { // -------------------------------------------------------------------------------------------- @Override - public StreamElementSerializerConfigSnapshot snapshotConfiguration() { - return new StreamElementSerializerConfigSnapshot<>(typeSerializer); + public StreamElementSerializerSnapshot snapshotConfiguration() { + return new StreamElementSerializerSnapshot<>(this); } - @Override - public CompatibilityResult ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - Tuple2, TypeSerializerSnapshot> previousTypeSerializerAndConfig; + /** + * Configuration snapshot specific to the {@link StreamElementSerializer}. + * @deprecated see {@link StreamElementSerializerSnapshot}. + */ + @Deprecated + public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { - // we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer - if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) { - previousTypeSerializerAndConfig = - ((StreamElementSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - } else { - return CompatibilityResult.requiresMigration(); - } + private static final int VERSION = 1; - CompatibilityResult compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); + /** This empty nullary constructor is required for deserializing the configuration. */ + public StreamElementSerializerConfigSnapshot() {} - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new StreamElementSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } else { - return CompatibilityResult.requiresMigration(); + @Override + public int getVersion() { + return VERSION; + } + + @Override + public TypeSerializerSchemaCompatibility resolveSchemaCompatibility(TypeSerializer newSerializer) { + return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot( + newSerializer, + new StreamElementSerializerSnapshot<>(), + getSingleNestedSerializerAndConfig().f1); } } /** * Configuration snapshot specific to the {@link StreamElementSerializer}. */ - public static final class StreamElementSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot { + public static final class StreamElementSerializerSnapshot + extends CompositeTypeSerializerSnapshot> { - private static final int VERSION = 1; + private static final int VERSION = 2; - /** This empty nullary constructor is required for deserializing the configuration. */ - public StreamElementSerializerConfigSnapshot() {} + @SuppressWarnings("WeakerAccess") + public StreamElementSerializerSnapshot() { + super(serializerClass()); + } - public StreamElementSerializerConfigSnapshot(TypeSerializer typeSerializer) { - super(typeSerializer); + StreamElementSerializerSnapshot(StreamElementSerializer serializerInstance) { + super(serializerInstance); } @Override - public int getVersion() { + protected int getCurrentOuterSnapshotVersion() { return VERSION; } + + @Override + protected TypeSerializer[] getNestedSerializers(StreamElementSerializer outerSerializer) { + return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()}; + } + + @Override + protected StreamElementSerializer createOuterSerializerWithNestedSerializers(TypeSerializer[] nestedSerializers) { + @SuppressWarnings("unchecked") + TypeSerializer casted = (TypeSerializer) nestedSerializers[0]; + + return new StreamElementSerializer<>(casted); + } + + @SuppressWarnings("unchecked") + private static Class> serializerClass() { + Class streamElementSerializerClass = StreamElementSerializer.class; + return (Class>) streamElementSerializerClass; + } } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java new file mode 100644 index 0000000000000..b6169d48a5729 --- /dev/null +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/streamrecord/StreamElementSerializerMigrationTest.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.runtime.streamrecord; + +import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase; +import org.apache.flink.api.common.typeutils.base.StringSerializer; +import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot; +import org.apache.flink.testutils.migration.MigrationVersion; + +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.Collection; + +/** + * Migration tests for {@link StreamElementSerializer}. + */ +@RunWith(Parameterized.class) +public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase { + + public StreamElementSerializerMigrationTest(TestSpecification testSpecification) { + super(testSpecification); + } + + @SuppressWarnings("unchecked") + @Parameterized.Parameters(name = "Test Specification = {0}") + public static Collection> testSpecifications() { + + final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7); + + testSpecifications.add( + "stream-element-serializer", + StreamElementSerializer.class, + StreamElementSerializerSnapshot.class, + () -> new StreamElementSerializer<>(StringSerializer.INSTANCE)); + + return testSpecifications.get(); + } +} diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data new file mode 100644 index 0000000000000..81b80c3422c2b Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot new file mode 100644 index 0000000000000..8ffdb43b17b32 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.6-stream-element-serializer-snapshot differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data new file mode 100644 index 0000000000000..01f05e7e3f994 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-data differ diff --git a/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot new file mode 100644 index 0000000000000..dc7f76b7144b9 Binary files /dev/null and b/flink-streaming-java/src/test/resources/flink-1.7-stream-element-serializer-snapshot differ