Skip to content

Commit

Permalink
[FLINK-11328] [core] Upgrade parameterless / singleton serializers to…
Browse files Browse the repository at this point in the history
… use new serialization compatibility APIs
  • Loading branch information
tzulitai committed Jan 22, 2019
1 parent f388b65 commit edf6d59
Show file tree
Hide file tree
Showing 64 changed files with 610 additions and 149 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -1230,6 +1232,23 @@ public void copy(
public boolean canEqual(Object obj) {
return obj instanceof TransactionStateSerializer;
}

// ------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<KafkaTransactionState> snapshotConfiguration() {
return new TransactionStateSerializerSnapshot();
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionState> {
public TransactionStateSerializerSnapshot() {
super(TransactionStateSerializer::new);
}
}
}

/**
Expand Down Expand Up @@ -1312,6 +1331,23 @@ public void copy(
public boolean canEqual(Object obj) {
return obj instanceof ContextStateSerializer;
}

// ------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
return new ContextStateSerializerSnapshot();
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {
public ContextStateSerializerSnapshot() {
super(ContextStateSerializer::new);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.SimpleTypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.TypeSerializerSnapshot;
import org.apache.flink.api.common.typeutils.base.TypeSerializerSingleton;
import org.apache.flink.api.java.ClosureCleaner;
import org.apache.flink.configuration.Configuration;
Expand Down Expand Up @@ -1236,6 +1238,24 @@ public void copy(
public boolean canEqual(Object obj) {
return obj instanceof FlinkKafkaProducer.TransactionStateSerializer;
}

// -----------------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> snapshotConfiguration() {
return new TransactionStateSerializerSnapshot();
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class TransactionStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<FlinkKafkaProducer.KafkaTransactionState> {

public TransactionStateSerializerSnapshot() {
super(TransactionStateSerializer::new);
}
}
}

/**
Expand Down Expand Up @@ -1318,6 +1338,24 @@ public void copy(
public boolean canEqual(Object obj) {
return obj instanceof FlinkKafkaProducer.ContextStateSerializer;
}

// -----------------------------------------------------------------------------------

@Override
public TypeSerializerSnapshot<KafkaTransactionContext> snapshotConfiguration() {
return new ContextStateSerializerSnapshot();
}

/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class ContextStateSerializerSnapshot extends SimpleTypeSerializerSnapshot<KafkaTransactionContext> {

public ContextStateSerializerSnapshot() {
super(ContextStateSerializer::new);
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,12 @@

/**
* A base class for {@link TypeSerializerConfigSnapshot}s that do not have any parameters.
*
* @deprecated this snapshot class is no longer used by any serializers, and is maintained only
* for backward compatibility reasons. It is fully replaced by {@link SimpleTypeSerializerSnapshot}.
*/
@Internal
@Deprecated
public final class ParameterlessTypeSerializerConfig<T> extends TypeSerializerConfigSnapshot<T> {

private static final int VERSION = 1;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,13 @@
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.util.InstantiationUtil;

import javax.annotation.Nonnull;
import javax.annotation.Nullable;

import java.io.IOException;
import java.util.function.Supplier;

import static org.apache.flink.util.Preconditions.checkNotNull;
import static org.apache.flink.util.Preconditions.checkState;

/**
* A simple base class for TypeSerializerSnapshots, for serializers that have no
Expand All @@ -46,24 +44,18 @@ public abstract class SimpleTypeSerializerSnapshot<T> implements TypeSerializerS
* backwards compatible code paths in case we decide to make this snapshot backwards compatible with
* the {@link ParameterlessTypeSerializerConfig}.
*/
private static final int CURRENT_VERSION = 2;
private static final int CURRENT_VERSION = 3;

/** The class of the serializer for this snapshot.
* The field is null if the serializer was created for read and has not been read, yet. */
@Nullable
private Class<? extends TypeSerializer<T>> serializerClass;

/**
* Default constructor for instantiation on restore (reading the snapshot).
*/
@SuppressWarnings("unused")
public SimpleTypeSerializerSnapshot() {}
@Nonnull
private Supplier<? extends TypeSerializer<T>> serializerSupplier;

/**
* Constructor to create snapshot from serializer (writing the snapshot).
*/
public SimpleTypeSerializerSnapshot(@Nonnull Class<? extends TypeSerializer<T>> serializerClass) {
this.serializerClass = checkNotNull(serializerClass);
public SimpleTypeSerializerSnapshot(@Nonnull Supplier<? extends TypeSerializer<T>> serializerSupplier) {
this.serializerSupplier = checkNotNull(serializerSupplier);
}

// ------------------------------------------------------------------------
Expand All @@ -77,42 +69,39 @@ public int getCurrentVersion() {

@Override
public TypeSerializer<T> restoreSerializer() {
checkState(serializerClass != null);
return InstantiationUtil.instantiate(serializerClass);
return serializerSupplier.get();
}

@Override
public TypeSerializerSchemaCompatibility<T> resolveSchemaCompatibility(TypeSerializer<T> newSerializer) {

checkState(serializerClass != null);
return newSerializer.getClass() == serializerClass ?
return newSerializer.getClass() == serializerSupplier.get().getClass() ?
TypeSerializerSchemaCompatibility.compatibleAsIs() :
TypeSerializerSchemaCompatibility.incompatible();
}

@Override
public void writeSnapshot(DataOutputView out) throws IOException {
checkState(serializerClass != null);
out.writeUTF(serializerClass.getName());
//
}

@Override
public void readSnapshot(int readVersion, DataInputView in, ClassLoader classLoader) throws IOException {
switch (readVersion) {
case 2:
read(in, classLoader);
case 3: {
break;
default:
}
case 2: {
// we don't need the classname any more; read and drop to maintain compatibility
in.readUTF();
break;
}
default: {
throw new IOException("Unrecognized version: " + readVersion);
}
}
}

private void read(DataInputView in, ClassLoader classLoader) throws IOException {
final String className = in.readUTF();
final Class<?> clazz = resolveClassName(className, classLoader, false);
this.serializerClass = cast(clazz);
}

// ------------------------------------------------------------------------
// standard utilities
// ------------------------------------------------------------------------
Expand All @@ -131,45 +120,4 @@ public final int hashCode() {
public String toString() {
return getClass().getName();
}

// ------------------------------------------------------------------------
// utilities
// ------------------------------------------------------------------------

private static Class<?> resolveClassName(String className, ClassLoader cl, boolean allowCanonicalName) throws IOException {
try {
return Class.forName(className, false, cl);
}
catch (ClassNotFoundException e) {
if (allowCanonicalName) {
try {
return Class.forName(guessClassNameFromCanonical(className), false, cl);
}
catch (ClassNotFoundException ignored) {}
}

// throw with original ClassNotFoundException
throw new IOException(
"Failed to read SimpleTypeSerializerSnapshot: Serializer class not found: " + className, e);
}
}

@SuppressWarnings("unchecked")
private static <T> Class<? extends TypeSerializer<T>> cast(Class<?> clazz) throws IOException {
if (!TypeSerializer.class.isAssignableFrom(clazz)) {
throw new IOException("Failed to read SimpleTypeSerializerSnapshot. " +
"Serializer class name leads to a class that is not a TypeSerializer: " + clazz.getName());
}

return (Class<? extends TypeSerializer<T>>) clazz;
}

static String guessClassNameFromCanonical(String className) {
int lastDot = className.lastIndexOf('.');
if (lastDot > 0 && lastDot < className.length() - 1) {
return className.substring(0, lastDot) + '$' + className.substring(lastDot + 1);
} else {
return className;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,11 @@ public TypeSerializerSnapshot<BigDecimal> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class BigDecSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigDecimal> {

public BigDecSerializerSnapshot() {
super(BigDecSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,10 +156,11 @@ public TypeSerializerSnapshot<BigInteger> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class BigIntSerializerSnapshot extends SimpleTypeSerializerSnapshot<BigInteger> {

public BigIntSerializerSnapshot() {
super(BigIntSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ public TypeSerializerSnapshot<Boolean> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class BooleanSerializerSnapshot extends SimpleTypeSerializerSnapshot<Boolean> {

public BooleanSerializerSnapshot() {
super(BooleanSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,11 @@ public TypeSerializerSnapshot<BooleanValue> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class BooleanValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<BooleanValue> {

public BooleanValueSerializerSnapshot() {
super(BooleanValueSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ public TypeSerializerSnapshot<Byte> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class ByteSerializerSnapshot extends SimpleTypeSerializerSnapshot<Byte> {

public ByteSerializerSnapshot() {
super(ByteSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public TypeSerializerSnapshot<ByteValue> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class ByteValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<ByteValue> {

public ByteValueSerializerSnapshot() {
super(ByteValueSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,11 @@ public TypeSerializerSnapshot<Character> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class CharSerializerSnapshot extends SimpleTypeSerializerSnapshot<Character> {

public CharSerializerSnapshot() {
super(CharSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -96,10 +96,11 @@ public TypeSerializerSnapshot<CharValue> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class CharValueSerializerSnapshot extends SimpleTypeSerializerSnapshot<CharValue> {

public CharValueSerializerSnapshot() {
super(CharValueSerializer.class);
super(() -> INSTANCE);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,11 @@ public TypeSerializerSnapshot<Date> snapshotConfiguration() {
/**
* Serializer configuration snapshot for compatibility and format evolution.
*/
@SuppressWarnings("WeakerAccess")
public static final class DateSerializerSnapshot extends SimpleTypeSerializerSnapshot<Date> {

public DateSerializerSnapshot() {
super(DateSerializer.class);
super(() -> INSTANCE);
}
}
}
Loading

0 comments on commit edf6d59

Please sign in to comment.