Skip to content

Commit

Permalink
[FLINK-11329] [DataStream] Migrate StreamElementSerializer to use new…
Browse files Browse the repository at this point in the history
… compatibility API
  • Loading branch information
Igal Shilman authored and tzulitai committed Jan 30, 2019
1 parent a51adc3 commit 17618f3
Show file tree
Hide file tree
Showing 6 changed files with 109 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,11 @@
package org.apache.flink.streaming.runtime.streamrecord;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerUtil;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.jobgraph.OperatorID;
Expand Down Expand Up @@ -283,56 +279,75 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------

@Override
public StreamElementSerializerConfigSnapshot<T> snapshotConfiguration() {
return new StreamElementSerializerConfigSnapshot<>(typeSerializer);
public StreamElementSerializerSnapshot<T> snapshotConfiguration() {
return new StreamElementSerializerSnapshot<>(this);
}

@Override
public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> previousTypeSerializerAndConfig;
/**
* Configuration snapshot specific to the {@link StreamElementSerializer}.
* @deprecated see {@link StreamElementSerializerSnapshot}.
*/
@Deprecated
public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {

// we are compatible for data written by ourselves or the legacy MultiplexingStreamRecordSerializer
if (configSnapshot instanceof StreamElementSerializerConfigSnapshot) {
previousTypeSerializerAndConfig =
((StreamElementSerializerConfigSnapshot<?>) configSnapshot).getSingleNestedSerializerAndConfig();
} else {
return CompatibilityResult.requiresMigration();
}
private static final int VERSION = 1;

CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
previousTypeSerializerAndConfig.f0,
UnloadableDummyTypeSerializer.class,
previousTypeSerializerAndConfig.f1,
typeSerializer);
/** This empty nullary constructor is required for deserializing the configuration. */
public StreamElementSerializerConfigSnapshot() {}

if (!compatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new StreamElementSerializer<>(
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
} else {
return CompatibilityResult.requiresMigration();
@Override
public int getVersion() {
return VERSION;
}

@Override
public TypeSerializerSchemaCompatibility<StreamElement> resolveSchemaCompatibility(TypeSerializer<StreamElement> newSerializer) {
return CompositeTypeSerializerUtil.delegateCompatibilityCheckToNewSnapshot(
newSerializer,
new StreamElementSerializerSnapshot<>(),
getSingleNestedSerializerAndConfig().f1);
}
}

/**
* Configuration snapshot specific to the {@link StreamElementSerializer}.
*/
public static final class StreamElementSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot<StreamElement> {
public static final class StreamElementSerializerSnapshot<T>
extends CompositeTypeSerializerSnapshot<StreamElement, StreamElementSerializer<T>> {

private static final int VERSION = 1;
private static final int VERSION = 2;

/** This empty nullary constructor is required for deserializing the configuration. */
public StreamElementSerializerConfigSnapshot() {}
@SuppressWarnings("WeakerAccess")
public StreamElementSerializerSnapshot() {
super(serializerClass());
}

public StreamElementSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) {
super(typeSerializer);
StreamElementSerializerSnapshot(StreamElementSerializer<T> serializerInstance) {
super(serializerInstance);
}

@Override
public int getVersion() {
protected int getCurrentOuterSnapshotVersion() {
return VERSION;
}

@Override
protected TypeSerializer<?>[] getNestedSerializers(StreamElementSerializer<T> outerSerializer) {
return new TypeSerializer[]{outerSerializer.getContainedTypeSerializer()};
}

@Override
protected StreamElementSerializer<T> createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer<T> casted = (TypeSerializer<T>) nestedSerializers[0];

return new StreamElementSerializer<>(casted);
}

@SuppressWarnings("unchecked")
private static <T> Class<StreamElementSerializer<T>> serializerClass() {
Class<?> streamElementSerializerClass = StreamElementSerializer.class;
return (Class<StreamElementSerializer<T>>) streamElementSerializerClass;
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.runtime.streamrecord;

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer.StreamElementSerializerSnapshot;
import org.apache.flink.testutils.migration.MigrationVersion;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Collection;

/**
* Migration tests for {@link StreamElementSerializer}.
*/
@RunWith(Parameterized.class)
public class StreamElementSerializerMigrationTest extends TypeSerializerSnapshotMigrationTestBase<StreamElement> {

public StreamElementSerializerMigrationTest(TestSpecification<StreamElement> testSpecification) {
super(testSpecification);
}

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<TestSpecification<?>> testSpecifications() {

final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);

testSpecifications.add(
"stream-element-serializer",
StreamElementSerializer.class,
StreamElementSerializerSnapshot.class,
() -> new StreamElementSerializer<>(StringSerializer.INSTANCE));

return testSpecifications.get();
}
}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.

0 comments on commit 17618f3

Please sign in to comment.