Skip to content

Commit

Permalink
[FLINK-19972][serialization] add more hints in case of incompatbilities
Browse files Browse the repository at this point in the history
  • Loading branch information
NicoK committed Nov 10, 2020
1 parent 8a63e64 commit 700a92d
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -213,19 +213,29 @@ private <N, V> StateTable<K, N, V> tryRegisterStateTable(

restoredKvMetaInfo.updateSnapshotTransformFactory(snapshotTransformFactory);

// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<N> previousNamespaceSerializer =
restoredKvMetaInfo.getNamespaceSerializer();

TypeSerializerSchemaCompatibility<N> namespaceCompatibility =
restoredKvMetaInfo.updateNamespaceSerializer(namespaceSerializer);
if (namespaceCompatibility.isCompatibleAfterMigration() || namespaceCompatibility.isIncompatible()) {
throw new StateMigrationException("For heap backends, the new namespace serializer must be compatible.");
throw new StateMigrationException("For heap backends, the new namespace serializer (" + namespaceSerializer + ") must be compatible with the old namespace serializer (" + previousNamespaceSerializer + ").");
}

restoredKvMetaInfo.checkStateMetaInfo(stateDesc);

// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<V> previousStateSerializer =
restoredKvMetaInfo.getStateSerializer();

TypeSerializerSchemaCompatibility<V> stateCompatibility =
restoredKvMetaInfo.updateStateSerializer(newStateSerializer);

if (stateCompatibility.isIncompatible()) {
throw new StateMigrationException("For heap backends, the new state serializer must not be incompatible.");
throw new StateMigrationException("For heap backends, the new state serializer (" + newStateSerializer + ") must not be incompatible with the old state serializer (" + previousStateSerializer + ").");
}

stateTable.setMetaInfo(restoredKvMetaInfo);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.state.heap;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.fs.CloseableRegistry;
Expand Down Expand Up @@ -134,12 +135,16 @@ public Void restore() throws Exception {
serializationProxy.read(inView);

if (!keySerializerRestored) {
// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<K> currentSerializer =
keySerializerProvider.currentSchemaSerializer();
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
throw new StateMigrationException("The new key serializer must be compatible.");
throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + keySerializerProvider.previousSchemaSerializer() + ").");
}

keySerializerRestored = true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -569,19 +569,28 @@ private <N, S extends State, SV> RegisteredKeyValueStateBackendMetaInfo<N, SV> u
@SuppressWarnings("unchecked")
RegisteredKeyValueStateBackendMetaInfo<N, SV> restoredKvStateMetaInfo = oldStateInfo.f1;

// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<N> previousNamespaceSerializer =
restoredKvStateMetaInfo.getNamespaceSerializer();

TypeSerializerSchemaCompatibility<N> s = restoredKvStateMetaInfo.updateNamespaceSerializer(namespaceSerializer);
if (s.isCompatibleAfterMigration() || s.isIncompatible()) {
throw new StateMigrationException("The new namespace serializer must be compatible.");
throw new StateMigrationException("The new namespace serializer (" + namespaceSerializer + ") must be compatible with the old namespace serializer (" + previousNamespaceSerializer + ").");
}

restoredKvStateMetaInfo.checkStateMetaInfo(stateDesc);

// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<SV> previousStateSerializer = restoredKvStateMetaInfo.getStateSerializer();

TypeSerializerSchemaCompatibility<SV> newStateSerializerCompatibility =
restoredKvStateMetaInfo.updateStateSerializer(stateSerializer);
if (newStateSerializerCompatibility.isCompatibleAfterMigration()) {
migrateStateValues(stateDesc, oldStateInfo);
} else if (newStateSerializerCompatibility.isIncompatible()) {
throw new StateMigrationException("The new state serializer cannot be incompatible.");
throw new StateMigrationException("The new state serializer (" + stateSerializer + ") must not be incompatible with the old state serializer (" + previousStateSerializer + ").");
}

return restoredKvStateMetaInfo;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.contrib.streaming.state.restore;

import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.contrib.streaming.state.RocksDBKeyedStateBackend.RocksDbKvStateInfo;
import org.apache.flink.contrib.streaming.state.RocksDBNativeMetricMonitor;
Expand Down Expand Up @@ -189,12 +190,16 @@ KeyedBackendSerializationProxy<K> readMetaData(DataInputView dataInputView)
new KeyedBackendSerializationProxy<>(userCodeClassLoader);
serializationProxy.read(dataInputView);
if (!isKeySerializerCompatibilityChecked) {
// fetch current serializer now because if it is incompatible, we can't access
// it anymore to improve the error message
TypeSerializer<K> currentSerializer =
keySerializerProvider.currentSchemaSerializer();
// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
TypeSerializerSchemaCompatibility<K> keySerializerSchemaCompat =
keySerializerProvider.setPreviousSerializerSnapshotForRestoredState(serializationProxy.getKeySerializerSnapshot());
if (keySerializerSchemaCompat.isCompatibleAfterMigration() || keySerializerSchemaCompat.isIncompatible()) {
throw new StateMigrationException("The new key serializer must be compatible.");
throw new StateMigrationException("The new key serializer (" + currentSerializer + ") must be compatible with the previous key serializer (" + keySerializerProvider.previousSchemaSerializer() + ").");
}

isKeySerializerCompatibilityChecked = true;
Expand Down

0 comments on commit 700a92d

Please sign in to comment.