Skip to content

Commit

Permalink
[FLINK-11073] [core] Let ArrayListSerializerSnapshot be a CompositeTy…
Browse files Browse the repository at this point in the history
…peSerializerSnapshot
  • Loading branch information
tzulitai committed Jan 8, 2019
1 parent 9822cc8 commit 383cc9e
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ public int hashCode() {

@Override
public TypeSerializerSnapshot<ArrayList<T>> snapshotConfiguration() {
return new ArrayListSerializerSnapshot<>(elementSerializer);
return new ArrayListSerializerSnapshot<>(this);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,72 +18,46 @@

package org.apache.flink.runtime.state;

import org.apache.flink.api.common.typeutils.CompositeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.CompositeTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerSchemaCompatibility;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;

import java.io.IOException;
import java.util.ArrayList;

import static org.apache.flink.util.Preconditions.checkState;

/**
* Snapshot class for the {@link ArrayListSerializer}.
*/
public class ArrayListSerializerSnapshot<T> implements TypeSerializerSnapshot<ArrayList<T>> {
public class ArrayListSerializerSnapshot<T> extends CompositeTypeSerializerSnapshot<ArrayList<T>, ArrayListSerializer> {

private static final int CURRENT_VERSION = 1;

private CompositeSerializerSnapshot nestedElementSerializerSnapshot;

/**
* Constructor for read instantiation.
*/
public ArrayListSerializerSnapshot() {}
public ArrayListSerializerSnapshot() {
super(ArrayListSerializer.class);
}

/**
* Constructor for creating the snapshot for writing.
*/
public ArrayListSerializerSnapshot(TypeSerializer<T> elementSerializer) {
this.nestedElementSerializerSnapshot = new CompositeSerializerSnapshot(elementSerializer);
public ArrayListSerializerSnapshot(ArrayListSerializer<T> arrayListSerializer) {
super(arrayListSerializer);
}

@Override
public int getCurrentVersion() {
public int getCurrentOuterSnapshotVersion() {
return CURRENT_VERSION;
}

@Override
public TypeSerializer<ArrayList<T>> restoreSerializer() {
return new ArrayListSerializer<>(nestedElementSerializerSnapshot.getRestoreSerializer(0));
}

@Override
public TypeSerializerSchemaCompatibility<ArrayList<T>> resolveSchemaCompatibility(TypeSerializer<ArrayList<T>> newSerializer) {
checkState(nestedElementSerializerSnapshot != null);

if (newSerializer instanceof ArrayListSerializer) {
ArrayListSerializer<T> serializer = (ArrayListSerializer<T>) newSerializer;

return nestedElementSerializerSnapshot.resolveCompatibilityWithNested(
TypeSerializerSchemaCompatibility.compatibleAsIs(),
serializer.getElementSerializer());
}
else {
return TypeSerializerSchemaCompatibility.incompatible();
}
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
nestedElementSerializerSnapshot.writeCompositeSnapshot(out);
protected ArrayListSerializer createOuterSerializerWithNestedSerializers(TypeSerializer<?>[] nestedSerializers) {
@SuppressWarnings("unchecked")
TypeSerializer<T> elementSerializer = (TypeSerializer<T>) nestedSerializers[0];
return new ArrayListSerializer<>(elementSerializer);
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader userCodeClassLoader) throws IOException {
this.nestedElementSerializerSnapshot = CompositeSerializerSnapshot.readCompositeSnapshot(in, userCodeClassLoader);
protected TypeSerializer<?>[] getNestedSerializers(ArrayListSerializer outerSerializer) {
return new TypeSerializer<?>[] { outerSerializer.getElementSerializer() };
}
}

0 comments on commit 383cc9e

Please sign in to comment.