Skip to content

Commit

Permalink
[FLINK-10778] [tests] Extend all TypeSerializerSnapshotMigrationTestB…
Browse files Browse the repository at this point in the history
…ase subclasses to test restoring from 1.7

This closes apache#7504.
  • Loading branch information
tzulitai committed Jan 17, 2019
1 parent 8a49d73 commit 76dd766
Show file tree
Hide file tree
Showing 105 changed files with 412 additions and 593 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,10 @@
import org.apache.flink.api.java.typeutils.runtime.EitherSerializer;
import org.apache.flink.api.java.typeutils.runtime.JavaEitherSerializerSnapshot;
import org.apache.flink.testutils.migration.MigrationVersion;
import org.apache.flink.types.Either;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.util.Arrays;
import java.util.Collection;

/**
Expand All @@ -45,33 +43,21 @@ public CompositeTypeSerializerSnapshotMigrationTest(TestSpecification<Object> te

@SuppressWarnings("unchecked")
@Parameterized.Parameters(name = "Test Specification = {0}")
public static Collection<Object[]> testSpecifications() {

// Either<String, Integer>

final TestSpecification<Either<String, Integer>> either =TestSpecification.<Either<String, Integer>>builder(
"1.6-either",
EitherSerializer.class,
JavaEitherSerializerSnapshot.class,
MigrationVersion.v1_6)
.withNewSerializerProvider(() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-either-type-serializer-snapshot")
.withTestData("flink-1.6-either-type-serializer-data", 10);

// GenericArray<String>

final TestSpecification<String[]> array = TestSpecification.<String[]>builder(
"1.6-generic-array",
GenericArraySerializer.class,
GenericArraySerializerSnapshot.class,
MigrationVersion.v1_6)
.withNewSerializerProvider(() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE))
.withSnapshotDataLocation("flink-1.6-array-type-serializer-snapshot")
.withTestData("flink-1.6-array-type-serializer-data", 10);

return Arrays.asList(
new Object[]{either},
new Object[]{array}
);
public static Collection<TestSpecification<?>> testSpecifications() {

final TestSpecifications testSpecifications = new TestSpecifications(MigrationVersion.v1_6, MigrationVersion.v1_7);

testSpecifications.add(
"either-serializer",
EitherSerializer.class,
JavaEitherSerializerSnapshot.class,
() -> new EitherSerializer<>(StringSerializer.INSTANCE, IntSerializer.INSTANCE));
testSpecifications.add(
"generic-array-serializer",
GenericArraySerializer.class,
GenericArraySerializerSnapshot.class,
() -> new GenericArraySerializer<>(String.class, StringSerializer.INSTANCE));

return testSpecifications.get();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,13 @@
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.hamcrest.CoreMatchers.allOf;
import static org.hamcrest.CoreMatchers.instanceOf;
Expand Down Expand Up @@ -280,6 +284,111 @@ public String toString() {
}
}

/**
* Utility class to help build a collection of {@link TestSpecification} for
* multiple test migration versions. For each test specification added,
* an entry will be added for each specified migration version.
*/
protected static final class TestSpecifications {

private static final int DEFAULT_TEST_DATA_COUNT = 10;
private static final String DEFAULT_SNAPSHOT_FILENAME_FORMAT = "flink-%s-%s-snapshot";
private static final String DEFAULT_TEST_DATA_FILENAME_FORMAT = "flink-%s-%s-data";

private final Collection<TestSpecification<?>> testSpecifications = new LinkedList<>();
private final MigrationVersion[] testVersions;

public TestSpecifications(MigrationVersion... testVersions) {
checkArgument(
testVersions.length > 0,
"At least one test migration version should be specified.");
this.testVersions = testVersions;
}

/**
* Adds a test specification to be tested for all specified test versions.
*
* <p>This method adds the specification with pre-defined snapshot and data filenames,
* with the format "flink-&lt;testVersion&gt;-&lt;specName&gt;-&lt;data/snapshot&gt;",
* and each specification's test data count is assumed to always be 10.
*
* @param name test specification name.
* @param serializerClass class of the current serializer.
* @param snapshotClass class of the current serializer snapshot class.
* @param serializerProvider provider for an instance of the current serializer.
*
* @param <T> type of the test data.
*/
public <T> void add(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass,
Supplier<? extends TypeSerializer<T>> serializerProvider) {
for (MigrationVersion testVersion : testVersions) {
testSpecifications.add(
TestSpecification.<T>builder(
getSpecNameForVersion(name, testVersion),
serializerClass,
snapshotClass,
testVersion)
.withNewSerializerProvider(serializerProvider)
.withSnapshotDataLocation(
String.format(DEFAULT_SNAPSHOT_FILENAME_FORMAT, testVersion, name))
.withTestData(
String.format(DEFAULT_TEST_DATA_FILENAME_FORMAT, testVersion, name),
DEFAULT_TEST_DATA_COUNT)
);
}
}

/**
* Adds a test specification to be tested for all specified test versions.
*
* @param name test specification name.
* @param serializerClass class of the current serializer.
* @param snapshotClass class of the current serializer snapshot class.
* @param serializerProvider provider for an instance of the current serializer.
* @param testSnapshotFilenameProvider provider for the filename of the test snapshot.
* @param testDataFilenameProvider provider for the filename of the test data.
* @param testDataCount expected number of records to be read in the test data files.
*
* @param <T> type of the test data.
*/
public <T> void add(
String name,
Class<? extends TypeSerializer> serializerClass,
Class<? extends TypeSerializerSnapshot> snapshotClass,
Supplier<? extends TypeSerializer<T>> serializerProvider,
TestResourceFilenameSupplier testSnapshotFilenameProvider,
TestResourceFilenameSupplier testDataFilenameProvider,
int testDataCount) {
for (MigrationVersion testVersion : testVersions) {
testSpecifications.add(
TestSpecification.<T>builder(
getSpecNameForVersion(name, testVersion),
serializerClass,
snapshotClass,
testVersion)
.withNewSerializerProvider(serializerProvider)
.withSnapshotDataLocation(testSnapshotFilenameProvider.get(testVersion))
.withTestData(testDataFilenameProvider.get(testVersion), testDataCount)
);
}
}

public Collection<TestSpecification<?>> get() {
return Collections.unmodifiableCollection(testSpecifications);
}

private static String getSpecNameForVersion(String baseName, MigrationVersion testVersion) {
return testVersion + "-" + baseName;
}
}

protected interface TestResourceFilenameSupplier {
String get(MigrationVersion testVersion);
}

// --------------------------------------------------------------------------------------------------------------
// Utilities
// --------------------------------------------------------------------------------------------------------------
Expand Down
Loading

0 comments on commit 76dd766

Please sign in to comment.