Skip to content

Commit

Permalink
[FLINK-17520] [test] Simplify CompositeTypeSerializerSnapshotTest
Browse files Browse the repository at this point in the history
The unit tests in CompositeTypeSerializerSnapshotTest were previously
over-engineered w.r.t. how to mock the result of a compatibility check.
It was doing string equals checks, whereas it is simple enough to just
let the new serializer wrap a mock compat result and just return that
for the compatibility checks.
  • Loading branch information
tzulitai committed May 19, 2020
1 parent d2cfc43 commit 6cda904
Showing 1 changed file with 29 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@
import java.io.IOException;
import java.util.Arrays;

import static org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot.OuterSchemaCompatibility;

/**
* Test suite for the {@link CompositeTypeSerializerSnapshot}.
*/
Expand All @@ -41,7 +43,6 @@ public class CompositeTypeSerializerSnapshotTest {

@Test
public void testIncompatiblePrecedence() throws IOException {
final String OUTER_CONFIG = "outer-config";
final TypeSerializer<?>[] testNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
Expand All @@ -53,15 +54,13 @@ public void testIncompatiblePrecedence() throws IOException {
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
testNestedSerializers,
testNestedSerializers,
OUTER_CONFIG,
OUTER_CONFIG);
OuterSchemaCompatibility.COMPATIBLE_AS_IS);

Assert.assertTrue(compatibility.isIncompatible());
}

@Test
public void testCompatibleAfterMigrationPrecedence() throws IOException {
final String OUTER_CONFIG = "outer-config";
TypeSerializer<?>[] testNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION),
Expand All @@ -73,15 +72,13 @@ public void testCompatibleAfterMigrationPrecedence() throws IOException {
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
testNestedSerializers,
testNestedSerializers,
OUTER_CONFIG,
OUTER_CONFIG);
OuterSchemaCompatibility.COMPATIBLE_AS_IS);

Assert.assertTrue(compatibility.isCompatibleAfterMigration());
}

@Test
public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOException {
final String OUTER_CONFIG = "outer-config";
TypeSerializer<?>[] testNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
new NestedSerializer(TargetCompatibility.COMPATIBLE_WITH_RECONFIGURED_SERIALIZER),
Expand All @@ -92,8 +89,7 @@ public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOExcept
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
testNestedSerializers,
testNestedSerializers,
OUTER_CONFIG,
OUTER_CONFIG);
OuterSchemaCompatibility.COMPATIBLE_AS_IS);

Assert.assertTrue(compatibility.isCompatibleWithReconfiguredSerializer());

Expand All @@ -108,7 +104,6 @@ public void testCompatibleWithReconfiguredSerializerPrecedence() throws IOExcept

@Test
public void testCompatibleAsIsPrecedence() throws IOException {
final String OUTER_CONFIG = "outer-config";
TypeSerializer<?>[] testNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
Expand All @@ -118,16 +113,13 @@ public void testCompatibleAsIsPrecedence() throws IOException {
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
testNestedSerializers,
testNestedSerializers,
OUTER_CONFIG,
OUTER_CONFIG);
OuterSchemaCompatibility.COMPATIBLE_AS_IS);

Assert.assertTrue(compatibility.isCompatibleAsIs());
}

@Test
public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
final String INIT_OUTER_CONFIG = "outer-config";
final String INCOMPAT_OUTER_CONFIG = "incompat-outer-config";
TypeSerializer<?>[] testNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
};
Expand All @@ -136,8 +128,7 @@ public void testOuterSnapshotCompatibilityPrecedence() throws IOException {
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
testNestedSerializers,
testNestedSerializers,
INIT_OUTER_CONFIG,
INCOMPAT_OUTER_CONFIG);
OuterSchemaCompatibility.INCOMPATIBLE);

// even though nested serializers are compatible, incompatibility of the outer
// snapshot should have higher precedence in the final result
Expand All @@ -146,8 +137,6 @@ public void testOuterSnapshotCompatibilityPrecedence() throws IOException {

@Test
public void testNestedFieldSerializerArityMismatchPrecedence() throws IOException {
final String OUTER_CONFIG = "outer-config";

final TypeSerializer<?>[] initialNestedSerializers = {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AS_IS),
};
Expand All @@ -162,8 +151,7 @@ public void testNestedFieldSerializerArityMismatchPrecedence() throws IOExceptio
snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
initialNestedSerializers,
newNestedSerializers,
OUTER_CONFIG,
OUTER_CONFIG);
OuterSchemaCompatibility.COMPATIBLE_AS_IS);

// arity mismatch in the nested serializers should return incompatible as the result
Assert.assertTrue(compatibility.isIncompatible());
Expand All @@ -172,10 +160,9 @@ public void testNestedFieldSerializerArityMismatchPrecedence() throws IOExceptio
private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAndGetSchemaCompatibilityAfterRestore(
TypeSerializer<?>[] initialNestedSerializers,
TypeSerializer<?>[] newNestedSerializer,
String initialOuterConfiguration,
String newOuterConfiguration) throws IOException {
OuterSchemaCompatibility mockOuterSchemaCompatibilityResult) throws IOException {
TestCompositeTypeSerializer testSerializer =
new TestCompositeTypeSerializer(initialOuterConfiguration, initialNestedSerializers);
new TestCompositeTypeSerializer(initialNestedSerializers);

TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();

Expand All @@ -187,7 +174,7 @@ private TypeSerializerSchemaCompatibility<String> snapshotCompositeSerializerAnd
in, Thread.currentThread().getContextClassLoader());

TestCompositeTypeSerializer newTestSerializer =
new TestCompositeTypeSerializer(newOuterConfiguration, newNestedSerializer);
new TestCompositeTypeSerializer(mockOuterSchemaCompatibilityResult, newNestedSerializer);
return testSerializerSnapshot.resolveSchemaCompatibility(newTestSerializer);
}

Expand All @@ -205,7 +192,7 @@ public void testRestoreCompositeTypeSerializer() throws IOException {
new NestedSerializer(TargetCompatibility.COMPATIBLE_AFTER_MIGRATION)
};

TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer("outer-config", testNestedSerializers);
TestCompositeTypeSerializer testSerializer = new TestCompositeTypeSerializer(testNestedSerializers);

TypeSerializerSnapshot<String> testSerializerSnapshot = testSerializer.snapshotConfiguration();

Expand Down Expand Up @@ -238,19 +225,24 @@ public static class TestCompositeTypeSerializer extends TypeSerializer<String> {

private static final StringSerializer delegateSerializer = StringSerializer.INSTANCE;

private final String outerConfiguration;
private final OuterSchemaCompatibility mockOuterSchemaCompatibility;

private final TypeSerializer<?>[] nestedSerializers;

TestCompositeTypeSerializer(TypeSerializer<?>[] nestedSerializers) {
this.mockOuterSchemaCompatibility = OuterSchemaCompatibility.COMPATIBLE_AS_IS;
this.nestedSerializers = nestedSerializers;
}

TestCompositeTypeSerializer(
String outerConfiguration,
OuterSchemaCompatibility mockOuterSchemaCompatibility,
TypeSerializer<?>[] nestedSerializers) {
this.outerConfiguration = outerConfiguration;
this.mockOuterSchemaCompatibility = mockOuterSchemaCompatibility;
this.nestedSerializers = nestedSerializers;
}

public String getOuterConfiguration() {
return outerConfiguration;
public OuterSchemaCompatibility getMockOuterSchemaCompatibility() {
return mockOuterSchemaCompatibility;
}

TypeSerializer<?>[] getNestedSerializers() {
Expand Down Expand Up @@ -335,20 +327,20 @@ public int hashCode() {
*/
public static class TestCompositeTypeSerializerSnapshot extends CompositeTypeSerializerSnapshot<String, TestCompositeTypeSerializer> {

private String outerConfiguration;
private OuterSchemaCompatibility mockOuterSchemaCompatibility;

public TestCompositeTypeSerializerSnapshot() {
super(TestCompositeTypeSerializer.class);
}

TestCompositeTypeSerializerSnapshot(TestCompositeTypeSerializer serializer) {
super(serializer);
this.outerConfiguration = serializer.getOuterConfiguration();
this.mockOuterSchemaCompatibility = serializer.getMockOuterSchemaCompatibility();
}

@Override
protected TestCompositeTypeSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
return new TestCompositeTypeSerializer(outerConfiguration, nestedSerializers);
return new TestCompositeTypeSerializer(mockOuterSchemaCompatibility, nestedSerializers);
}

@Override
Expand All @@ -358,18 +350,18 @@ protected TypeSerializer<?>[] getNestedSerializers(TestCompositeTypeSerializer o

@Override
protected void writeOuterSnapshot(DataOutputView out) throws IOException {
out.writeUTF(outerConfiguration);
out.writeInt(mockOuterSchemaCompatibility.ordinal());
}

@Override
public void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
Assert.assertEquals(getCurrentOuterSnapshotVersion(), readOuterSnapshotVersion);
this.outerConfiguration = in.readUTF();
this.mockOuterSchemaCompatibility = OuterSchemaCompatibility.values()[in.readInt()];
}

@Override
protected boolean isOuterSnapshotCompatible(TestCompositeTypeSerializer newSerializer) {
return outerConfiguration.equals(newSerializer.getOuterConfiguration());
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(TestCompositeTypeSerializer newSerializer) {
return newSerializer.getMockOuterSchemaCompatibility();
}

@Override
Expand Down

0 comments on commit 6cda904

Please sign in to comment.