Skip to content

Commit

Permalink
[FLINK-6022] [avro] Use Avro to serialize Avro in flight and in State
Browse files Browse the repository at this point in the history
This falls back to the original serializer (Pojo / Kryo) in cases where
an old snapshot is resumed.
  • Loading branch information
StephanEwen committed Nov 6, 2017
1 parent 88737cf commit f3a2197
Show file tree
Hide file tree
Showing 13 changed files with 1,208 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -309,6 +309,10 @@ public TypeSerializer<T> createSerializer(ExecutionConfig config) {
return AvroUtils.getAvroUtils().createAvroSerializer(getTypeClass());
}

return createPojoSerializer(config);
}

public PojoSerializer<T> createPojoSerializer(ExecutionConfig config) {
TypeSerializer<?>[] fieldSerializers = new TypeSerializer<?>[fields.length];
Field[] reflectiveFields = new Field[fields.length];

Expand All @@ -319,7 +323,7 @@ public TypeSerializer<T> createSerializer(ExecutionConfig config) {

return new PojoSerializer<T>(getTypeClass(), fieldSerializers, reflectiveFields, config);
}

@Override
public boolean equals(Object obj) {
if (obj instanceof PojoTypeInfo) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,6 @@
public class AvroTypeInfo<T extends SpecificRecordBase> extends PojoTypeInfo<T> {

public AvroTypeInfo(Class<T> typeClass) {
super(typeClass, generateFieldsFromAvroSchema(typeClass));
super(typeClass, generateFieldsFromAvroSchema(typeClass, true));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http:https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.api.java.typeutils.runtime;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.CompatibilityResult;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot;
import org.apache.flink.api.java.typeutils.runtime.kryo.Serializers;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.formats.avro.utils.DataInputDecoder;
import org.apache.flink.formats.avro.utils.DataOutputEncoder;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;

import com.esotericsoftware.kryo.Kryo;

import org.apache.avro.generic.GenericData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.avro.util.Utf8;

import org.objenesis.strategy.StdInstantiatorStrategy;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.LinkedHashMap;
import java.util.Map;

import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* Old deprecated Avro serializer. It is retained for a smoother experience when
* upgrading from an earlier Flink savepoint that stored this serializer.
*/
@Internal
@Deprecated
@SuppressWarnings({"unused", "deprecation"})
public final class AvroSerializer<T> extends TypeSerializer<T> {

private static final long serialVersionUID = 1L;

private final Class<T> type;

private final Class<? extends T> typeToInstantiate;

/**
* Map of class tag (using classname as tag) to their Kryo registration.
*
* <p>This map serves as a preview of the final registration result of
* the Kryo instance, taking into account registration overwrites.
*/
private LinkedHashMap<String, KryoRegistration> kryoRegistrations;

private transient ReflectDatumWriter<T> writer;
private transient ReflectDatumReader<T> reader;

private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;

private transient Kryo kryo;

private transient T deepCopyInstance;

// --------------------------------------------------------------------------------------------

public AvroSerializer(Class<T> type) {
this(type, type);
}

public AvroSerializer(Class<T> type, Class<? extends T> typeToInstantiate) {
this.type = checkNotNull(type);
this.typeToInstantiate = checkNotNull(typeToInstantiate);

InstantiationUtil.checkForInstantiation(typeToInstantiate);

this.kryoRegistrations = buildKryoRegistrations(type);
}

// --------------------------------------------------------------------------------------------

@Override
public boolean isImmutableType() {
return false;
}

@Override
public AvroSerializer<T> duplicate() {
return new AvroSerializer<>(type, typeToInstantiate);
}

@Override
public T createInstance() {
return InstantiationUtil.instantiate(this.typeToInstantiate);
}

@Override
public T copy(T from) {
checkKryoInitialized();

return KryoUtils.copy(from, kryo, this);
}

@Override
public T copy(T from, T reuse) {
checkKryoInitialized();

return KryoUtils.copy(from, reuse, kryo, this);
}

@Override
public int getLength() {
return -1;
}

@Override
public void serialize(T value, DataOutputView target) throws IOException {
checkAvroInitialized();
this.encoder.setOut(target);
this.writer.write(value, this.encoder);
}

@Override
public T deserialize(DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(null, this.decoder);
}

@Override
public T deserialize(T reuse, DataInputView source) throws IOException {
checkAvroInitialized();
this.decoder.setIn(source);
return this.reader.read(reuse, this.decoder);
}

@Override
public void copy(DataInputView source, DataOutputView target) throws IOException {
checkAvroInitialized();

if (this.deepCopyInstance == null) {
this.deepCopyInstance = InstantiationUtil.instantiate(type, Object.class);
}

this.decoder.setIn(source);
this.encoder.setOut(target);

T tmp = this.reader.read(this.deepCopyInstance, this.decoder);
this.writer.write(tmp, this.encoder);
}

private void checkAvroInitialized() {
if (this.reader == null) {
this.reader = new ReflectDatumReader<>(type);
this.writer = new ReflectDatumWriter<>(type);
this.encoder = new DataOutputEncoder();
this.decoder = new DataInputDecoder();
}
}

private void checkKryoInitialized() {
if (this.kryo == null) {
this.kryo = new Kryo();

Kryo.DefaultInstantiatorStrategy instantiatorStrategy = new Kryo.DefaultInstantiatorStrategy();
instantiatorStrategy.setFallbackInstantiatorStrategy(new StdInstantiatorStrategy());
kryo.setInstantiatorStrategy(instantiatorStrategy);

kryo.setAsmEnabled(true);

KryoUtils.applyRegistrations(kryo, kryoRegistrations.values());
}
}

// --------------------------------------------------------------------------------------------

@Override
public int hashCode() {
return 31 * this.type.hashCode() + this.typeToInstantiate.hashCode();
}

@Override
public boolean equals(Object obj) {
if (obj instanceof AvroSerializer) {
@SuppressWarnings("unchecked")
AvroSerializer<T> avroSerializer = (AvroSerializer<T>) obj;

return avroSerializer.canEqual(this) &&
type == avroSerializer.type &&
typeToInstantiate == avroSerializer.typeToInstantiate;
} else {
return false;
}
}

@Override
public boolean canEqual(Object obj) {
return obj instanceof AvroSerializer;
}

// --------------------------------------------------------------------------------------------
// Serializer configuration snapshotting & compatibility
// --------------------------------------------------------------------------------------------

@Override
public AvroSerializerConfigSnapshot<T> snapshotConfiguration() {
return new AvroSerializerConfigSnapshot<>(type, typeToInstantiate, kryoRegistrations);
}

@SuppressWarnings("unchecked")
@Override
public CompatibilityResult<T> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) {
if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
final AvroSerializerConfigSnapshot<T> config = (AvroSerializerConfigSnapshot<T>) configSnapshot;

if (type.equals(config.getTypeClass()) && typeToInstantiate.equals(config.getTypeToInstantiate())) {
// resolve Kryo registrations; currently, since the Kryo registrations in Avro
// are fixed, there shouldn't be a problem with the resolution here.

LinkedHashMap<String, KryoRegistration> oldRegistrations = config.getKryoRegistrations();
oldRegistrations.putAll(kryoRegistrations);

for (Map.Entry<String, KryoRegistration> reconfiguredRegistrationEntry : kryoRegistrations.entrySet()) {
if (reconfiguredRegistrationEntry.getValue().isDummy()) {
return CompatibilityResult.requiresMigration();
}
}

this.kryoRegistrations = oldRegistrations;
return CompatibilityResult.compatible();
}
}

// ends up here if the preceding serializer is not
// the ValueSerializer, or serialized data type has changed
return CompatibilityResult.requiresMigration();
}

/**
* Config snapshot for this serializer.
*/
public static class AvroSerializerConfigSnapshot<T> extends KryoRegistrationSerializerConfigSnapshot<T> {

private static final int VERSION = 1;

private Class<? extends T> typeToInstantiate;

public AvroSerializerConfigSnapshot() {}

public AvroSerializerConfigSnapshot(
Class<T> baseType,
Class<? extends T> typeToInstantiate,
LinkedHashMap<String, KryoRegistration> kryoRegistrations) {

super(baseType, kryoRegistrations);
this.typeToInstantiate = Preconditions.checkNotNull(typeToInstantiate);
}

@Override
public void write(DataOutputView out) throws IOException {
super.write(out);

out.writeUTF(typeToInstantiate.getName());
}

@SuppressWarnings("unchecked")
@Override
public void read(DataInputView in) throws IOException {
super.read(in);

String classname = in.readUTF();
try {
typeToInstantiate = (Class<? extends T>) Class.forName(classname, true, getUserCodeClassLoader());
} catch (ClassNotFoundException e) {
throw new IOException("Cannot find requested class " + classname + " in classpath.", e);
}
}

@Override
public int getVersion() {
return VERSION;
}

public Class<? extends T> getTypeToInstantiate() {
return typeToInstantiate;
}
}

// --------------------------------------------------------------------------------------------

private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
in.defaultReadObject();

// kryoRegistrations may be null if this Avro serializer is deserialized from an old version
if (kryoRegistrations == null) {
this.kryoRegistrations = buildKryoRegistrations(type);
}
}

private static <T> LinkedHashMap<String, KryoRegistration> buildKryoRegistrations(Class<T> serializedDataType) {
final LinkedHashMap<String, KryoRegistration> registrations = new LinkedHashMap<>();

// register Avro types.
registrations.put(
GenericData.Array.class.getName(),
new KryoRegistration(
GenericData.Array.class,
new ExecutionConfig.SerializableSerializer<>(new Serializers.SpecificInstanceCollectionSerializerForArrayList())));
registrations.put(Utf8.class.getName(), new KryoRegistration(Utf8.class));
registrations.put(GenericData.EnumSymbol.class.getName(), new KryoRegistration(GenericData.EnumSymbol.class));
registrations.put(GenericData.Fixed.class.getName(), new KryoRegistration(GenericData.Fixed.class));
registrations.put(GenericData.StringType.class.getName(), new KryoRegistration(GenericData.StringType.class));

// register the serialized data type
registrations.put(serializedDataType.getName(), new KryoRegistration(serializedDataType));

return registrations;
}
}
Loading

0 comments on commit f3a2197

Please sign in to comment.