Skip to content

Commit

Permalink
[FLINK-17520] [core] Rework all implementations of CompositeTypeSeria…
Browse files Browse the repository at this point in the history
…lizerSnapshot#resolveOuterSchemaCompatibility
  • Loading branch information
tzulitai committed May 19, 2020
1 parent 71e3339 commit 55a21bc
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -78,8 +78,10 @@ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in,
}

@Override
protected boolean isOuterSnapshotCompatible(GenericArraySerializer<C> newSerializer) {
return this.componentClass == newSerializer.getComponentClass();
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(GenericArraySerializer<C> newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,8 +348,10 @@ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in,
}

@Override
protected boolean isOuterSnapshotCompatible(NullableSerializer<T> newSerializer) {
return nullPaddingLength == newSerializer.nullPaddingLength();
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(NullableSerializer<T> newSerializer) {
return (nullPaddingLength == newSerializer.nullPaddingLength())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,9 @@ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in,
}

@Override
protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<T> newSerializer) {
return Objects.equals(type, newSerializer.getTupleClass());
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<T> newSerializer) {
return (Objects.equals(type, newSerializer.getTupleClass()))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,9 @@ protected void readOuterSnapshot(
}

@Override
protected boolean isOuterSnapshotCompatible(TraversableSerializer<T, E> newSerializer) {
return cbfCode.equals(newSerializer.cbfCode());
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(TraversableSerializer<T, E> newSerializer) {
return (cbfCode.equals(newSerializer.cbfCode()))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,10 @@ protected void readOuterSnapshot(int readOuterSnapshotVersion, DataInputView in,
}

@Override
protected boolean isOuterSnapshotCompatible(ScalaCaseClassSerializer<Tuple2<T1, T2>> newSerializer) {
return Objects.equals(type, newSerializer.getTupleClass());
protected OuterSchemaCompatibility resolveOuterSchemaCompatibility(ScalaCaseClassSerializer<Tuple2<T1, T2>> newSerializer) {
return (Objects.equals(type, newSerializer.getTupleClass()))
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

private static <T1, T2> Class<ScalaCaseClassSerializer<scala.Tuple2<T1, T2>>> correspondingSerializerClass() {
Expand Down

0 comments on commit 55a21bc

Please sign in to comment.