Skip to content

Commit

Permalink
[FLINK-10709] [core] Remove NS generic from TypeSerializerSchemaCompa…
Browse files Browse the repository at this point in the history
…tibility
  • Loading branch information
StephanEwen authored and tzulitai committed Oct 30, 2018
1 parent b0b10ed commit 17f311a
Show file tree
Hide file tree
Showing 22 changed files with 57 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public TypeSerializer<T> restoreSerializer() {
}

@Override
public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer) {
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
// if there is no configuration snapshot to check against,
// then we can only assume that the new serializer is compatible as is
return TypeSerializerSchemaCompatibility.compatibleAsIs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ public static <T> CompatibilityResult<T> resolveCompatibilityResult(
TypeSerializerSnapshot<T> precedingSerializerConfigSnapshot,
TypeSerializer<T> newSerializer) {

TypeSerializerSchemaCompatibility<T, TypeSerializer<T>> compatibility =
TypeSerializerSchemaCompatibility<T> compatibility =
precedingSerializerConfigSnapshot.resolveSchemaCompatibility(newSerializer);

// everything except "compatible" maps to "requires migration".
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ protected CompositeSerializerSnapshot(TypeSerializer<?>... serializers) {

protected abstract TypeSerializer<?>[] getNestedSerializersFromSerializer(S serializer);

protected abstract TypeSerializerSchemaCompatibility<T, S> outerCompatibility(S serializer);
protected abstract TypeSerializerSchemaCompatibility<T> outerCompatibility(S serializer);

protected abstract Class<?> outerSerializerType();

Expand Down Expand Up @@ -140,8 +140,7 @@ public TypeSerializer<T> restoreSerializer() {
return createSerializer(nestedSerializers);
}

public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
resolveSchemaCompatibility(NS newSerializer) {
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {

// class compatibility
if (!outerSerializerType().isInstance(newSerializer)) {
Expand All @@ -152,7 +151,7 @@ public TypeSerializer<T> restoreSerializer() {

@SuppressWarnings("unchecked")
final S castedSerializer = (S) newSerializer;
final TypeSerializerSchemaCompatibility<T, S> outerCompatibility = outerCompatibility(castedSerializer);
final TypeSerializerSchemaCompatibility<T> outerCompatibility = outerCompatibility(castedSerializer);

if (outerCompatibility.isIncompatible()) {
return TypeSerializerSchemaCompatibility.incompatible();
Expand All @@ -165,7 +164,7 @@ public TypeSerializer<T> restoreSerializer() {

boolean nestedSerializerRequiresMigration = false;
for (int i = 0; i < nestedSnapshots.length; i++) {
TypeSerializerSchemaCompatibility<?, ?> compatibility =
TypeSerializerSchemaCompatibility<?> compatibility =
resolveCompatibility(nestedSerializers[i], nestedSnapshots[i]);

if (compatibility.isIncompatible()) {
Expand All @@ -189,11 +188,11 @@ public TypeSerializer<T> restoreSerializer() {
* Utility method to conjure up a new scope for the generic parameters.
*/
@SuppressWarnings("unchecked")
private static <E, X extends TypeSerializer<E>> TypeSerializerSchemaCompatibility<E, X> resolveCompatibility(
private static <E> TypeSerializerSchemaCompatibility<E> resolveCompatibility(
TypeSerializer<?> serializer,
TypeSerializerSnapshot<?> snapshot) {

X typedSerializer = (X) serializer;
TypeSerializer<E> typedSerializer = (TypeSerializer<E>) serializer;
TypeSerializerSnapshot<E> typedSnapshot = (TypeSerializerSnapshot<E>) snapshot;

return typedSnapshot.resolveSchemaCompatibility(typedSerializer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,8 @@ else if (serializer instanceof UnloadableDummyTypeSerializer) {
}

@Override
public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
NS newSerializer) {
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
TypeSerializer<T> newSerializer) {

// in prior versions, the compatibility check was in the serializer itself, so we
// delegate this call to the serializer.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,20 +27,18 @@
* A {@code TypeSerializerSchemaCompatibility} represents information about whether or not a {@link TypeSerializer}
* can be safely used to read data written by a previous type serializer.
*
* <p>Typically, the compatibility of the new serializer is resolved by checking it against the snapshotted
* {@link TypeSerializerConfigSnapshot} of the previous serializer. Depending on the type of the
* <p>Typically, the compatibility of the new serializer is resolved by checking the serializer against the
* {@link TypeSerializerSnapshot} of the previous serializer. Depending on the type of the
* resolved compatibility result, migration (i.e., reading bytes with the previous serializer and then writing
* it again with the new serializer) may be required before the new serializer can be used.
*
* @param <T> the type of data serialized by the serializer that was being checked.
*
* @param <NS> the type of serializer that was being checked.
*
* @see TypeSerializer
* @see TypeSerializerConfigSnapshot#resolveSchemaCompatibility(TypeSerializer)
* @see TypeSerializerSnapshot#resolveSchemaCompatibility(TypeSerializer)
*/
@PublicEvolving
public class TypeSerializerSchemaCompatibility<T, NS extends TypeSerializer<T>> {
public class TypeSerializerSchemaCompatibility<T> {

/**
* Enum for the type of the compatibility.
Expand Down Expand Up @@ -77,7 +75,7 @@ enum Type {
*
* @return a result that indicates migration is not required for the new serializer.
*/
public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> compatibleAsIs() {
public static <T> TypeSerializerSchemaCompatibility<T> compatibleAsIs() {
return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AS_IS, null);
}

Expand All @@ -87,7 +85,7 @@ public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibilit
*
* @return a result that indicates that the new serializer can be used after migrating the written bytes.
*/
public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> compatibleAfterMigration() {
public static <T> TypeSerializerSchemaCompatibility<T> compatibleAfterMigration() {
return new TypeSerializerSchemaCompatibility<>(Type.COMPATIBLE_AFTER_MIGRATION, null);
}

Expand All @@ -101,11 +99,11 @@ public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibilit
*
* @return a result that indicates incompatibility between the new and previous serializer.
*/
public static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> incompatible() {
public static <T> TypeSerializerSchemaCompatibility<T> incompatible() {
return new TypeSerializerSchemaCompatibility<>(Type.INCOMPATIBLE, null);
}

private TypeSerializerSchemaCompatibility(Type resultType, @Nullable NS reconfiguredNewSerializer) {
private TypeSerializerSchemaCompatibility(Type resultType, @Nullable TypeSerializer<T> reconfiguredNewSerializer) {
this.resultType = Preconditions.checkNotNull(resultType);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,11 +126,10 @@ public interface TypeSerializerSnapshot<T> {
* the format during the restore operation.
*
* @param newSerializer the new serializer to check.
* @param <NS> the type of the new serializer
*
* @return the serializer compatibility result.
*/
<NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer);
TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer);

// ------------------------------------------------------------------------
// read / write utilities
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ protected TypeSerializer<?>[] getNestedSerializersFromSerializer(GenericArraySer
}

@Override
protected TypeSerializerSchemaCompatibility<C[], GenericArraySerializer<C>>
protected TypeSerializerSchemaCompatibility<C[]>
outerCompatibility(GenericArraySerializer<C> serializer) {

return serializer.getComponentClass() == componentClass ?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,11 @@ public int hashCode() {
}

// --------------------------------------------------------------------------------------------
// Serializer configuration snapshotting & compatibility
// Serializer configuration snapshot & compatibility
// --------------------------------------------------------------------------------------------

@Override
public CollectionSerializerConfigSnapshot<List<T>, T> snapshotConfiguration() {
public TypeSerializerSnapshot<List<T>> snapshotConfiguration() {
return new CollectionSerializerConfigSnapshot<>(elementSerializer);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ public int getVersion() {

@Override
@SuppressWarnings("unchecked")
public <NS extends TypeSerializer<Either<L, R>>> TypeSerializerSchemaCompatibility<Either<L, R>, NS>
resolveSchemaCompatibility(NS newSerializer) {
public TypeSerializerSchemaCompatibility<Either<L, R>> resolveSchemaCompatibility(
TypeSerializer<Either<L, R>> newSerializer) {

// this class was shared between the Java Either Serializer and the
// Scala Either serializer
Expand All @@ -61,7 +61,7 @@ public int getVersion() {
}

@SuppressWarnings("unchecked")
private <NS extends TypeSerializer<Either<L, R>>> TypeSerializerSchemaCompatibility<Either<L, R>, NS> checkJavaSerializerCompatibility(
private TypeSerializerSchemaCompatibility<Either<L, R>> checkJavaSerializerCompatibility(
EitherSerializer<L, R> serializer) {

TypeSerializer<L> leftSerializer = serializer.getLeftSerializer();
Expand All @@ -70,8 +70,8 @@ private <NS extends TypeSerializer<Either<L, R>>> TypeSerializerSchemaCompatibil
TypeSerializerSnapshot<L> leftSnapshot = (TypeSerializerSnapshot<L>) getNestedSerializersAndConfigs().get(0).f1;
TypeSerializerSnapshot<R> rightSnapshot = (TypeSerializerSnapshot<R>) getNestedSerializersAndConfigs().get(1).f1;

TypeSerializerSchemaCompatibility<?, ?> leftCompatibility = leftSnapshot.resolveSchemaCompatibility(leftSerializer);
TypeSerializerSchemaCompatibility<?, ?> rightCompatibility = rightSnapshot.resolveSchemaCompatibility(rightSerializer);
TypeSerializerSchemaCompatibility<?> leftCompatibility = leftSnapshot.resolveSchemaCompatibility(leftSerializer);
TypeSerializerSchemaCompatibility<?> rightCompatibility = rightSnapshot.resolveSchemaCompatibility(rightSerializer);

if (leftCompatibility.isCompatibleAsIs() && rightCompatibility.isCompatibleAsIs()) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ protected TypeSerializer<?>[] getNestedSerializersFromSerializer(EitherSerialize
}

@Override
protected TypeSerializerSchemaCompatibility<Either<L, R>, EitherSerializer<L, R>> outerCompatibility(EitherSerializer<L, R> serializer) {
protected TypeSerializerSchemaCompatibility<Either<L, R>> outerCompatibility(EitherSerializer<L, R> serializer) {
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ public void testSnapshotConfigurationAndReconfigure() throws Exception {
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), getSerializer());
}

TypeSerializerSchemaCompatibility<T, ? extends TypeSerializer<T>> strategy = restoredConfig.resolveSchemaCompatibility(getSerializer());
TypeSerializerSchemaCompatibility<T> strategy = restoredConfig.resolveSchemaCompatibility(getSerializer());
assertTrue(strategy.isCompatibleAsIs());

TypeSerializer<T> restoreSerializer = restoredConfig.restoreSerializer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void serializerSnapshotRestoresCurrentSerializer() {
public void snapshotIsCompatibleWithTheCurrentSerializer() {
TypeSerializerSnapshot<ElementT> snapshot = snapshotUnderTest();

TypeSerializerSchemaCompatibility<ElementT, TypeSerializer<ElementT>> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());
TypeSerializerSchemaCompatibility<ElementT> result = snapshot.resolveSchemaCompatibility(testSpecification.createSerializer());

assertTrue(result.isCompatibleAsIs());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public void testConfigurationSnapshotSerialization() throws Exception {
new DataInputViewStreamWrapper(in), Thread.currentThread().getContextClassLoader(), serializer);
}

TypeSerializerSchemaCompatibility<PublicEnum, ?> compatResult = restoredConfig.resolveSchemaCompatibility(serializer);
TypeSerializerSchemaCompatibility<PublicEnum> compatResult = restoredConfig.resolveSchemaCompatibility(serializer);
assertTrue(compatResult.isCompatibleAsIs());

assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public void testReconfigureWithDifferentPojoType() throws Exception {
}

@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<SubTestUserClassA, ?> compatResult =
TypeSerializerSchemaCompatibility<SubTestUserClassA> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer2);
assertTrue(compatResult.isIncompatible());
}
Expand Down Expand Up @@ -345,7 +345,7 @@ public void testReconfigureDifferentSubclassRegistrationOrder() throws Exception
}

@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());

Expand Down Expand Up @@ -392,7 +392,7 @@ public void testReconfigureRepopulateNonregisteredSubclassSerializerCache() thro

// reconfigure - check reconfiguration result and that subclass serializer cache is repopulated
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
Expand Down Expand Up @@ -457,7 +457,7 @@ public void testReconfigureWithPreviouslyNonregisteredSubclasses() throws Except
// 1) subclass serializer cache is repopulated
// 2) registrations also contain the now registered subclasses
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
pojoSerializerConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertEquals(2, pojoSerializer.getSubclassSerializerCache().size());
Expand Down Expand Up @@ -535,7 +535,7 @@ public void testReconfigureWithDifferentFieldOrder() throws Exception {
new HashMap<>()); // empty; irrelevant for this test

// reconfigure - check reconfiguration result and that fields are reordered to the previous order
TypeSerializerSchemaCompatibility<TestUserClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestUserClass> compatResult =
mockPreviousConfigSnapshot.resolveSchemaCompatibility(pojoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
int i = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ public void testMigrationStrategyForRemovedAvroDependency() throws Exception {
}

@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestClass> compatResult =
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForA);
assertTrue(compatResult.isCompatibleAsIs());
}
Expand Down Expand Up @@ -114,7 +114,7 @@ public void testMigrationStrategyWithDifferentKryoType() throws Exception {
}

@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestClassB, ?> compatResult =
TypeSerializerSchemaCompatibility<TestClassB> compatResult =
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializerForB);
assertTrue(compatResult.isIncompatible());
}
Expand Down Expand Up @@ -277,7 +277,7 @@ public void testMigrationStrategyForDifferentRegistrationOrder() throws Exceptio

// reconfigure - check reconfiguration result and that registration id remains the same
@SuppressWarnings("unchecked")
TypeSerializerSchemaCompatibility<TestClass, ?> compatResult =
TypeSerializerSchemaCompatibility<TestClass> compatResult =
kryoSerializerConfigSnapshot.resolveSchemaCompatibility(kryoSerializer);
assertTrue(compatResult.isCompatibleAsIs());
assertEquals(testClassId, kryoSerializer.getKryo().getRegistration(TestClass.class).getId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,8 @@ private void readV2(DataInputView in, ClassLoader userCodeClassLoader) throws IO
}

@Override
public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
resolveSchemaCompatibility(NS newSerializer) {
public TypeSerializerSchemaCompatibility<T>
resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {
if (!(newSerializer instanceof AvroSerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
Expand Down Expand Up @@ -143,7 +143,7 @@ public TypeSerializer<T> restoreSerializer() {
* (@see <a href="https://avro.apache.org/docs/current/spec.html#Schema+Resolution">Schema Resolution</a>).
*/
@VisibleForTesting
static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(
static <T> TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(
Schema writerSchema,
Schema readerSchema) {

Expand All @@ -157,7 +157,7 @@ static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS
return avroCompatibilityToFlinkCompatibility(compatibility);
}

private static <T, NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS>
private static <T> TypeSerializerSchemaCompatibility<T>
avroCompatibilityToFlinkCompatibility(SchemaPairCompatibility compatibility) {

switch (compatibility.getType()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,
TypeSerializerSnapshot<K> keySerializerSnapshot = Preconditions.checkNotNull(
(TypeSerializerSnapshot<K>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.KEY_SERIALIZER));

TypeSerializerSchemaCompatibility<K, ?> keyCompatibility =
TypeSerializerSchemaCompatibility<K> keyCompatibility =
keySerializerSnapshot.resolveSchemaCompatibility(broadcastStateKeySerializer);
if (keyCompatibility.isIncompatible()) {
throw new StateMigrationException("The new key serializer for broadcast state must not be incompatible.");
Expand All @@ -241,7 +241,7 @@ public <K, V> BroadcastState<K, V> getBroadcastState(final MapStateDescriptor<K,
TypeSerializerSnapshot<V> valueSerializerSnapshot = Preconditions.checkNotNull(
(TypeSerializerSnapshot<V>) metaInfoSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

TypeSerializerSchemaCompatibility<V, ?> valueCompatibility =
TypeSerializerSchemaCompatibility<V> valueCompatibility =
valueSerializerSnapshot.resolveSchemaCompatibility(broadcastStateValueSerializer);
if (valueCompatibility.isIncompatible()) {
throw new StateMigrationException("The new value serializer for broadcast state must not be incompatible.");
Expand Down Expand Up @@ -601,7 +601,7 @@ private <S> ListState<S> getListState(
TypeSerializerSnapshot<S> stateSerializerSnapshot = Preconditions.checkNotNull(
(TypeSerializerSnapshot<S>) restoredSnapshot.getTypeSerializerConfigSnapshot(StateMetaInfoSnapshot.CommonSerializerKeys.VALUE_SERIALIZER));

TypeSerializerSchemaCompatibility<S, ?> stateCompatibility =
TypeSerializerSchemaCompatibility<S> stateCompatibility =
stateSerializerSnapshot.resolveSchemaCompatibility(newPartitionStateSerializer);
if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("The new state serializer for operator state must not be incompatible.");
Expand Down
Loading

0 comments on commit 17f311a

Please sign in to comment.