Skip to content

Commit

Permalink
[FLINK-9377] [core] (part 6) Introduce TypeSerializerSnapshot for a s…
Browse files Browse the repository at this point in the history
…moother upgrade path

This commit deprecates TypeSerializerConfigSnapshot, and introduces a
TypeSerializerSnapshot interface which will eventually be the new
replacement.

The now-deprecated TypeSerializerConfigSnapshot differentiates in that
when being written, it wil still write along with it the prior
serializer and return that when attempting to restore the prior
serializer. Implementations which are upgraded to directly implement the
new TypeSerializerSnapshot interface are strictly required to implement
the restoreSerializer() method. Therefore, once upgraded, the prior
serializer is no longer written.
  • Loading branch information
tzulitai committed Oct 10, 2018
1 parent 2dd8116 commit ac74a0c
Show file tree
Hide file tree
Showing 59 changed files with 1,134 additions and 699 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnull;
import java.io.IOException;

/**
* A utility {@link TypeSerializerConfigSnapshot} that is used for backwards compatibility purposes.
*
* <p>In older versions of Flink (<= 1.2), we only wrote serializers and not their corresponding snapshots.
* This class serves as a wrapper around the restored serializer instances.
*
* @param <T> the data type that the wrapped serializer instance serializes.
*/
@Internal
public class BackwardsCompatibleSerializerSnapshot<T> implements TypeSerializerSnapshot<T> {

/**
* The serializer instance written in savepoints.
*/
@Nonnull
private TypeSerializer<T> serializerInstance;

public BackwardsCompatibleSerializerSnapshot(TypeSerializer<T> serializerInstance) {
this.serializerInstance = Preconditions.checkNotNull(serializerInstance);
}

@Override
public void write(DataOutputView out) throws IOException {
throw new UnsupportedOperationException(
"This is a dummy config snapshot used only for backwards compatibility.");
}

@Override
public void read(int version, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
throw new UnsupportedOperationException(
"This is a dummy config snapshot used only for backwards compatibility.");
}

@Override
public int getCurrentVersion() {
throw new UnsupportedOperationException(
"This is a dummy config snapshot used only for backwards compatibility.");
}

@Override
public TypeSerializer<T> restoreSerializer() {
return serializerInstance;
}

@Override
public <NS extends TypeSerializer<T>> TypeSerializerSchemaCompatibility<T, NS> resolveSchemaCompatibility(NS newSerializer) {
// if there is no configuration snapshot to check against,
// then we can only assume that the new serializer is compatible as is
return TypeSerializerSchemaCompatibility.compatibleAsIs();
}

@Override
public int hashCode() {
return serializerInstance.hashCode();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}

if (o == null || getClass() != o.getClass()) {
return false;
}

BackwardsCompatibleSerializerSnapshot<?> that = (BackwardsCompatibleSerializerSnapshot<?>) o;

return that.serializerInstance.equals(serializerInstance);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,19 +62,13 @@ public class CompatibilityUtil {
public static <T> CompatibilityResult<T> resolveCompatibilityResult(
@Nullable TypeSerializer<?> precedingSerializer,
Class<?> dummySerializerClassTag,
TypeSerializerConfigSnapshot precedingSerializerConfigSnapshot,
TypeSerializerSnapshot<?> precedingSerializerConfigSnapshot,
TypeSerializer<T> newSerializer) {

TypeSerializerConfigSnapshot<?> actualConfigSnapshot;
if (precedingSerializerConfigSnapshot instanceof BackwardsCompatibleConfigSnapshot) {
actualConfigSnapshot =
((BackwardsCompatibleConfigSnapshot) precedingSerializerConfigSnapshot).getWrappedConfigSnapshot();
} else {
actualConfigSnapshot = precedingSerializerConfigSnapshot;
}
if (precedingSerializerConfigSnapshot != null
&& !(precedingSerializerConfigSnapshot instanceof BackwardsCompatibleSerializerSnapshot)) {

if (actualConfigSnapshot != null) {
CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(actualConfigSnapshot);
CompatibilityResult<T> initialResult = newSerializer.ensureCompatibility(precedingSerializerConfigSnapshot);

if (!initialResult.isRequiresMigration()) {
return initialResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
@Internal
public abstract class CompositeTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersAndConfigs;
private List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> nestedSerializersAndConfigs;

/** This empty nullary constructor is required for deserializing the configuration. */
public CompositeTypeSerializerConfigSnapshot() {}
Expand All @@ -50,11 +50,9 @@ public CompositeTypeSerializerConfigSnapshot(TypeSerializer<?>... nestedSerializ

this.nestedSerializersAndConfigs = new ArrayList<>(nestedSerializers.length);
for (TypeSerializer<?> nestedSerializer : nestedSerializers) {
TypeSerializerConfigSnapshot configSnapshot = nestedSerializer.snapshotConfiguration();
TypeSerializerSnapshot<?> configSnapshot = nestedSerializer.snapshotConfiguration();
this.nestedSerializersAndConfigs.add(
new Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>(
nestedSerializer.duplicate(),
Preconditions.checkNotNull(configSnapshot)));
new Tuple2<>(nestedSerializer.duplicate(), Preconditions.checkNotNull(configSnapshot)));
}
}

Expand All @@ -71,11 +69,11 @@ public void read(DataInputView in) throws IOException {
TypeSerializerSerializationUtil.readSerializersAndConfigsWithResilience(in, getUserCodeClassLoader());
}

public List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> getNestedSerializersAndConfigs() {
public List<Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>>> getNestedSerializersAndConfigs() {
return nestedSerializersAndConfigs;
}

public Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> getSingleNestedSerializerAndConfig() {
public Tuple2<TypeSerializer<?>, TypeSerializerSnapshot<?>> getSingleNestedSerializerAndConfig() {
return nestedSerializersAndConfigs.get(0);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.api.common.typeutils;

import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
Expand Down Expand Up @@ -175,11 +176,11 @@ public abstract class TypeSerializer<T> implements Serializable {
* serializer was registered to, the returned configuration snapshot can be used to ensure compatibility
* of the new serializer and determine if state migration is required.
*
* @see TypeSerializerConfigSnapshot
* @see TypeSerializerSnapshot
*
* @return snapshot of the serializer's current configuration (cannot be {@code null}).
*/
public abstract TypeSerializerConfigSnapshot<T> snapshotConfiguration();
public abstract TypeSerializerSnapshot<T> snapshotConfiguration();

/**
* Ensure compatibility of this serializer with a preceding serializer that was registered for serialization of
Expand Down Expand Up @@ -214,9 +215,33 @@ public abstract class TypeSerializer<T> implements Serializable {
* @param configSnapshot configuration snapshot of a preceding serializer for the same managed state
*
* @return the determined compatibility result (cannot be {@code null}).
*
* @deprecated TODO this method will be removed in later follow-up commits (see FLINK-9377).
*/
@Deprecated
public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot);
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
throw new IllegalStateException(
"Seems like that you are still using TypeSerializerConfigSnapshot; if so, this method must be implemented. " +
"Once you change to directly use TypeSerializerSnapshot, then you can safely remove the implementation " +
"of this method.");
}

@Internal
public final CompatibilityResult<T> ensureCompatibility(TypeSerializerSnapshot<?> configSnapshot) {
if (configSnapshot instanceof TypeSerializerConfigSnapshot) {
return ensureCompatibility((TypeSerializerConfigSnapshot<?>) configSnapshot);
} else {
@SuppressWarnings("unchecked")
TypeSerializerSnapshot<T> casted = (TypeSerializerSnapshot<T>) configSnapshot;

TypeSerializerSchemaCompatibility<T, ? extends TypeSerializer<T>> compat = casted.resolveSchemaCompatibility(this);
if (compat.isCompatibleAsIs()) {
return CompatibilityResult.compatible();
} else if (compat.isCompatibleAfterMigration()) {
return CompatibilityResult.requiresMigration();
} else if (compat.isIncompatible()) {
throw new IllegalStateException("The new serializer is incompatible.");
} else {
throw new IllegalStateException("Unidentifiable schema compatibility type. This is a bug, please file a JIRA.");
}
}
}
}
Loading

0 comments on commit ac74a0c

Please sign in to comment.