Skip to content

Commit

Permalink
[FLINK-12501] Use SpecificRecord.getSchema in AvroFactory
Browse files Browse the repository at this point in the history
Before, we were using SpecificData.getSchema(type) which was not working
for types that were generated using Avrohugger (for Scala) because
the SCHEMA was generated in the companion object. Now we use a method
that must be available on all SpecificRecord(s).

We still use the old method as a fallback if we cannot instantiate or
call getSchema() on the instance.
  • Loading branch information
aljoscha committed Sep 6, 2019
1 parent 878b6b1 commit f43138e
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Optional;

Expand All @@ -50,6 +52,8 @@
@Internal
final class AvroFactory<T> {

private static final Logger LOG = LoggerFactory.getLogger(AvroFactory.class);

private final DataOutputEncoder encoder = new DataOutputEncoder();
private final DataInputDecoder decoder = new DataInputDecoder();

Expand Down Expand Up @@ -94,7 +98,7 @@ static Schema parseSchemaString(@Nullable String schemaString) {
@SuppressWarnings("OptionalUsedAsFieldOrParameterType")
private static <T> AvroFactory<T> fromSpecific(Class<T> type, ClassLoader cl, Optional<Schema> previousSchema) {
SpecificData specificData = new SpecificData(cl);
Schema newSchema = specificData.getSchema(type);
Schema newSchema = extractAvroSpecificSchema(type, specificData);

return new AvroFactory<>(
specificData,
Expand Down Expand Up @@ -130,6 +134,36 @@ private static <T> AvroFactory<T> fromReflective(Class<T> type, ClassLoader cl,
);
}

/**
* Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this either via {@link
* SpecificData} or by instantiating a record and extracting the schema from the instance.
*/
static <T> Schema extractAvroSpecificSchema(
Class<T> type,
SpecificData specificData) {
Optional<Schema> newSchemaOptional = tryExtractAvroSchemaViaInstance(type);
return newSchemaOptional.orElseGet(() -> specificData.getSchema(type));
}

/**
* Extracts an Avro {@link Schema} from a {@link SpecificRecord}. We do this by creating an
* instance of the class using the zero-argument constructor and calling {@link
* SpecificRecord#getSchema()} on it.
*/
@SuppressWarnings("unchecked")
private static Optional<Schema> tryExtractAvroSchemaViaInstance(Class<?> type) {
try {
SpecificRecord instance = (SpecificRecord) type.newInstance();
return Optional.ofNullable(instance.getSchema());
} catch (InstantiationException | IllegalAccessException e) {
LOG.warn(
"Could not extract schema from Avro-generated SpecificRecord class {}: {}.",
type,
e);
return Optional.empty();
}
}

private AvroFactory(
GenericData avroData,
Schema schema,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -186,7 +186,7 @@ private static Schema tryExtractAvroSchema(ClassLoader cl, Class<?> runtimeType)
}
if (isSpecificRecord(runtimeType)) {
SpecificData d = new SpecificData(cl);
return d.getSchema(runtimeType);
return AvroFactory.extractAvroSpecificSchema(runtimeType, d);
}
ReflectData d = new ReflectData(cl);
return d.getSchema(runtimeType);
Expand Down

0 comments on commit f43138e

Please sign in to comment.