Skip to content

Commit

Permalink
[FLINK-6554] [core] Make CompatibilityResult options more explicitly …
Browse files Browse the repository at this point in the history
…defined

Previously, if a serializer determines that state migration needs to be
performed but could not provide a fallback convert deserializer, it
would use CompatibilityResult.requiresMigration(null).

This commit makes this option more explicit by having a
CompatibilityResult.requiresMigration() option that takes no parameters.
This improves how the user perceives the API without having to rely on
the Javadoc that it is allowed to have no fallback convert deserializer.

Consequently, when using
CompatibilityResult.requiresMigration(TypeDeserializer), the provided
argument cannot be null.

This closes apache#3886.
  • Loading branch information
tzulitai committed May 13, 2017
1 parent 347100d commit 947c44e
Show file tree
Hide file tree
Showing 33 changed files with 109 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c

return CompatibilityResult.compatible();
} else {
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,7 +1520,7 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(
restoredMetaInfo.getStateSerializerConfigSnapshot(),
newMetaInfo.getStateSerializer());

if (!namespaceCompatibility.requiresMigration() && !stateCompatibility.requiresMigration()) {
if (!namespaceCompatibility.isRequiresMigration() && !stateCompatibility.isRequiresMigration()) {
stateInfo.f1 = newMetaInfo;
return stateInfo.f0;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.util.Preconditions;

/**
* A {@code CompatibilityResult} contains information about whether or not data migration
Expand All @@ -41,7 +42,7 @@ public final class CompatibilityResult<T> {
private final TypeDeserializer<T> convertDeserializer;

/**
* Returns a strategy that signals that the new serializer is compatible and no migration is required.
* Returns a result that signals that the new serializer is compatible and no migration is required.
*
* @return a result that signals migration is not required for the new serializer
*/
Expand All @@ -50,19 +51,32 @@ public static <T> CompatibilityResult<T> compatible() {
}

/**
* Returns a strategy that signals migration to be performed.
* Returns a result that signals migration to be performed, and in the case that the preceding serializer
* cannot be found or restored to read the previous data during migration, a provided convert deserializer
* can be used.
*
* <p>Furthermore, in the case that the preceding serializer cannot be found or restored to read the
* previous data during migration, a provided convert deserializer can be used (may be {@code null}
* if one cannot be provided).
* @param convertDeserializer the convert deserializer to use, in the case that the preceding serializer
* cannot be found.
*
* <p>In the case that the preceding serializer cannot be found and a convert deserializer is not
* provided, the migration will fail due to the incapability of reading previous data.
*
* @return a result that signals migration is necessary, possibly providing a convert deserializer.
* @return a result that signals migration is necessary, also providing a convert deserializer.
*/
public static <T> CompatibilityResult<T> requiresMigration(TypeDeserializer<T> convertDeserializer) {
return new CompatibilityResult<>(true, convertDeserializer);
Preconditions.checkNotNull(convertDeserializer, "Convert deserializer cannot be null.");

return new CompatibilityResult<>(true, Preconditions.checkNotNull(convertDeserializer));
}

/**
* Returns a result that signals migration to be performed. The migration will fail if the preceding
* serializer for the previous data cannot be found.
*
* <p>You can also provide a convert deserializer using {@link #requiresMigration(TypeDeserializer)},
* which will be used as a fallback resort in such cases.
*
* @return a result that signals migration is necessary, without providing a convert deserializer.
*/
public static <T> CompatibilityResult<T> requiresMigration() {
return new CompatibilityResult<>(true, null);
}

private CompatibilityResult(boolean requiresMigration, TypeDeserializer<T> convertDeserializer) {
Expand All @@ -74,7 +88,7 @@ public TypeDeserializer<T> getConvertDeserializer() {
return convertDeserializer;
}

public boolean requiresMigration() {
public boolean isRequiresMigration() {
return requiresMigration;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,19 @@ public abstract class TypeSerializer<T> implements TypeDeserializer<T>, Serializ
* migration needs to be performed, because this serializer is not compatible, or cannot be reconfigured to be
* compatible, for previous data. Furthermore, in the case that the preceding serializer cannot be found or
* restored to read the previous data to perform the migration, the provided convert deserializer can be
* used (may be {@code null} if one cannot be provided).</li>
* used as a fallback resort.</li>
*
* <li>{@link CompatibilityResult#requiresMigration()}: this signals Flink that migration needs to be
* performed, because this serializer is not compatible, or cannot be reconfigured to be compatible, for
* previous data. If the preceding serializer cannot be found (either its implementation changed or it was
* removed from the classpath) then the migration will fail due to incapability to read previous data.</li>
* </ul>
*
* @see CompatibilityResult
*
* @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
*
* @return the determined compatibility result.
* @return the determined compatibility result (cannot be {@code null}).
*/
public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,8 +208,8 @@ public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot
CompatibilityResult<C> compatResult = componentSerializer.ensureCompatibility(
config.getSingleNestedSerializerConfigSnapshot());

if (!compatResult.requiresMigration()) {
return CompatibilityResult.requiresMigration(null);
if (!compatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new GenericArraySerializer<>(
Expand All @@ -219,6 +219,6 @@ public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -185,14 +185,14 @@ public CompatibilityResult<List<T>> ensureCompatibility(TypeSerializerConfigSnap
CompatibilityResult<T> compatResult = elementSerializer.ensureCompatibility(
((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerConfigSnapshot());

if (!compatResult.requiresMigration()) {
if (!compatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (compatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
new ListSerializer<>(new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer())));
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,7 @@ public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSn
CompatibilityResult<K> keyCompatResult = keySerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[0]);
CompatibilityResult<V> valueCompatResult = valueSerializer.ensureCompatibility(keyValueSerializerConfigSnapshots[1]);

if (!keyCompatResult.requiresMigration() && !valueCompatResult.requiresMigration()) {
if (!keyCompatResult.isRequiresMigration() && !valueCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else if (keyCompatResult.getConvertDeserializer() != null && valueCompatResult.getConvertDeserializer() != null) {
return CompatibilityResult.requiresMigration(
Expand All @@ -226,6 +226,6 @@ public CompatibilityResult<Map<K, V>> ensureCompatibility(TypeSerializerConfigSn
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ && isCompatibleSerializationFormatIdentifier(

return CompatibilityResult.compatible();
} else {
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c

for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
if (reconfiguredRegistrationEntry.getValue().isDummy()) {
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}

Expand All @@ -249,7 +249,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c

// ends up here if the preceding serializer is not
// the ValueSerializer, or serialized data type has changed
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
&& valueClass.equals(((CopyableValueSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
return CompatibilityResult.compatible();
} else {
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfi
CompatibilityResult<L> leftCompatResult = leftSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[0]);
CompatibilityResult<R> rightCompatResult = rightSerializer.ensureCompatibility(leftRightSerializerConfigSnapshots[1]);

if (!leftCompatResult.requiresMigration() && !rightCompatResult.requiresMigration()) {
if (!leftCompatResult.isRequiresMigration() && !rightCompatResult.isRequiresMigration()) {
return CompatibilityResult.compatible();
} else {
if (leftCompatResult.getConvertDeserializer() != null && rightCompatResult.getConvertDeserializer() != null) {
Expand All @@ -219,6 +219,6 @@ public CompatibilityResult<Either<L, R>> ensureCompatibility(TypeSerializerConfi
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -580,13 +580,13 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
reorderedFields[i] = fieldToConfigSnapshotEntry.getKey();

compatResult = fieldSerializers[fieldIndex].ensureCompatibility(fieldToConfigSnapshotEntry.getValue());
if (compatResult.requiresMigration()) {
return CompatibilityResult.requiresMigration(null);
if (compatResult.isRequiresMigration()) {
return CompatibilityResult.requiresMigration();
} else {
reorderedFieldSerializers[i] = fieldSerializers[fieldIndex];
}
} else {
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

i++;
Expand Down Expand Up @@ -618,8 +618,8 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
for (TypeSerializerConfigSnapshot previousRegisteredSerializerConfig : previousRegistrations.values()) {
// check compatibility of subclass serializer
compatResult = reorderedRegisteredSubclassSerializers[i].ensureCompatibility(previousRegisteredSerializerConfig);
if (compatResult.requiresMigration()) {
return CompatibilityResult.requiresMigration(null);
if (compatResult.isRequiresMigration()) {
return CompatibilityResult.requiresMigration();
}

i++;
Expand All @@ -638,8 +638,8 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c

// check compatibility of cached subclass serializer
compatResult = cachedSerializer.ensureCompatibility(previousCachedEntry.getValue());
if (compatResult.requiresMigration()) {
return CompatibilityResult.requiresMigration(null);
if (compatResult.isRequiresMigration()) {
return CompatibilityResult.requiresMigration();
} else {
rebuiltCache.put(previousCachedEntry.getKey(), cachedSerializer);
}
Expand All @@ -661,7 +661,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

public static final class PojoSerializerConfigSnapshot<T> extends GenericTypeSerializerConfigSnapshot<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,12 +270,12 @@ public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot
CompatibilityResult<?> compatResult;
for (int i = 0; i < fieldSerializers.length; i++) {
compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
if (compatResult.requiresMigration()) {
if (compatResult.isRequiresMigration()) {
requireMigration = true;

if (compatResult.getConvertDeserializer() == null) {
// one of the field serializers cannot provide a fallback deserializer
return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
} else {
convertDeserializers[i] =
new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer());
Expand All @@ -291,7 +291,7 @@ public CompatibilityResult<Row> ensureCompatibility(TypeSerializerConfigSnapshot
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

public static final class RowSerializerConfigSnapshot extends CompositeTypeSerializerConfigSnapshot {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,8 +145,8 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
CompatibilityResult compatResult;
for (int i = 0; i < fieldSerializers.length; i++) {
compatResult = fieldSerializers[i].ensureCompatibility(fieldSerializerConfigSnapshots[i]);
if (compatResult.requiresMigration()) {
return CompatibilityResult.requiresMigration(null);
if (compatResult.isRequiresMigration()) {
return CompatibilityResult.requiresMigration();
}
}

Expand All @@ -155,6 +155,6 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

public static class ValueSerializerConfigSnapshot<T extends Value> extends KryoRegistrationSerializerConfigSnapshot<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
"proper serializer, because its previous serializer cannot be loaded or is no " +
"longer valid but a new serializer is not available", reconfiguredRegistrationEntry.getKey());

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}
}

Expand All @@ -410,7 +410,7 @@ public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot c
}
}

return CompatibilityResult.requiresMigration(null);
return CompatibilityResult.requiresMigration();
}

public static final class KryoSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,11 +121,11 @@ public void testSnapshotConfigurationAndReconfigure() throws Exception {
}

CompatibilityResult strategy = getSerializer().ensureCompatibility(restoredConfig);
assertFalse(strategy.requiresMigration());
assertFalse(strategy.isRequiresMigration());

// also verify that the serializer's reconfigure implementation detects incompatibility
strategy = getSerializer().ensureCompatibility(new TestIncompatibleSerializerConfigSnapshot());
assertTrue(strategy.requiresMigration());
assertTrue(strategy.isRequiresMigration());
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public void testReconfiguration() {
// reconfigure and verify compatibility
CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
assertFalse(compatResult.requiresMigration());
assertFalse(compatResult.isRequiresMigration());

// after reconfiguration, the order should be first the original BAR, PAULA, NATHANIEL,
// followed by the "new enum constants" FOO, PETER, EMMA
Expand Down Expand Up @@ -107,7 +107,7 @@ public void testConfigurationSnapshotSerialization() throws Exception {
}

CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(restoredConfig);
assertFalse(compatResult.requiresMigration());
assertFalse(compatResult.isRequiresMigration());

assertEquals(PublicEnum.FOO.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.FOO).intValue());
assertEquals(PublicEnum.BAR.ordinal(), serializer.getValueToOrdinal().get(PublicEnum.BAR).intValue());
Expand Down Expand Up @@ -163,7 +163,7 @@ public void testSerializeReconfiguredEnumSerializer() throws Exception {
// reconfigure and verify compatibility
CompatibilityResult<PublicEnum> compatResult = serializer.ensureCompatibility(
new EnumSerializer.EnumSerializerConfigSnapshot<>(PublicEnum.class, mockPreviousOrder));
assertFalse(compatResult.requiresMigration());
assertFalse(compatResult.isRequiresMigration());

// serialize and deserialize again the serializer
byte[] serializedSerializer = InstantiationUtil.serializeObject(serializer);
Expand Down
Loading

0 comments on commit 947c44e

Please sign in to comment.