Skip to content

Commit

Permalink
Merge pull request apache#10248: [BEAM-7274] Add type conversions fac…
Browse files Browse the repository at this point in the history
…tory
  • Loading branch information
reuvenlax committed Dec 3, 2019
1 parent 2cc1619 commit de6cdb0
Show file tree
Hide file tree
Showing 16 changed files with 481 additions and 188 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.AutoValueUtils;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
Expand Down Expand Up @@ -60,22 +61,31 @@ public List<FieldValueTypeInformation> get(Class<?> clazz) {
}

@Override
List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getGetters(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE);
public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getGetters(
targetClass,
schema,
AbstractGetterTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

@Override
List<FieldValueTypeInformation> fieldValueTypeInformations(Class<?> targetClass, Schema schema) {
public List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getFieldTypes(targetClass, schema, AbstractGetterTypeSupplier.INSTANCE);
}

@Override
SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
// If a static method is marked with @SchemaCreate, use that.
Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
if (annotated != null) {
return JavaBeanUtils.getStaticCreator(
targetClass, annotated, schema, AbstractGetterTypeSupplier.INSTANCE);
targetClass,
annotated,
schema,
AbstractGetterTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

// Try to find a generated builder class. If one exists, use that to generate a
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,17 +38,18 @@ public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
}

@Override
List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return AvroUtils.getGetters(targetClass, schema);
}

@Override
List<FieldValueTypeInformation> fieldValueTypeInformations(Class<?> targetClass, Schema schema) {
public List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema) {
return AvroUtils.getFieldTypes(targetClass, schema);
}

@Override
SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
return AvroUtils.getCreator(targetClass, schema);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
@Experimental(Kind.SCHEMAS)
public abstract class GetterBasedSchemaProvider implements SchemaProvider {
/** Implementing class should override to return FieldValueGetters. */
abstract List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema);
public abstract List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema);

/** Implementing class should override to return a list of type-informations. */
abstract List<FieldValueTypeInformation> fieldValueTypeInformations(
public abstract List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema);

/** Implementing class should override to return a constructor. */
abstract SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema);
public abstract SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema);

private class ToRowWithValueGetters<T> implements SerializableFunction<T, Row> {
private final Schema schema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.JavaBeanUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
Expand Down Expand Up @@ -119,29 +120,39 @@ public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
}

@Override
List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getGetters(targetClass, schema, GetterTypeSupplier.INSTANCE);
public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getGetters(
targetClass, schema, GetterTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory());
}

@Override
List<FieldValueTypeInformation> fieldValueTypeInformations(Class<?> targetClass, Schema schema) {
public List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getFieldTypes(targetClass, schema, GetterTypeSupplier.INSTANCE);
}

@Override
SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
// If a static method is marked with @SchemaCreate, use that.
Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
if (annotated != null) {
return JavaBeanUtils.getStaticCreator(
targetClass, annotated, schema, GetterTypeSupplier.INSTANCE);
targetClass,
annotated,
schema,
GetterTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

// If a Constructor was tagged with @SchemaCreate, invoke that constructor.
Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
if (constructor != null) {
return JavaBeanUtils.getConstructorCreator(
targetClass, constructor, schema, GetterTypeSupplier.INSTANCE);
targetClass,
constructor,
schema,
GetterTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

// Else try to make a setter-based creator
Expand All @@ -154,7 +165,8 @@ SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public static class JavaBeanSetterFactory implements FieldValueSetterFactory {
@Override
public List<FieldValueSetter> create(Class<?> targetClass, Schema schema) {
return JavaBeanUtils.getSetters(targetClass, schema, SetterTypeSupplier.INSTANCE);
return JavaBeanUtils.getSetters(
targetClass, schema, SetterTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.beam.sdk.annotations.Experimental.Kind;
import org.apache.beam.sdk.schemas.annotations.SchemaFieldName;
import org.apache.beam.sdk.schemas.annotations.SchemaIgnore;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.FieldValueTypeSupplier;
import org.apache.beam.sdk.schemas.utils.POJOUtils;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
Expand Down Expand Up @@ -96,31 +97,42 @@ public <T> Schema schemaFor(TypeDescriptor<T> typeDescriptor) {
}

@Override
List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return POJOUtils.getGetters(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
public List<FieldValueGetter> fieldValueGetters(Class<?> targetClass, Schema schema) {
return POJOUtils.getGetters(
targetClass, schema, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory());
}

@Override
List<FieldValueTypeInformation> fieldValueTypeInformations(Class<?> targetClass, Schema schema) {
public List<FieldValueTypeInformation> fieldValueTypeInformations(
Class<?> targetClass, Schema schema) {
return POJOUtils.getFieldTypes(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
}

@Override
SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
public SchemaUserTypeCreator schemaTypeCreator(Class<?> targetClass, Schema schema) {
// If a static method is marked with @SchemaCreate, use that.
Method annotated = ReflectUtils.getAnnotatedCreateMethod(targetClass);
if (annotated != null) {
return POJOUtils.getStaticCreator(
targetClass, annotated, schema, JavaFieldTypeSupplier.INSTANCE);
targetClass,
annotated,
schema,
JavaFieldTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

// If a Constructor was tagged with @SchemaCreate, invoke that constructor.
Constructor<?> constructor = ReflectUtils.getAnnotatedConstructor(targetClass);
if (constructor != null) {
return POJOUtils.getConstructorCreator(
targetClass, constructor, schema, JavaFieldTypeSupplier.INSTANCE);
targetClass,
constructor,
schema,
JavaFieldTypeSupplier.INSTANCE,
new DefaultTypeConversionsFactory());
}

return POJOUtils.getSetFieldCreator(targetClass, schema, JavaFieldTypeSupplier.INSTANCE);
return POJOUtils.getSetFieldCreator(
targetClass, schema, JavaFieldTypeSupplier.INSTANCE, new DefaultTypeConversionsFactory());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.SchemaRegistry;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.ConvertHelpers;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
Expand Down Expand Up @@ -150,7 +151,8 @@ public void processElement(@Element Row row, OutputReceiver<OutputT> o) {
converted.outputSchemaCoder.getFromRowFunction());
} else {
SerializableFunction<?, OutputT> convertPrimitive =
ConvertHelpers.getConvertPrimitive(converted.unboxedType, outputTypeDescriptor);
ConvertHelpers.getConvertPrimitive(
converted.unboxedType, outputTypeDescriptor, new DefaultTypeConversionsFactory());
output =
input.apply(
ParDo.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.lang.reflect.Parameter;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
Expand All @@ -34,9 +35,10 @@
import org.apache.beam.sdk.schemas.FieldValueTypeInformation;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertValueForSetter;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.DefaultTypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.bytebuddy.v1_9_3.net.bytebuddy.ByteBuddy;
Expand Down Expand Up @@ -114,7 +116,11 @@ public static SchemaUserTypeCreator getConstructorCreator(
.map(
c ->
JavaBeanUtils.getConstructorCreator(
generatedClass, c, schema, fieldValueTypeSupplier))
generatedClass,
c,
schema,
fieldValueTypeSupplier,
new DefaultTypeConversionsFactory()))
.orElse(null);
}

Expand Down Expand Up @@ -236,6 +242,7 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {

@Override
public ByteCodeAppender appender(final Target implementationTarget) {
TypeConversionsFactory typeConversionsFactory = new DefaultTypeConversionsFactory();
ForLoadedType loadedBuilder = new ForLoadedType(builderClass);
return (methodVisitor, implementationContext, instrumentedMethod) -> {
// this + method parameters.
Expand All @@ -252,13 +259,13 @@ public ByteCodeAppender appender(final Target implementationTarget) {
ElementMatchers.isConstructor().and(ElementMatchers.takesArguments(0)))
.getOnly()));

ConvertType convertType = new ConvertType(true);
TypeConversion<Type> convertType = typeConversionsFactory.createTypeConversion(true);
for (int i = 0; i < setters.size(); ++i) {
Method setterMethod = checkNotNull(setters.get(i).getMethod());
Parameter parameter = setterMethod.getParameters()[0];
ForLoadedType convertedType =
new ForLoadedType(
(Class) convertType.convert(TypeDescriptor.of(parameter.getType())));
(Class) convertType.convert(TypeDescriptor.of(parameter.getParameterizedType())));

StackManipulation readParameter =
new StackManipulation.Compound(
Expand All @@ -271,7 +278,8 @@ public ByteCodeAppender appender(final Target implementationTarget) {
new StackManipulation.Compound(
stackManipulation,
Duplication.SINGLE,
new ConvertValueForSetter(readParameter)
typeConversionsFactory
.createSetterConversions(readParameter)
.convert(TypeDescriptor.of(parameter.getType())),
MethodInvocation.invoke(new ForLoadedMethod(setterMethod)),
Removal.SINGLE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,15 @@

import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Type;
import java.util.Map;
import org.apache.avro.specific.SpecificRecord;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaUserTypeCreator;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.ConvertType;
import org.apache.beam.sdk.schemas.utils.AvroUtils.AvroTypeConversionFactory;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.InjectPackageStrategy;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversion;
import org.apache.beam.sdk.schemas.utils.ByteBuddyUtils.TypeConversionsFactory;
import org.apache.beam.sdk.schemas.utils.ReflectUtils.ClassWithSchema;
import org.apache.beam.sdk.util.common.ReflectHelpers;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -101,11 +104,13 @@ private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema sc

private static StackManipulation readAndConvertParameter(
Class<?> constructorParameterType, int index) {
TypeConversionsFactory typeConversionsFactory = new AvroTypeConversionFactory();

// The types in the AVRO-generated constructor might be the types returned by Beam's Row class,
// so we have to convert the types used by Beam's Row class.
// We know that AVRO generates constructor parameters in the same order as fields
// in the schema, so we can just add the parameters sequentially.
ConvertType convertType = new ConvertType(true);
TypeConversion<Type> convertType = typeConversionsFactory.createTypeConversion(true);

// Map the AVRO-generated type to the one Beam will use.
ForLoadedType convertedType =
Expand All @@ -121,7 +126,8 @@ private static StackManipulation readAndConvertParameter(
TypeCasting.to(convertedType));

// Convert to the parameter accepted by the SpecificRecord constructor.
return new ByteBuddyUtils.ConvertValueForSetter(readParameter)
return typeConversionsFactory
.createSetterConversions(readParameter)
.convert(TypeDescriptor.of(constructorParameterType));
}
}
Loading

0 comments on commit de6cdb0

Please sign in to comment.