Skip to content

Commit

Permalink
[FLINK-3602] Fix TypeExtractor and add support for recursive types
Browse files Browse the repository at this point in the history
This closes apache#1787
  • Loading branch information
twalthr authored and fhueske committed Mar 22, 2016
1 parent 77c867a commit 7785288
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ public static final class MyAvroType {

public String theString;

public MyAvroType recursive;

private double aDouble;

public double getaDouble() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,15 @@ public AvroTypeInfo(Class<T> typeClass) {

private static <T extends SpecificRecordBase> List<PojoField> generateFieldsFromAvroSchema(Class<T> typeClass) {
PojoTypeExtractor pte = new PojoTypeExtractor();
TypeInformation ti = pte.analyzePojo(typeClass, new ArrayList<Type>(), null, null, null);
ArrayList<Type> typeHierarchy = new ArrayList<>();
typeHierarchy.add(typeClass);
TypeInformation ti = pte.analyzePojo(typeClass, typeHierarchy, null, null, null);

if(!(ti instanceof PojoTypeInfo)) {
throw new IllegalStateException("Expecting type to be a PojoTypeInfo");
}
PojoTypeInfo pti = (PojoTypeInfo) ti;
List<PojoField> newFields = new ArrayList<PojoField>(pti.getTotalFields());
List<PojoField> newFields = new ArrayList<>(pti.getTotalFields());

for(int i = 0; i < pti.getArity(); i++) {
PojoField f = pti.getPojoFieldAt(i);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1322,11 +1322,22 @@ private <X> TypeInformation<X> privateGetForClass(Class<X> clazz, ArrayList<Type
private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz, ArrayList<Type> typeHierarchy,
ParameterizedType parameterizedType, TypeInformation<IN1> in1Type, TypeInformation<IN2> in2Type) {
Preconditions.checkNotNull(clazz);


// Object is handled as generic type info
if (clazz.equals(Object.class)) {
return new GenericTypeInfo<>(clazz);
}

// Class is handled as generic type info
if (clazz.equals(Class.class)) {
return new GenericTypeInfo<OUT>(clazz);
}


// recursive types are handled as generic type info
if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<>(clazz);
}

// check for arrays
if (clazz.isArray()) {

Expand Down Expand Up @@ -1394,20 +1405,11 @@ private <OUT,IN1,IN2> TypeInformation<OUT> privateGetForClass(Class<OUT> clazz,
return new AvroTypeInfo(clazz);
}

if (countTypeInHierarchy(typeHierarchy, clazz) > 1) {
return new GenericTypeInfo<OUT>(clazz);
}

if (Modifier.isInterface(clazz.getModifiers())) {
// Interface has no members and is therefore not handled as POJO
return new GenericTypeInfo<OUT>(clazz);
}

if (clazz.equals(Class.class)) {
// special case handling for Class, this should not be handled by the POJO logic
return new GenericTypeInfo<OUT>(clazz);
}

try {
TypeInformation<OUT> pojoType = analyzePojo(clazz, new ArrayList<Type>(typeHierarchy), parameterizedType, in1Type, in2Type);
if (pojoType != null) {
Expand Down

0 comments on commit 7785288

Please sign in to comment.