Skip to content

Commit

Permalink
[FLINK-10778] [tests] Make TypeSerializerSnapshotMigrationTestBase aw…
Browse files Browse the repository at this point in the history
…are of snapshot version

Before, the tests in TypeSerializerSnapshotMigrationTestBase always
assumed that the test serializer snapshotw were written with Flink 1.6.

This commit makes this flexible, so that we can allow subclasses to
specify different snapshot versions for each TestSpecification.
  • Loading branch information
tzulitai committed Jan 17, 2019
1 parent fa3432a commit e21c3d4
Show file tree
Hide file tree
Showing 13 changed files with 150 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.types.Either;

import org.junit.runner.RunWith;
Expand All @@ -48,14 +49,22 @@ public static Collection<Object[]> testSpecifications() {

// Either<String, Integer>

final TestSpecification<Either<String, Integer>> either = TestSpecification.<Either<String, Integer>>builder("1.6-either", EitherSerializer.class, JavaEitherSerializerSnapshot.class)
final TestSpecification<Either<String, Integer>> either =TestSpecification.<Either<String, Integer>>builder(
"1.6-either",
EitherSerializer.class,
JavaEitherSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
.withTestData("flink-1.6-either-type-serializer-data", 10);

// GenericArray<String>

final TestSpecification<String[]> array = TestSpecification.<String[]>builder("1.6-generic-array", GenericArraySerializer.class, GenericArraySerializerSnapshot.class)
final TestSpecification<String[]> array = TestSpecification.<String[]>builder(
"1.6-generic-array",
GenericArraySerializer.class,
GenericArraySerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
.withTestData("flink-1.6-array-type-serializer-data", 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.flink.core.memory.DataInputDeserializer;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputSerializer;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
Expand Down Expand Up @@ -125,13 +126,17 @@ private TypeSerializerSnapshot<ElementT> writeAndThenReadTheSnapshot(
TypeSerializerSnapshotSerializationUtil.writeSerializerSnapshot(out, newSnapshot, serializer);

DataInputView in = new DataInputDeserializer(out.wrapAsByteBuffer());
return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(in, Thread.currentThread().getContextClassLoader(), null);
return readSnapshot(in);
}

private TypeSerializerSnapshot<ElementT> snapshotUnderTest() {
DataInputView input = contentsOf(testSpecification.getSnapshotDataLocation());
try {
return readPre17SnapshotFormat(input);
if (!testSpecification.getTestMigrationVersion().isNewerVersionThan(MigrationVersion.v1_6)) {
return readPre17SnapshotFormat(input);
} else {
return readSnapshot(input);
}
}
catch (IOException e) {
throw new RuntimeException("Unable to read " + testSpecification.getSnapshotDataLocation(), e);
Expand All @@ -148,6 +153,11 @@ private TypeSerializerSnapshot<ElementT> readPre17SnapshotFormat(DataInputView i
return (TypeSerializerSnapshot<ElementT>) serializers.get(0).f1;
}

private TypeSerializerSnapshot<ElementT> readSnapshot(DataInputView in) throws IOException {
return TypeSerializerSnapshotSerializationUtil.readSerializerSnapshot(
in, Thread.currentThread().getContextClassLoader(), null);
}

private DataInputView dataUnderTest() {
return contentsOf(testSpecification.getTestDataLocation());
}
Expand Down Expand Up @@ -189,6 +199,7 @@ protected static final class TestSpecification<T> {
private final Class<? extends TypeSerializer<T>> serializerType;
private final Class<? extends TypeSerializerSnapshot<T>> snapshotClass;
private final String name;
private final MigrationVersion testMigrationVersion;
private Supplier<? extends TypeSerializer<T>> serializerProvider;
private String snapshotDataLocation;
private String testDataLocation;
Expand All @@ -198,22 +209,26 @@ protected static final class TestSpecification<T> {
public static <T> TestSpecification<T> builder(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass) {
Class<? extends TypeSerializerSnapshot> snapshotClass,
MigrationVersion testMigrationVersion) {

return new TestSpecification<>(
name,
(Class<? extends TypeSerializer<T>>) serializerClass,
(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass);
(Class<? extends TypeSerializerSnapshot<T>>) snapshotClass,
testMigrationVersion);
}

private TestSpecification(
String name,
Class<? extends TypeSerializer<T>> serializerType,
Class<? extends TypeSerializerSnapshot<T>> snapshotClass) {
Class<? extends TypeSerializerSnapshot<T>> snapshotClass,
MigrationVersion testMigrationVersion) {

this.name = name;
this.serializerType = serializerType;
this.snapshotClass = snapshotClass;
this.testMigrationVersion = testMigrationVersion;
}

public TestSpecification<T> withSerializerProvider(Supplier<? extends TypeSerializer<T>> serializerProvider) {
Expand Down Expand Up @@ -249,6 +264,10 @@ private Path getSnapshotDataLocation() {
return resourcePath(this.snapshotDataLocation);
}

private MigrationVersion getTestMigrationVersion() {
return testMigrationVersion;
}

public Class<? extends TypeSerializerSnapshot<T>> getSnapshotClass() {
return snapshotClass;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import org.apache.flink.api.common.typeutils.TypeSerializerSnapshotMigrationTestBase;

import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.types.BooleanValue;
import org.apache.flink.types.ByteValue;
import org.apache.flink.types.CharValue;
Expand Down Expand Up @@ -61,7 +62,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<BigDecimal> bigDec = TestSpecification.<BigDecimal>builder(
"1.6-big-dec",
BigDecSerializer.class,
BigDecSerializer.BigDecSerializerSnapshot.class)
BigDecSerializer.BigDecSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> BigDecSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-big-dec-serializer-snapshot")
.withTestData("flink-1.6-big-dec-serializer-data", 10);
Expand All @@ -71,7 +73,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<BigInteger> bigInt = TestSpecification.<BigInteger>builder(
"1.6-big-int",
BigIntSerializer.class,
BigIntSerializer.BigIntSerializerSnapshot.class)
BigIntSerializer.BigIntSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> BigIntSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-big-int-serializer-snapshot")
.withTestData("flink-1.6-big-int-serializer-data", 10);
Expand All @@ -81,7 +84,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Boolean> booleanType = TestSpecification.<Boolean>builder(
"1.6-boolean",
BooleanSerializer.class,
BooleanSerializer.BooleanSerializerSnapshot.class)
BooleanSerializer.BooleanSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> BooleanSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-boolean-serializer-snapshot")
.withTestData("flink-1.6-boolean-serializer-data", 10);
Expand All @@ -91,7 +95,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<BooleanValue> booleanValue = TestSpecification.<BooleanValue>builder(
"1.6-boolean-value",
BooleanValueSerializer.class,
BooleanValueSerializer.BooleanValueSerializerSnapshot.class)
BooleanValueSerializer.BooleanValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> BooleanValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-boolean-value-serializer-snapshot")
.withTestData("flink-1.6-boolean-value-serializer-data", 10);
Expand All @@ -101,7 +106,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Byte> byteType = TestSpecification.<Byte>builder(
"1.6-byte",
ByteSerializer.class,
ByteSerializer.ByteSerializerSnapshot.class)
ByteSerializer.ByteSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> ByteSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-byte-serializer-snapshot")
.withTestData("flink-1.6-byte-serializer-data", 10);
Expand All @@ -111,7 +117,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<ByteValue> byteValue = TestSpecification.<ByteValue>builder(
"1.6-byte-value",
ByteValueSerializer.class,
ByteValueSerializer.ByteValueSerializerSnapshot.class)
ByteValueSerializer.ByteValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> ByteValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-byte-value-serializer-snapshot")
.withTestData("flink-1.6-byte-value-serializer-data", 10);
Expand All @@ -121,7 +128,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Character> charType = TestSpecification.<Character>builder(
"1.6-char",
CharSerializer.class,
CharSerializer.CharSerializerSnapshot.class)
CharSerializer.CharSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> CharSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-char-serializer-snapshot")
.withTestData("flink-1.6-char-serializer-data", 10);
Expand All @@ -131,7 +139,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<CharValue> charValue = TestSpecification.<CharValue>builder(
"1.6-char-value",
CharValueSerializer.class,
CharValueSerializer.CharValueSerializerSnapshot.class)
CharValueSerializer.CharValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> CharValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-char-value-serializer-snapshot")
.withTestData("flink-1.6-char-value-serializer-data", 10);
Expand All @@ -141,7 +150,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Date> javaDate = TestSpecification.<Date>builder(
"1.6-date",
DateSerializer.class,
DateSerializer.DateSerializerSnapshot.class)
DateSerializer.DateSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> DateSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-date-serializer-snapshot")
.withTestData("flink-1.6-date-serializer-data", 10);
Expand All @@ -151,7 +161,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Double> doubleType = TestSpecification.<Double>builder(
"1.6-double",
DoubleSerializer.class,
DoubleSerializer.DoubleSerializerSnapshot.class)
DoubleSerializer.DoubleSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> DoubleSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-double-serializer-snapshot")
.withTestData("flink-1.6-double-serializer-data", 10);
Expand All @@ -161,7 +172,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<DoubleValue> doubleValue = TestSpecification.<DoubleValue>builder(
"1.6-double-value",
DoubleValueSerializer.class,
DoubleValueSerializer.DoubleValueSerializerSnapshot.class)
DoubleValueSerializer.DoubleValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> DoubleValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-double-value-serializer-snapshot")
.withTestData("flink-1.6-double-value-serializer-data", 10);
Expand All @@ -171,7 +183,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Float> floatType = TestSpecification.<Float>builder(
"1.6-float",
FloatSerializer.class,
FloatSerializer.FloatSerializerSnapshot.class)
FloatSerializer.FloatSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> FloatSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-float-serializer-snapshot")
.withTestData("flink-1.6-float-serializer-data", 10);
Expand All @@ -181,7 +194,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<FloatValue> floatValue = TestSpecification.<FloatValue>builder(
"1.6-float-value",
FloatValueSerializer.class,
FloatValueSerializer.FloatValueSerializerSnapshot.class)
FloatValueSerializer.FloatValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> FloatValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-float-value-serializer-snapshot")
.withTestData("flink-1.6-float-value-serializer-data", 10);
Expand All @@ -191,7 +205,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Integer> intType = TestSpecification.<Integer>builder(
"1.6-int",
IntSerializer.class,
IntSerializer.IntSerializerSnapshot.class)
IntSerializer.IntSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> IntSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-int-serializer-snapshot")
.withTestData("flink-1.6-int-serializer-data", 10);
Expand All @@ -201,7 +216,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<IntValue> intValue = TestSpecification.<IntValue>builder(
"1.6-int-value",
IntValueSerializer.class,
IntValueSerializer.IntValueSerializerSnapshot.class)
IntValueSerializer.IntValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> IntValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-int-value-serializer-snapshot")
.withTestData("flink-1.6-int-value-serializer-data", 10);
Expand All @@ -211,7 +227,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Long> longType = TestSpecification.<Long>builder(
"1.6-long",
LongSerializer.class,
LongSerializer.LongSerializerSnapshot.class)
LongSerializer.LongSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> LongSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-long-serializer-snapshot")
.withTestData("flink-1.6-long-serializer-data", 10);
Expand All @@ -221,7 +238,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<LongValue> longValue = TestSpecification.<LongValue>builder(
"1.6-long-value",
LongValueSerializer.class,
LongValueSerializer.LongValueSerializerSnapshot.class)
LongValueSerializer.LongValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> LongValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-long-value-serializer-snapshot")
.withTestData("flink-1.6-long-value-serializer-data", 10);
Expand All @@ -231,7 +249,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<NullValue> nullValue = TestSpecification.<NullValue>builder(
"1.6-null-value",
NullValueSerializer.class,
NullValueSerializer.NullValueSerializerSnapshot.class)
NullValueSerializer.NullValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> NullValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-null-value-serializer-snapshot")
.withTestData("flink-1.6-null-value-serializer-data", 10);
Expand All @@ -241,7 +260,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Short> shortType = TestSpecification.<Short>builder(
"1.6-short",
ShortSerializer.class,
ShortSerializer.ShortSerializerSnapshot.class)
ShortSerializer.ShortSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> ShortSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-short-serializer-snapshot")
.withTestData("flink-1.6-short-serializer-data", 10);
Expand All @@ -251,7 +271,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<ShortValue> shortValue = TestSpecification.<ShortValue>builder(
"1.6-short-value",
ShortValueSerializer.class,
ShortValueSerializer.ShortValueSerializerSnapshot.class)
ShortValueSerializer.ShortValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> ShortValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-short-value-serializer-snapshot")
.withTestData("flink-1.6-short-value-serializer-data", 10);
Expand All @@ -261,7 +282,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<java.sql.Date> sqlDate = TestSpecification.<java.sql.Date>builder(
"1.6-sql-date",
SqlDateSerializer.class,
SqlDateSerializer.SqlDateSerializerSnapshot.class)
SqlDateSerializer.SqlDateSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> SqlDateSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-sql-date-serializer-snapshot")
.withTestData("flink-1.6-sql-date-serializer-data", 10);
Expand All @@ -271,7 +293,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Time> sqlTime = TestSpecification.<Time>builder(
"1.6-sql-time",
SqlTimeSerializer.class,
SqlTimeSerializer.SqlTimeSerializerSnapshot.class)
SqlTimeSerializer.SqlTimeSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> SqlTimeSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-sql-time-serializer-snapshot")
.withTestData("flink-1.6-sql-time-serializer-data", 10);
Expand All @@ -281,7 +304,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<Timestamp> sqlTimestamp = TestSpecification.<Timestamp>builder(
"1.6-sql-timestamp",
SqlTimestampSerializer.class,
SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class)
SqlTimestampSerializer.SqlTimestampSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> SqlTimestampSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-sql-timestamp-serializer-snapshot")
.withTestData("flink-1.6-sql-timestamp-serializer-data", 10);
Expand All @@ -291,7 +315,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<String> stringType = TestSpecification.<String>builder(
"1.6-string",
StringSerializer.class,
StringSerializer.StringSerializerSnapshot.class)
StringSerializer.StringSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> StringSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-string-serializer-snapshot")
.withTestData("flink-1.6-string-serializer-data", 10);
Expand All @@ -301,7 +326,8 @@ public static Collection<Object> testSpecifications() {
final TestSpecification<StringValue> stringValue = TestSpecification.<StringValue>builder(
"1.6-string-value",
StringValueSerializer.class,
StringValueSerializer.StringValueSerializerSnapshot.class)
StringValueSerializer.StringValueSerializerSnapshot.class,
MigrationVersion.v1_6)
.withSerializerProvider(() -> StringValueSerializer.INSTANCE)
.withSnapshotDataLocation("flink-1.6-string-value-serializer-snapshot")
.withTestData("flink-1.6-string-value-serializer-data", 10);
Expand Down
Loading

0 comments on commit e21c3d4

Please sign in to comment.