Skip to content

Commit

Permalink
[FLINK-9377] [core] (part 1) Extend TypeSerializerConfigSnapshot as a…
Browse files Browse the repository at this point in the history
… factory for restoring serializers

This commit is the first step towards removing serializers from
checkpointed state meta info and making Flink checkpoints Java
serialization free.

Instead of writing serializers in checkpoints, and trying to read that
to obtain a restore serializer at restore time, we aim to only write the
config snapshot as the single source of truth and use it as a factory to
create a restore serializer.

This commit adds the restoreSerializer() method and signatures to the
TypeSerializerConfigSnapshot interface. Use of the method, as well as
properly implementing the method for all serializers, will be
implemented in follow-up commits.

To allow the codebase to still build, the restoreSerializer() method
currently returns the originating serializer directly. This assumes
that the originating serializer has been injected into the config
snapshot appropriately.
  • Loading branch information
tzulitai committed Oct 10, 2018
1 parent 5dc3609 commit 3787b89
Show file tree
Hide file tree
Showing 70 changed files with 583 additions and 260 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -167,9 +167,9 @@ public WritableSerializerConfigSnapshot<T> snapshotConfiguration() {
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof WritableSerializerConfigSnapshot
&& typeClass.equals(((WritableSerializerConfigSnapshot) configSnapshot).getTypeClass())) {
&& typeClass.equals(((WritableSerializerConfigSnapshot<?>) configSnapshot).getTypeClass())) {

return CompatibilityResult.compatible();
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
* is required.
*/
@Internal
public abstract class CompositeTypeSerializerConfigSnapshot extends TypeSerializerConfigSnapshot {
public abstract class CompositeTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

private List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> nestedSerializersAndConfigs;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
* @param <T> The type to be instantiated.
*/
@Internal
public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot {
public abstract class GenericTypeSerializerConfigSnapshot<T> extends TypeSerializerConfigSnapshot<T> {

private Class<T> typeClass;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@
* A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
*/
@Internal
public final class ParameterlessTypeSerializerConfig extends TypeSerializerConfigSnapshot {
public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {

private static final int VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,12 +130,12 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}

public TypeSerializerConfigSnapshot snapshotConfiguration() {
public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}

public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
throw new UnsupportedOperationException(
"This is a TypeDeserializerAdapter used only for deserialization; this method should not be used.");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,14 +179,14 @@ public abstract class TypeSerializer<T> implements Serializable {
*
* @return snapshot of the serializer's current configuration (cannot be {@code null}).
*/
public abstract TypeSerializerConfigSnapshot snapshotConfiguration();
public abstract TypeSerializerConfigSnapshot<T> snapshotConfiguration();

/**
* Ensure compatibility of this serializer with a preceding serializer that was registered for serialization of
* the same managed state (if any - this method is only relevant if this serializer is registered for
* serialization of managed state).
*
* The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
* <p>The compatibility check in this method should be performed by inspecting the preceding serializer's configuration
* snapshot. The method may reconfigure the serializer (if required and possible) so that it may be compatible,
* or provide a signaling result that informs Flink that state migration is necessary before continuing to use
* this serializer.
Expand Down Expand Up @@ -215,5 +215,5 @@ public abstract class TypeSerializer<T> implements Serializable {
*
* @return the determined compatibility result (cannot be {@code null}).
*/
public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot);
public abstract CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot);
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,35 +21,112 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.VersionedIOReadableWritable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.Preconditions;

import java.io.IOException;

/**
* A {@code TypeSerializerConfigSnapshot} is a point-in-time view of a {@link TypeSerializer}'s configuration.
* The configuration snapshot of a serializer is persisted along with checkpoints of the managed state that the
* serializer is registered to.
* The configuration snapshot of a serializer is persisted within checkpoints
* as a single source of meta information about the schema of serialized data in the checkpoint.
* This serves three purposes:
*
* <ul>
* <li><strong>Capturing serializer parameters and schema:</strong> a serializer's configuration snapshot
* represents information about the parameters, state, and schema of a serializer.
* This is explained in more detail below.</li>
*
* <li><strong>Compatibility checks for new serializers:</strong> when new serializers are available,
* it must be checked whether or not they are compatible to read the data written by the previous serializer.
* This is performed by providing the serializer configuration snapshots in checkpoints to the corresponding
* new serializers.</li>
*
* <p>The persisted configuration may later on be used by new serializers to ensure serialization compatibility
* for the same managed state. In order for new serializers to be able to ensure this, the configuration snapshot
* should encode sufficient information about:
* <li><strong>Factory for a read serializer when schema conversion is required:</strong> in the case that new
* serializers are not compatible to read previous data, a schema conversion process executed across all data
* is required before the new serializer can continue to be used. This conversion process requires a compatible
* read serializer to restore serialized bytes as objects, which are then written back again using the new serializer.
* In this scenario, the serializer configuration snapshots in checkpoints double as a factory for the read
* serializer of the conversion process.</li>
* </ul>
*
* <h2>Serializer Configuration and Schema</h2>
*
* <p>Since serializer configuration snapshots need to be used to ensure serialization compatibility
* for the same managed state as well as to serve as a factory for compatible read serializers, the configuration
* snapshot should encode sufficient information about:
*
* <ul>
* <li><strong>Parameter settings of the serializer:</strong> parameters of the serializer include settings
* required to setup the serializer, or the state of the serializer if it is stateful. If the serializer
* has nested serializers, then the configuration snapshot should also contain the parameters of the nested
* serializers.</li>
*
* <li><strong>Serialization schema of the serializer:</strong> the data format used by the serializer.</li>
* <li><strong>Serialization schema of the serializer:</strong> the binary format used by the serializer, or
* in other words, the schema of data written by the serializer.</li>
* </ul>
*
* <p>NOTE: Implementations must contain the default empty nullary constructor. This is required to be able to
* deserialize the configuration snapshot from its binary form.
*
* @param <T> The data type that the originating serializer of this configuration serializes.
*/
@PublicEvolving
public abstract class TypeSerializerConfigSnapshot extends VersionedIOReadableWritable {
public abstract class TypeSerializerConfigSnapshot<T> extends VersionedIOReadableWritable {

/** The user code class loader; only relevant if this configuration instance was deserialized from binary form. */
private ClassLoader userCodeClassLoader;

/**
* The originating serializer of this configuration snapshot.
*
* TODO to allow for incrementally adapting the implementation of serializer config snapshot subclasses,
* TODO we currently have a base implementation for the {@link #restoreSerializer()}
* TODO method which simply returns this serializer instance. The serializer is written
* TODO and read using Java serialization as part of reading / writing the config snapshot
*/
private TypeSerializer<T> serializer;

/**
 * Produces a serializer from this configuration snapshot that is able to read
 * the data written by the serializer which this snapshot describes.
 *
 * <p>For now this simply hands back whatever instance is currently held in the
 * {@code serializer} field of this snapshot.
 *
 * @return the restored serializer.
 */
public TypeSerializer<T> restoreSerializer() {
	// TODO this implementation is only a placeholder; the intention is to have no default implementation
	return this.serializer;
}

/**
 * Injects the originating serializer of this configuration snapshot.
 *
 * <p>TODO this method is a temporary workaround: the injected instance is the one
 * TODO handed back by the restoreSerializer() method.
 *
 * @param serializer the originating serializer; validated with {@code Preconditions.checkNotNull}
 *                   before it is stored.
 */
@Internal
public final void setSerializer(TypeSerializer<T> serializer) {
	// validate first so that the field is left untouched when the check fails
	Preconditions.checkNotNull(serializer);
	this.serializer = serializer;
}

/**
 * Writes this snapshot's version marker followed by the held serializer.
 *
 * <p>The value written as the version is {@code getVersion() + 1}; the serializer is
 * written right after it via {@code TypeSerializerSerializationUtil.writeSerializer}.
 *
 * @param out the output view to write to.
 * @throws IOException propagated from the underlying writes.
 */
@Override
public void write(DataOutputView out) throws IOException {
	// bump the version; we use this to know that there is a serializer to read as part of the config
	final int bumpedVersion = getVersion() + 1;
	out.writeInt(bumpedVersion);

	TypeSerializerSerializationUtil.writeSerializer(out, this.serializer);
}

/**
 * Reads this snapshot from the given input view.
 *
 * <p>After delegating to {@code super.read(in)}, the serializer is read via
 * {@code TypeSerializerSerializationUtil.tryReadSerializer} only when the read version
 * equals {@code getVersion() + 1}; otherwise the {@code serializer} field is left as it was.
 *
 * @param in the input view to read from.
 * @throws IOException propagated from the underlying reads.
 */
@Override
public void read(DataInputView in) throws IOException {
	super.read(in);

	// a bumped version signals that a serializer follows as part of the config
	final boolean serializerFollows = getReadVersion() == getVersion() + 1;
	if (!serializerFollows) {
		return;
	}

	this.serializer = TypeSerializerSerializationUtil.tryReadSerializer(in, getUserCodeClassLoader(), true);
}

/**
* Set the user code class loader.
* Only relevant if this configuration instance was deserialized from binary form.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ public static void writeSerializersAndConfigsWithResilience(
writeSerializer(bufferWrapper, serAndConfSnapshot.f0);

out.writeInt(bufferWithPos.getPosition());
writeSerializerConfigSnapshot(bufferWrapper, serAndConfSnapshot.f1);
writeSerializerConfigSnapshot(bufferWrapper, serAndConfSnapshot.f1, serAndConfSnapshot.f0);
}

out.writeInt(bufferWithPos.getPosition());
Expand Down Expand Up @@ -229,16 +229,17 @@ public static List<Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot>> read
*
* @throws IOException
*/
public static void writeSerializerConfigSnapshot(
public static <T> void writeSerializerConfigSnapshot(
DataOutputView out,
TypeSerializerConfigSnapshot serializerConfigSnapshot) throws IOException {
TypeSerializerConfigSnapshot<T> serializerConfigSnapshot,
TypeSerializer<T> serializer) throws IOException {

new TypeSerializerConfigSnapshotSerializationProxy(serializerConfigSnapshot).write(out);
new TypeSerializerConfigSnapshotSerializationProxy<>(serializerConfigSnapshot, serializer).write(out);
}

/**
* Reads from a data input view a {@link TypeSerializerConfigSnapshot} that was previously
* written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
* written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}.
*
* @param in the data input view
* @param userCodeClassLoader the user code class loader to use
Expand All @@ -257,31 +258,9 @@ public static TypeSerializerConfigSnapshot readSerializerConfigSnapshot(
return proxy.getSerializerConfigSnapshot();
}

/**
* Writes multiple {@link TypeSerializerConfigSnapshot}s to the provided data output view.
*
* <p>It is written with a format that can be later read again using
* {@link #readSerializerConfigSnapshots(DataInputView, ClassLoader)}.
*
* @param out the data output view
* @param serializerConfigSnapshots the serializer configuration snapshots to write
*
* @throws IOException
*/
public static void writeSerializerConfigSnapshots(
DataOutputView out,
TypeSerializerConfigSnapshot... serializerConfigSnapshots) throws IOException {

out.writeInt(serializerConfigSnapshots.length);

for (TypeSerializerConfigSnapshot snapshot : serializerConfigSnapshots) {
new TypeSerializerConfigSnapshotSerializationProxy(snapshot).write(out);
}
}

/**
* Reads from a data input view multiple {@link TypeSerializerConfigSnapshot}s that were previously
* written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot)}.
* written using {@link #writeSerializerConfigSnapshot(DataOutputView, TypeSerializerConfigSnapshot, TypeSerializer)}.
*
* @param in the data input view
* @param userCodeClassLoader the user code class loader to use
Expand Down Expand Up @@ -387,19 +366,23 @@ public int getVersion() {
/**
* Utility serialization proxy for a {@link TypeSerializerConfigSnapshot}.
*/
static final class TypeSerializerConfigSnapshotSerializationProxy extends VersionedIOReadableWritable {
static final class TypeSerializerConfigSnapshotSerializationProxy<T> extends VersionedIOReadableWritable {

private static final int VERSION = 1;

private ClassLoader userCodeClassLoader;
private TypeSerializerConfigSnapshot serializerConfigSnapshot;
private TypeSerializerConfigSnapshot<T> serializerConfigSnapshot;
private TypeSerializer<T> serializer;

/**
 * Creates a proxy that only carries the user code class loader; no config snapshot is set here.
 * NOTE(review): presumably the read-side constructor — confirm against callers.
 *
 * @param userCodeClassLoader the user code class loader; validated with
 *                            {@code Preconditions.checkNotNull} before it is stored.
 */
TypeSerializerConfigSnapshotSerializationProxy(ClassLoader userCodeClassLoader) {
	Preconditions.checkNotNull(userCodeClassLoader);
	this.userCodeClassLoader = userCodeClassLoader;
}

TypeSerializerConfigSnapshotSerializationProxy(TypeSerializerConfigSnapshot serializerConfigSnapshot) {
this.serializerConfigSnapshot = serializerConfigSnapshot;
TypeSerializerConfigSnapshotSerializationProxy(
TypeSerializerConfigSnapshot<T> serializerConfigSnapshot,
TypeSerializer<T> serializer) {
this.serializerConfigSnapshot = Preconditions.checkNotNull(serializerConfigSnapshot);
this.serializer = Preconditions.checkNotNull(serializer);
}

@Override
Expand All @@ -410,6 +393,10 @@ public void write(DataOutputView out) throws IOException {
// correct type of config snapshot instance when deserializing
out.writeUTF(serializerConfigSnapshot.getClass().getName());

// TODO this is a temporary workaround until all serializer config snapshot classes in Flink
// TODO have properly implemented the restore serializer factory methods
serializerConfigSnapshot.setSerializer(serializer);

// the actual configuration parameters
serializerConfigSnapshot.write(out);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,12 +94,12 @@ public void copy(DataInputView source, DataOutputView target) throws IOException
}

@Override
public TypeSerializerConfigSnapshot snapshotConfiguration() {
public TypeSerializerConfigSnapshot<T> snapshotConfiguration() {
throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
}

@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
throw new UnsupportedOperationException("This object is a dummy TypeSerializer.");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,16 @@
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;

import java.util.Collection;

/**
* Configuration snapshot of a serializer for collection types.
*
* @param <T> Type of the element.
*/
@Internal
public final class CollectionSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot {
public final class CollectionSerializerConfigSnapshot<C extends Collection<T>, T>
extends CompositeTypeSerializerConfigSnapshot<C> {

private static final int VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ public EnumSerializerConfigSnapshot<T> snapshotConfiguration() {

@SuppressWarnings("unchecked")
@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof EnumSerializerConfigSnapshot) {
final EnumSerializerConfigSnapshot<T> config = (EnumSerializerConfigSnapshot<T>) configSnapshot;

Expand Down Expand Up @@ -281,7 +281,7 @@ public void read(DataInputView in) throws IOException {
+ getTypeClass().getName() + " no longer exists.", e);
}
}
} else if (getReadVersion() == VERSION) {
} else if (getReadVersion() >= VERSION) {
int numEnumConstants = in.readInt();

this.enumConstants = new ArrayList<>(numEnumConstants);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,12 +198,12 @@ public String toString() {
// --------------------------------------------------------------------------------------------

@Override
public GenericArraySerializerConfigSnapshot snapshotConfiguration() {
public GenericArraySerializerConfigSnapshot<C> snapshotConfiguration() {
return new GenericArraySerializerConfigSnapshot<>(componentClass, componentSerializer);
}

@Override
public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
public CompatibilityResult<C[]> ensureCompatibility(TypeSerializerConfigSnapshot<?> configSnapshot) {
if (configSnapshot instanceof GenericArraySerializerConfigSnapshot) {
final GenericArraySerializerConfigSnapshot config = (GenericArraySerializerConfigSnapshot) configSnapshot;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
* @param <C> The component type.
*/
@Internal
public final class GenericArraySerializerConfigSnapshot<C> extends CompositeTypeSerializerConfigSnapshot {
public final class GenericArraySerializerConfigSnapshot<C> extends CompositeTypeSerializerConfigSnapshot<C[]> {

private static final int VERSION = 1;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ public int hashCode() {
// --------------------------------------------------------------------------------------------

@Override
public CollectionSerializerConfigSnapshot snapshotConfiguration() {
public CollectionSerializerConfigSnapshot<List<T>, T> snapshotConfiguration() {
return new CollectionSerializerConfigSnapshot<>(elementSerializer);
}

@Override
public CompatibilityResult<List<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof CollectionSerializerConfigSnapshot) {
Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousElemSerializerAndConfig =
((CollectionSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig();
((CollectionSerializerConfigSnapshot<?, ?>) configSnapshot).getSingleNestedSerializerAndConfig();

CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult(
previousElemSerializerAndConfig.f0,
Expand Down
Loading

0 comments on commit 3787b89

Please sign in to comment.