diff --git a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java index 9987faef9d751..95276d62fb615 100644 --- a/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java +++ b/flink-core/src/main/java/org/apache/flink/api/java/typeutils/runtime/PojoSerializerSnapshot.java @@ -474,13 +474,21 @@ private static Tuple2, Integer>, TypeSerializer[] Iterator> serializersForPreexistingRegistrations = Arrays.asList(preExistingRegistrationsCompatibility.getNestedSerializers()).iterator(); - for (Map.Entry, TypeSerializer> registration : newSubclassRegistrations.entrySet()) { - // new registrations should simply be appended to the subclass serializer registry with their new serializers; - // preexisting registrations should use the compatibility-checked serializer - TypeSerializer newRegistration = (reconfiguredSubclassSerializerRegistry.containsKey(registration.getKey())) - ? serializersForPreexistingRegistrations.next() - : registration.getValue(); - reconfiguredSubclassSerializerRegistry.put(registration.getKey(), newRegistration); + // first, replace all restored serializers of subclasses that co-exist in + // the previous and new registrations, with the compatibility-checked serializers + for (Map.Entry, TypeSerializer> oldRegistration : reconfiguredSubclassSerializerRegistry.entrySet()) { + if (newSubclassRegistrations.containsKey(oldRegistration.getKey())) { + oldRegistration.setValue(serializersForPreexistingRegistrations.next()); + } + } + + // then, for all new registration that did not exist before, append it to the registry simply with their + // new serializers + for (Map.Entry, TypeSerializer> newRegistration : newSubclassRegistrations.entrySet()) { + TypeSerializer oldRegistration = reconfiguredSubclassSerializerRegistry.get(newRegistration.getKey()); + if (oldRegistration == null) { + reconfiguredSubclassSerializerRegistry.put(newRegistration.getKey(), newRegistration.getValue()); + } } return decomposeSubclassSerializerRegistry(reconfiguredSubclassSerializerRegistry);