Skip to content

Commit

Permalink
[FLINK-6482] [core] Add nested serializers to config snapshots of com…
Browse files Browse the repository at this point in the history
…posite serializers

This commit adds also the nested serializers themselves to the
configuration snapshots of composite serializers. This opens up the
oppurtunity to use the previous nested serializer as the convert
deserializer in the case that a nested serializer in the new serializer
determines that state migration is required.

This commit also consolidate all TypeSerializer-related serialization
proxies into a single utility class.

This closes apache#3937.
  • Loading branch information
tzulitai committed May 22, 2017
1 parent 5624c70 commit a7bc5de
Show file tree
Hide file tree
Showing 49 changed files with 1,808 additions and 1,298 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSerializationProxy;
import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer;
import org.apache.flink.api.common.typeutils.base.array.BytePrimitiveArraySerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.runtime.DataInputViewStream;
Expand All @@ -53,6 +53,7 @@
import org.apache.flink.runtime.query.TaskKvStateRegistry;
import org.apache.flink.runtime.state.AbstractKeyedStateBackend;
import org.apache.flink.runtime.state.CheckpointStreamFactory;
import org.apache.flink.api.common.typeutils.CompatibilityUtil;
import org.apache.flink.runtime.state.DoneFuture;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.KeyGroupRange;
Expand All @@ -64,7 +65,6 @@
import org.apache.flink.runtime.state.PlaceholderStreamStateHandle;
import org.apache.flink.runtime.state.RegisteredKeyedBackendStateMetaInfo;
import org.apache.flink.runtime.state.StateHandleID;
import org.apache.flink.runtime.state.StateMigrationUtil;
import org.apache.flink.runtime.state.StateObject;
import org.apache.flink.runtime.state.StateUtil;
import org.apache.flink.runtime.state.StreamStateHandle;
Expand Down Expand Up @@ -1111,9 +1111,9 @@ private void restoreKVStateMetaData() throws IOException, RocksDBException {

// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
if (CompatibilityUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
UnloadableDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
rocksDBKeyedStateBackend.keySerializer)
.isRequiresMigration()) {
Expand Down Expand Up @@ -1230,9 +1230,9 @@ private RocksDBIncrementalRestoreOperation(RocksDBKeyedStateBackend<T> stateBack

// check for key serializer compatibility; this also reconfigures the
// key serializer to be compatible, if it is required and is possible
if (StateMigrationUtil.resolveCompatibilityResult(
if (CompatibilityUtil.resolveCompatibilityResult(
serializationProxy.getKeySerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
UnloadableDummyTypeSerializer.class,
serializationProxy.getKeySerializerConfigSnapshot(),
stateBackend.keySerializer)
.isRequiresMigration()) {
Expand Down Expand Up @@ -1532,16 +1532,15 @@ protected <N, S> ColumnFamilyHandle getColumnFamily(
}

// check compatibility results to determine if state migration is required

CompatibilityResult<?> namespaceCompatibility = StateMigrationUtil.resolveCompatibilityResult(
CompatibilityResult<N> namespaceCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getNamespaceSerializer(),
MigrationNamespaceSerializerProxy.class,
restoredMetaInfo.getNamespaceSerializerConfigSnapshot(),
newMetaInfo.getNamespaceSerializer());

CompatibilityResult<S> stateCompatibility = StateMigrationUtil.resolveCompatibilityResult(
CompatibilityResult<S> stateCompatibility = CompatibilityUtil.resolveCompatibilityResult(
restoredMetaInfo.getStateSerializer(),
TypeSerializerSerializationProxy.ClassNotFoundDummyTypeSerializer.class,
UnloadableDummyTypeSerializer.class,
restoredMetaInfo.getStateSerializerConfigSnapshot(),
newMetaInfo.getStateSerializer());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
* limitations under the License.
*/

package org.apache.flink.runtime.state;
package org.apache.flink.api.common.typeutils;

import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.annotation.Internal;

/**
* Utilities related to state migration, commonly used in the state backends.
* Utilities related to serializer compatibility.
*/
public class StateMigrationUtil {
@Internal
public class CompatibilityUtil {

/**
* Resolves the final compatibility result of two serializers by taking into account compound information,
Expand All @@ -47,11 +46,12 @@ public class StateMigrationUtil {
* @param newSerializer the new serializer to ensure compatibility with
*
* @param <T> Type of the data handled by the serializers
*
*
* @return the final resolved compatibility result
*/
@SuppressWarnings("unchecked")
public static <T> CompatibilityResult<T> resolveCompatibilityResult(
TypeSerializer<T> precedingSerializer,
TypeSerializer<?> precedingSerializer,
Class<?> dummySerializerClassTag,
TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
TypeSerializer<T> newSerializer) {
Expand All @@ -65,7 +65,7 @@ public static <T> CompatibilityResult<T> resolveCompatibilityResult(
if (precedingSerializer != null && !(precedingSerializer.getClass().equals(dummySerializerClassTag))) {
// if the preceding serializer exists and is not a dummy, use
// that for converting instead of the provided convert deserializer
return CompatibilityResult.requiresMigration(precedingSerializer);
return CompatibilityResult.requiresMigration((TypeSerializer<T>) precedingSerializer);
} else if (initialResult.getConvertDeserializer() != null) {
return initialResult;
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,47 +19,64 @@
package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;

/**
* A {@link TypeSerializerConfigSnapshot} for serializers that has multiple nested serializers.
* The configuration snapshot consists of the configuration snapshots of all nested serializers.
* The configuration snapshot consists of the configuration snapshots of all nested serializers, and
* also the nested serializers themselves.
*
* <p>Both the nested serializers and the configuration snapshots are written as configuration of
* composite serializers, so that on restore, the previous serializer may be used in case migration
* is required.
*/
@Internal
public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {

private TypeSerializerConfigSnapshot[] nestedSerializerConfigSnapshots;
private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersAndConfigs;

/** This empty nullary constructor is required for deserializing the configuration. */
public CompositeTypeSerializerConfigSnapshot() {}

public CompositeTypeSerializerConfigSnapshot(TypeSerializerConfigSnapshot... nestedSerializerConfigSnapshots) {
this.nestedSerializerConfigSnapshots = Preconditions.checkNotNull(nestedSerializerConfigSnapshots);
public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializers) {
Preconditions.checkNotNull(nestedSerializers);

this.nestedSerializersAndConfigs = new ArrayList<>(nestedSerializers.length);
for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration();
this.nestedSerializersAndConfigs.add(
new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
nestedSerializer.duplicate(),
Preconditions.checkNotNull(configSnapshot)));
}
}

@Override
public void write(DataOutputView out) throws IOException {
super.write(out);
TypeSerializerUtil.writeSerializerConfigSnapshots(out, nestedSerializerConfigSnapshots);
TypeSerializerSerializationUtil.writeSerializersAndConfigsWithResilience(out, nestedSerializersAndConfigs);
}

@Override
public void read(DataInputView in) throws IOException {
super.read(in);
nestedSerializerConfigSnapshots = TypeSerializerUtil.readSerializerConfigSnapshots(in, getUserCodeClassLoader());
this.nestedSerializersAndConfigs =
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader());
}

public TypeSerializerConfigSnapshot[] getNestedSerializerConfigSnapshots() {
return nestedSerializerConfigSnapshots;
public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() {
return nestedSerializersAndConfigs;
}

public TypeSerializerConfigSnapshot getSingleNestedSerializerConfigSnapshot() {
return nestedSerializerConfigSnapshots[0];
public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
return nestedSerializersAndConfigs.get(0);
}

@Override
Expand All @@ -73,13 +90,11 @@ public boolean equals(Object obj) {
}

return (obj.getClass().equals(getClass()))
&& Arrays.equals(
nestedSerializerConfigSnapshots,
((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializerConfigSnapshots());
&& nestedSerializersAndConfigs.equals(((CompositeTypeSerializerConfigSnapshot) obj).getNestedSerializersAndConfigs());
}

@Override
public int hashCode() {
return Arrays.hashCode(nestedSerializerConfigSnapshots);
return nestedSerializersAndConfigs.hashCode();
}
}
Loading

0 comments on commit a7bc5de

Please sign in to comment.