Skip to content

Commit

Permalink
[FLINK-17520] [doc] Use new resolveOuterSchemaCompatibility in custom…
Browse files Browse the repository at this point in the history
… serialization docs

This closes apache#12209.
  • Loading branch information
tzulitai committed May 19, 2020
1 parent 55a21bc commit 87e92dd
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 14 deletions.
8 changes: 5 additions & 3 deletions docs/dev/stream/state/custom_serialization.md
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ the nested element serializer.
In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#isOuterSnapshotCompatible(TypeSerializer)`: checks whether the outer snapshot information remains identical.
* `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.

By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
read / write, and therefore have empty default implementations for the above methods. If the subclass
Expand Down Expand Up @@ -351,8 +351,10 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
}

@Override
protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
return this.componentClass == newSerializer.getComponentClass();
protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

@Override
Expand Down
24 changes: 13 additions & 11 deletions docs/dev/stream/state/custom_serialization.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type and the *serialized binary format* of a state type. The schema, generally s
1. Data schema of the state type has evolved, i.e. adding or removing a field from a POJO that is used as state.
2. Generally speaking, after a change to the data schema, the serialization format of the serializer will need to be upgraded.
3. Configuration of the serializer has changed.

In order for the new execution to have information about the *written schema* of state and detect whether or not the
schema has changed, upon taking a savepoint of an operator's state, a *snapshot* of the state serializer needs to be
written along with the state bytes. This is abstracted a `TypeSerializerSnapshot`, explained in the next subsection.
Expand All @@ -108,10 +108,10 @@ public interface TypeSerializerSnapshot<T> {

<div data-lang="java" markdown="1">
{% highlight java %}
public abstract class TypeSerializer<T> {

public abstract class TypeSerializer<T> {
// ...

public abstract TypeSerializerSnapshot<T> snapshotConfiguration();
}
{% endhighlight %}
Expand Down Expand Up @@ -140,7 +140,7 @@ which can be one of the following:
2. **`TypeSerializerSchemaCompatibility.compatibleAfterMigration()`**: this result signals that the new serializer has a
different serialization schema, and it is possible to migrate from the old schema by using the previous serializer
(which recognizes the old schema) to read bytes into state objects, and then rewriting the object back to bytes with
the new serializer (which recognizes the new schema).
the new serializer (which recognizes the new schema).
3. **`TypeSerializerSchemaCompatibility.incompatible()`**: this result signals that the new serializer has a
different serialization schema, but it is not possible to migrate from the old schema.

Expand Down Expand Up @@ -170,13 +170,13 @@ to the implementation of state serializers and their serializer snapshots.
- Upon receiving the new serializer, it is provided to the restored previous serializer's snapshot via the
`TypeSerializer#resolveSchemaCompatibility` to check for schema compatibility.
4. **Migrate state bytes in backend from schema _A_ to schema _B_**
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
- If the compatibility resolution reflects that the schema has changed and migration is possible, schema migration is
performed. The previous state serializer which recognizes schema _A_ will be obtained from the serializer snapshot, via
`TypeSerializerSnapshot#restoreSerializer()`, and is used to deserialize state bytes to objects, which in turn
are re-written again with the new serializer, which recognizes schema _B_ to complete the migration. All entries
of the accessed state is migrated all-together before processing continues.
- If the resolution signals incompatibility, then the state access fails with an exception.

#### Heap state backends (e.g. `MemoryStateBackend`, `FsStateBackend`)

1. **Register new state with a state serializer that has schema _A_**
Expand Down Expand Up @@ -218,7 +218,7 @@ as your serializer's snapshot class:

- `TypeSerializerSchemaCompatibility.compatibleAsIs()`, if the new serializer class remains identical, or
- `TypeSerializerSchemaCompatibility.incompatible()`, if the new serializer class is different then the previous one.

Below is an example of how the `SimpleTypeSerializerSnapshot` is used, using Flink's `IntSerializer` as an example:
<div data-lang="java" markdown="1">
{% highlight java %}
Expand Down Expand Up @@ -309,7 +309,7 @@ the nested element serializer.
In these cases, an additional three methods need to be implemented on the `CompositeTypeSerializerSnapshot`:
* `#writeOuterSnapshot(DataOutputView)`: defines how the outer snapshot information is written.
* `#readOuterSnapshot(int, DataInputView, ClassLoader)`: defines how the outer snapshot information is read.
* `#isOuterSnapshotCompatible(TypeSerializer)`: checks whether the outer snapshot information remains identical.
* `#resolveOuterSchemaCompatibility(TypeSerializer)`: checks the compatibility based on the outer snapshot information.

By default, the `CompositeTypeSerializerSnapshot` assumes that there isn't any outer snapshot information to
read / write, and therefore have empty default implementations for the above methods. If the subclass
Expand Down Expand Up @@ -351,8 +351,10 @@ public final class GenericArraySerializerSnapshot<C> extends CompositeTypeSerial
}

@Override
protected boolean isOuterSnapshotCompatible(GenericArraySerializer newSerializer) {
return this.componentClass == newSerializer.getComponentClass();
protected boolean resolveOuterSchemaCompatibility(GenericArraySerializer newSerializer) {
return (this.componentClass == newSerializer.getComponentClass())
? OuterSchemaCompatibility.COMPATIBLE_AS_IS
: OuterSchemaCompatibility.INCOMPATIBLE;
}

@Override
Expand Down

0 comments on commit 87e92dd

Please sign in to comment.