Skip to content

Commit

Permalink
[FLINK-17520] [core] Extend CompositeTypeSerializerSnapshot to allow …
Browse files Browse the repository at this point in the history
…migration based on outer snapshot

This commit deprecates isOuterSnapshotCompatible, which only allows
signaling if the outer config is either compatible or not compatible, in
favor of a new resolveOuterSchemaCompatibility method which additionally
allows the user to signal migration.

The change is backwards compatible, and allows subclasses that still
only implement isOuterSnapshotCompatible to work as is.
  • Loading branch information
tzulitai committed May 19, 2020
1 parent 1c810e4 commit d2cfc43
Showing 1 changed file with 63 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@
*
* <p>Serializers that do have some outer snapshot needs to make sure to implement the methods
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, and
* {@link #isOuterSnapshotCompatible(TypeSerializer)} when using this class as the base for its serializer snapshot
* class. By default, the base implementations of these methods are empty, i.e. this class assumes that
* subclasses do not have any outer snapshot that needs to be persisted.
* {@link #resolveOuterSchemaCompatibility(TypeSerializer)} (TypeSerializer)} when using this class as the base
* for its serializer snapshot class. By default, the base implementations of these methods are empty, i.e. this
* class assumes that subclasses do not have any outer snapshot that needs to be persisted.
*
* <h2>Snapshot Versioning</h2>
*
Expand Down Expand Up @@ -82,6 +82,15 @@
@PublicEvolving
public abstract class CompositeTypeSerializerSnapshot<T, S extends TypeSerializer<T>> implements TypeSerializerSnapshot<T> {

/**
* Indicates schema compatibility of the serializer configuration persisted as the outer snapshot.
*/
protected enum OuterSchemaCompatibility {
COMPATIBLE_AS_IS,
COMPATIBLE_AFTER_MIGRATION,
INCOMPATIBLE
}

/** Magic number for integrity checks during deserialization. */
private static final int MAGIC_NUMBER = 911108;

Expand Down Expand Up @@ -168,18 +177,19 @@ TypeSerializerSchemaCompatibility<T> internalResolveSchemaCompatibility(

S castedNewSerializer = correspondingSerializerClass.cast(newSerializer);

// check that outer configuration is compatible; if not, short circuit result
if (!isOuterSnapshotCompatible(castedNewSerializer)) {
return TypeSerializerSchemaCompatibility.incompatible();
}
final OuterSchemaCompatibility outerSchemaCompatibility =
resolveOuterSchemaCompatibility(castedNewSerializer);

final TypeSerializer<?>[] newNestedSerializers = getNestedSerializers(castedNewSerializer);
// check that nested serializer arity remains identical; if not, short circuit result
if (newNestedSerializers.length != snapshots.length) {
return TypeSerializerSchemaCompatibility.incompatible();
}

return constructFinalSchemaCompatibilityResult(newNestedSerializers, snapshots);
return constructFinalSchemaCompatibilityResult(
newNestedSerializers,
snapshots,
outerSchemaCompatibility);
}

@Internal
Expand Down Expand Up @@ -237,7 +247,7 @@ public final TypeSerializer<T> restoreSerializer() {
* only has nested serializers and no extra information. Otherwise, if the outer serializer contains
* some extra information that needs to be persisted as part of the serializer snapshot, this
* must be overridden. Note that this method and the corresponding methods
* {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
* {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
* needs to be implemented.
*
* @param out the {@link DataOutputView} to write the outer snapshot to.
Expand All @@ -251,7 +261,7 @@ protected void writeOuterSnapshot(DataOutputView out) throws IOException {}
* only has nested serializers and no extra information. Otherwise, if the outer serializer contains
* some extra information that has been persisted as part of the serializer snapshot, this
* must be overridden. Note that this method and the corresponding methods
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #isOuterSnapshotCompatible(TypeSerializer)}
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #resolveOuterSchemaCompatibility(TypeSerializer)}
* needs to be implemented.
*
* @param readOuterSnapshotVersion the read version of the outer snapshot.
Expand All @@ -275,11 +285,38 @@ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in,
*
* @return a flag indicating whether or not the new serializer's outer information is compatible with the one
* written in this snapshot.
*
* @deprecated this method is deprecated, and will be removed in the future.
* Please implement {@link #resolveOuterSchemaCompatibility(TypeSerializer)} instead.
*/
@Deprecated
protected boolean isOuterSnapshotCompatible(S newSerializer) {
return true;
}

/**
* Checks the schema compatibility of the given new serializer based on the outer snapshot.
*
* <p>The base implementation of this method assumes that the outer serializer
* only has nested serializers and no extra information, and therefore the result of the check is
* {@link OuterSchemaCompatibility#COMPATIBLE_AS_IS}. Otherwise, if the outer serializer contains
* some extra information that has been persisted as part of the serializer snapshot, this
* must be overridden. Note that this method and the corresponding methods
* {@link #writeOuterSnapshot(DataOutputView)}, {@link #readOuterSnapshot(int, DataInputView, ClassLoader)}
* needs to be implemented.
*
* @param newSerializer the new serializer, which contains the new outer information to check against.
*
* @return a {@link OuterSchemaCompatibility} indicating whether or the new serializer's outer
* information is compatible, requires migration, or incompatible with the one written
* in this snapshot.
*/
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(S newSerializer) {
return (isOuterSnapshotCompatible(newSerializer))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

// ------------------------------------------------------------------------------------------
// Utilities
// ------------------------------------------------------------------------------------------
Expand Down Expand Up @@ -311,17 +348,28 @@ private void legacyInternalReadOuterSnapshot(

private TypeSerializerSchemaCompatibility<T> constructFinalSchemaCompatibilityResult(
TypeSerializer<?>[] newNestedSerializers,
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots) {
TypeSerializerSnapshot<?>[] nestedSerializerSnapshots,
OuterSchemaCompatibility outerSchemaCompatibility) {

IntermediateCompatibilityResult<T> intermediateResult =
IntermediateCompatibilityResult<T> nestedSerializersCompatibilityResult =
CompositeTypeSerializerUtil.constructIntermediateCompatibilityResult(newNestedSerializers, nestedSerializerSnapshots);

if (intermediateResult.isCompatibleWithReconfiguredSerializer()) {
if (outerSchemaCompatibility == OuterSchemaCompatibility.INCOMPATIBLE
|| nestedSerializersCompatibilityResult.isIncompatible()) {
return TypeSerializerSchemaCompatibility.incompatible();
}

if (outerSchemaCompatibility == OuterSchemaCompatibility.COMPATIBLE_AFTER_MIGRATION
|| nestedSerializersCompatibilityResult.isCompatibleAfterMigration()) {
return TypeSerializerSchemaCompatibility.compatibleAfterMigration();
}

if (nestedSerializersCompatibilityResult.isCompatibleWithReconfiguredSerializer()) {
@SuppressWarnings("unchecked")
TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(intermediateResult.getNestedSerializers());
TypeSerializer<T> reconfiguredCompositeSerializer = createOuterSerializerWithNestedSerializers(nestedSerializersCompatibilityResult.getNestedSerializers());
return TypeSerializerSchemaCompatibility.compatibleWithReconfiguredSerializer(reconfiguredCompositeSerializer);
}

return intermediateResult.getFinalResult();
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}
}

0 comments on commit d2cfc43

Please sign in to comment.