Skip to content

Commit

Permalink
[FLINK-9337] Implemented AvroDeserializationSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
dawidwys committed May 23, 2018
1 parent c10e03f commit fd101be
Show file tree
Hide file tree
Showing 12 changed files with 490 additions and 43 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.formats.avro;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.formats.avro.typeutils.AvroTypeInfo;
import org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo;
import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
import org.apache.flink.util.Preconditions;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificData;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificRecord;

import javax.annotation.Nullable;

import java.io.IOException;

/**
* Deserialization schema that deserializes from Avro binary format.
*
* @param <T> type of record it produces
*/
public class AvroDeserializationSchema<T> implements DeserializationSchema<T> {

/**
* Creates {@link AvroDeserializationSchema} that produces {@link GenericRecord} using provided schema.
*
* @param schema schema of produced records
* @return deserialized record in form of {@link GenericRecord}
*/
public static AvroDeserializationSchema<GenericRecord> forGeneric(Schema schema) {
return new AvroDeserializationSchema<>(GenericRecord.class, schema);
}

/**
* Creates {@link AvroDeserializationSchema} that produces classes that were generated from avro schema.
*
* @param tClass class of record to be produced
* @return deserialized record
*/
public static <T extends SpecificRecord> AvroDeserializationSchema<T> forSpecific(Class<T> tClass) {
return new AvroDeserializationSchema<>(tClass, null);
}

private static final long serialVersionUID = -6766681879020862312L;

/** Class to deserialize to. */
private final Class<T> recordClazz;

/** Schema in case of GenericRecord for serialization purpose. */
private final String schemaString;

/** Reader that deserializes byte array into a record. */
private transient GenericDatumReader<T> datumReader;

/** Input stream to read message from. */
private transient MutableByteArrayInputStream inputStream;

/** Avro decoder that decodes binary data. */
private transient Decoder decoder;

/** Avro schema for the reader. */
private transient Schema reader;

/**
* Creates a Avro deserialization schema.
*
* @param recordClazz class to which deserialize. Should be one of:
* {@link org.apache.avro.specific.SpecificRecord},
* {@link org.apache.avro.generic.GenericRecord}.
* @param reader reader's Avro schema. Should be provided if recordClazz is
* {@link GenericRecord}
*/
AvroDeserializationSchema(Class<T> recordClazz, @Nullable Schema reader) {
Preconditions.checkNotNull(recordClazz, "Avro record class must not be null.");
this.recordClazz = recordClazz;
this.reader = reader;
if (reader != null) {
this.schemaString = reader.toString();
} else {
this.schemaString = null;
}
}

GenericDatumReader<T> getDatumReader() {
return datumReader;
}

Schema getReaderSchema() {
return reader;
}

MutableByteArrayInputStream getInputStream() {
return inputStream;
}

Decoder getDecoder() {
return decoder;
}

@Override
public T deserialize(byte[] message) throws IOException {
// read record
checkAvroInitialized();
inputStream.setBuffer(message);
Schema readerSchema = getReaderSchema();
GenericDatumReader<T> datumReader = getDatumReader();

datumReader.setSchema(readerSchema);

return datumReader.read(null, decoder);
}

void checkAvroInitialized() {
if (datumReader != null) {
return;
}

ClassLoader cl = Thread.currentThread().getContextClassLoader();
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
SpecificData specificData = new SpecificData(cl);
this.datumReader = new SpecificDatumReader<>(specificData);
this.reader = specificData.getSchema(recordClazz);
} else {
this.reader = new Schema.Parser().parse(schemaString);
GenericData genericData = new GenericData(cl);
this.datumReader = new GenericDatumReader<>(null, this.reader, genericData);
}

this.inputStream = new MutableByteArrayInputStream();
this.decoder = DecoderFactory.get().binaryDecoder(inputStream, null);
}

@Override
public boolean isEndOfStream(T nextElement) {
return false;
}

@Override
@SuppressWarnings("unchecked")
public TypeInformation<T> getProducedType() {
if (SpecificRecord.class.isAssignableFrom(recordClazz)) {
return new AvroTypeInfo(recordClazz, false);
} else {
return (TypeInformation<T>) new GenericRecordAvroTypeInfo(this.reader);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.flink.formats.avro;

import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;
import org.apache.flink.formats.avro.utils.MutableByteArrayInputStream;
import org.apache.flink.types.Row;
import org.apache.flink.util.Preconditions;

Expand All @@ -31,7 +32,6 @@
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.util.Utf8;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
Expand Down Expand Up @@ -153,27 +153,4 @@ private static Object convertToRow(Schema schema, Object recordObj) {
}
}

/**
* An extension of the ByteArrayInputStream that allows to change a buffer that should be
* read without creating a new ByteArrayInputStream instance. This allows to re-use the same
* InputStream instance, copying message to process, and creation of Decoder on every new message.
*/
private static final class MutableByteArrayInputStream extends ByteArrayInputStream {

public MutableByteArrayInputStream() {
super(new byte[0]);
}

/**
* Set buffer that can be read via the InputStream interface and reset the input stream.
* This has the same effect as creating a new ByteArrayInputStream with a new buffer.
*
* @param buf the new buffer to read.
*/
public void setBuffer(byte[] buf) {
this.buf = buf;
this.pos = 0;
this.count = buf.length;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@
import org.apache.avro.SchemaCompatibility;
import org.apache.avro.SchemaCompatibility.SchemaCompatibilityType;
import org.apache.avro.SchemaCompatibility.SchemaPairCompatibility;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.reflect.ReflectData;
import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.reflect.ReflectDatumWriter;
Expand All @@ -44,15 +48,19 @@

import java.io.IOException;

import static org.apache.flink.util.Preconditions.checkArgument;
import static org.apache.flink.util.Preconditions.checkNotNull;

/**
* A serializer that serializes types via Avro.
*
* <p>The serializer supports both efficient specific record serialization for
* types generated via Avro, as well as serialization via reflection
* (ReflectDatumReader / -Writer). The serializer instantiates them depending on
* the class of the type it should serialize.
* <p>The serializer supports:
* <ul>
* <li>efficient specific record serialization for types generated via Avro</li>
* <li>serialization via reflection (ReflectDatumReader / -Writer)</li>
* <li>serialization of generic records via GenericDatumReader / -Writer</li>
* </ul>
* The serializer instantiates them depending on the class of the type it should serialize.
*
* <p><b>Important:</b> This serializer is NOT THREAD SAFE, because it reuses the data encoders
* and decoders which have buffers that would be shared between the threads if used concurrently
Expand All @@ -77,15 +85,17 @@ public class AvroSerializer<T> extends TypeSerializer<T> {
/** The class of the type that is serialized by this serializer. */
private final Class<T> type;

private final String schemaString;

// -------- runtime fields, non-serializable, lazily initialized -----------

private transient SpecificDatumWriter<T> writer;
private transient SpecificDatumReader<T> reader;
private transient GenericDatumWriter<T> writer;
private transient GenericDatumReader<T> reader;

private transient DataOutputEncoder encoder;
private transient DataInputDecoder decoder;

private transient SpecificData avroData;
private transient GenericData avroData;

private transient Schema schema;

Expand All @@ -99,9 +109,28 @@ public class AvroSerializer<T> extends TypeSerializer<T> {

/**
* Creates a new AvroSerializer for the type indicated by the given class.
* This constructor is intended to be used with {@link SpecificRecord} or reflection serializer.
* For serializing {@link GenericData.Record} use {@link AvroSerializer#AvroSerializer(Class, Schema)}
*/
public AvroSerializer(Class<T> type) {
checkArgument(!isGenericRecord(type),
"For GenericData.Record use constructor with explicit schema.");
this.type = checkNotNull(type);
this.schemaString = null;
}

/**
* Creates a new AvroSerializer for the type indicated by the given class.
* This constructor is expected to be used only with {@link GenericData.Record}.
* For {@link SpecificRecord} or reflection serializer use
* {@link AvroSerializer#AvroSerializer(Class)}
*/
public AvroSerializer(Class<T> type, Schema schema) {
checkArgument(isGenericRecord(type),
"For classes other than GenericData.Record use constructor without explicit schema.");
this.type = checkNotNull(type);
this.schema = checkNotNull(schema);
this.schemaString = schema.toString();
}

/**
Expand Down Expand Up @@ -275,9 +304,19 @@ else if (configSnapshot instanceof AvroSerializerConfigSnapshot) {
// Utilities
// ------------------------------------------------------------------------

private static boolean isGenericRecord(Class<?> type) {
return !SpecificRecord.class.isAssignableFrom(type) &&
GenericRecord.class.isAssignableFrom(type);
}

@Override
public TypeSerializer<T> duplicate() {
return new AvroSerializer<>(type);
if (schemaString != null) {
return new AvroSerializer<>(type, schema);
} else {
return new AvroSerializer<>(type);

}
}

@Override
Expand Down Expand Up @@ -323,15 +362,23 @@ private void initializeAvro() {
final ClassLoader cl = Thread.currentThread().getContextClassLoader();

if (SpecificRecord.class.isAssignableFrom(type)) {
this.avroData = new SpecificData(cl);
this.schema = this.avroData.getSchema(type);
this.reader = new SpecificDatumReader<>(schema, schema, avroData);
this.writer = new SpecificDatumWriter<>(schema, avroData);
}
else {
SpecificData specificData = new SpecificData(cl);
this.avroData = specificData;
this.schema = specificData.getSchema(type);
this.reader = new SpecificDatumReader<>(schema, schema, specificData);
this.writer = new SpecificDatumWriter<>(schema, specificData);
} else if (GenericRecord.class.isAssignableFrom(type)) {
if (schema == null) {
this.schema = new Schema.Parser().parse(schemaString);
}
GenericData genericData = new GenericData(cl);
this.avroData = genericData;
this.reader = new GenericDatumReader<>(schema, schema, genericData);
this.writer = new GenericDatumWriter<>(schema, genericData);
} else {
final ReflectData reflectData = new ReflectData(cl);
this.avroData = reflectData;
this.schema = this.avroData.getSchema(type);
this.schema = reflectData.getSchema(type);
this.reader = new ReflectDatumReader<>(schema, schema, reflectData);
this.writer = new ReflectDatumWriter<>(schema, reflectData);
}
Expand Down
Loading

0 comments on commit fd101be

Please sign in to comment.