Skip to content

Commit

Permalink
[FLINK-13159] Fix incorrect subclass serializer reconfiguration in Po…
Browse files Browse the repository at this point in the history
…joSerializer
  • Loading branch information
tzulitai committed Aug 8, 2019
1 parent 594a7eb commit 268da6a
Showing 1 changed file with 15 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -474,13 +474,21 @@ private static Tuple2<LinkedHashMap<Class<?>, Integer>, TypeSerializer<Object>[]
Iterator<TypeSerializer<?>> serializersForPreexistingRegistrations =
Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator();

for (Map.Entry<Class<?>, TypeSerializer<?>> registration : newSubclassRegistrations.entrySet()) {
// new registrations should simply be appended to the subclass serializer registry with their new serializers;
// preexisting registrations should use the compatibility-checked serializer
TypeSerializer<?> newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey()))
? serializersForPreexistingRegistrations.next()
: registration.getValue();
reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration);
// first, replace all restored serializers of subclasses that co-exist in
// the previous and new registrations, with the compatibility-checked serializers
for (Map.Entry<Class<?>, TypeSerializer<?>> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) {
if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) {
oldRegistration.setValue(serializersForPreexistingRegistrations.next());
}
}

// then, for all new registration that did not exist before, append it to the registry simply with their
// new serializers
for (Map.Entry<Class<?>, TypeSerializer<?>> newRegistration : newSubclassRegistrations.entrySet()) {
TypeSerializer<?> oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey());
if (oldRegistration == null) {
reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue());
}
}

return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);
Expand Down

0 comments on commit 268da6a

Please sign in to comment.