Skip to content

Commit

Permalink
Merge pull request apache#10449: [BEAM-7274] Implement the Protobuf s…
Browse files Browse the repository at this point in the history
…chema provider for compiled protocol buffers
  • Loading branch information
reuvenlax committed Dec 25, 2019
1 parent 693bc91 commit bb3c63b
Show file tree
Hide file tree
Showing 17 changed files with 1,381 additions and 74 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
*
* <p>An interface to set a field of a class.
*
* <p>Implementations of this interface are generated at runtime to map Row fields back into objet
* <p>Implementations of this interface are generated at runtime to map Row fields back into object
* fields.
*/
@Internal
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Collections;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
import org.apache.beam.sdk.values.TypeDescriptor;

Expand All @@ -47,6 +50,8 @@ public abstract class FieldValueTypeInformation implements Serializable {
@Nullable
public abstract Method getMethod();

public abstract Map<String, FieldValueTypeInformation> getOneOfTypes();

/** If the field is a container type, returns the element type. */
@Nullable
public abstract FieldValueTypeInformation getElementType();
Expand All @@ -62,7 +67,7 @@ public abstract class FieldValueTypeInformation implements Serializable {
abstract Builder toBuilder();

@AutoValue.Builder
abstract static class Builder {
public abstract static class Builder {
public abstract Builder setName(String name);

public abstract Builder setNullable(boolean nullable);
Expand All @@ -75,6 +80,8 @@ abstract static class Builder {

public abstract Builder setMethod(@Nullable Method method);

public abstract Builder setOneOfTypes(Map<String, FieldValueTypeInformation> oneOfTypes);

public abstract Builder setElementType(@Nullable FieldValueTypeInformation elementType);

public abstract Builder setMapKeyType(@Nullable FieldValueTypeInformation mapKeyType);
Expand All @@ -84,6 +91,22 @@ abstract static class Builder {
abstract FieldValueTypeInformation build();
}

public static FieldValueTypeInformation forOneOf(
String name, boolean nullable, Map<String, FieldValueTypeInformation> oneOfTypes) {
final TypeDescriptor<OneOfType.Value> typeDescriptor = TypeDescriptor.of(OneOfType.Value.class);
return new AutoValue_FieldValueTypeInformation.Builder()
.setName(name)
.setNullable(nullable)
.setType(typeDescriptor)
.setRawType(typeDescriptor.getRawType())
.setField(null)
.setElementType(null)
.setMapKeyType(null)
.setMapValueType(null)
.setOneOfTypes(oneOfTypes)
.build();
}

public static FieldValueTypeInformation forField(Field field) {
TypeDescriptor type = TypeDescriptor.of(field.getGenericType());
return new AutoValue_FieldValueTypeInformation.Builder()
Expand All @@ -95,6 +118,7 @@ public static FieldValueTypeInformation forField(Field field) {
.setElementType(getIterableComponentType(field))
.setMapKeyType(getMapKeyType(field))
.setMapValueType(getMapValueType(field))
.setOneOfTypes(Collections.emptyMap())
.build();
}

Expand All @@ -119,6 +143,7 @@ public static FieldValueTypeInformation forGetter(Method method) {
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
.setOneOfTypes(Collections.emptyMap())
.build();
}

Expand Down Expand Up @@ -148,6 +173,7 @@ public static FieldValueTypeInformation forSetter(Method method, String setterPr
.setElementType(getIterableComponentType(type))
.setMapKeyType(getMapKeyType(type))
.setMapValueType(getMapValueType(type))
.setOneOfTypes(Collections.emptyMap())
.build();
}

Expand Down Expand Up @@ -175,6 +201,7 @@ static FieldValueTypeInformation getIterableComponentType(TypeDescriptor valueTy
.setElementType(getIterableComponentType(componentType))
.setMapKeyType(getMapKeyType(componentType))
.setMapValueType(getMapValueType(componentType))
.setOneOfTypes(Collections.emptyMap())
.build();
}

Expand Down Expand Up @@ -217,6 +244,7 @@ private static FieldValueTypeInformation getMapType(TypeDescriptor valueType, in
.setElementType(getIterableComponentType(mapType))
.setMapKeyType(getMapKeyType(mapType))
.setMapValueType(getMapValueType(mapType))
.setOneOfTypes(Collections.emptyMap())
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.logicaltypes.OneOfType;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
Expand Down Expand Up @@ -80,13 +81,7 @@ public <ValueT> ValueT fromRow(
FieldValueTypeInformation typeInformation = checkNotNull(typeInformations.get(i));
params[i] =
fromValue(
type,
row.getValue(i),
typeInformation.getRawType(),
typeInformation.getElementType(),
typeInformation.getMapKeyType(),
typeInformation.getMapValueType(),
typeFactory);
type, row.getValue(i), typeInformation.getRawType(), typeInformation, typeFactory);
}

SchemaUserTypeCreator creator = schemaTypeCreatorFactory.create(clazz, schema);
Expand All @@ -99,10 +94,11 @@ private <ValueT> ValueT fromValue(
FieldType type,
ValueT value,
Type fieldType,
FieldValueTypeInformation elementType,
FieldValueTypeInformation keyType,
FieldValueTypeInformation valueType,
FieldValueTypeInformation fieldValueTypeInformation,
Factory<List<FieldValueTypeInformation>> typeFactory) {
FieldValueTypeInformation elementType = fieldValueTypeInformation.getElementType();
FieldValueTypeInformation keyType = fieldValueTypeInformation.getMapKeyType();
FieldValueTypeInformation valueType = fieldValueTypeInformation.getMapValueType();
if (value == null) {
return null;
}
Expand All @@ -127,6 +123,22 @@ private <ValueT> ValueT fromValue(
valueType,
typeFactory);
} else {
if (type.getTypeName().isLogicalType()
&& OneOfType.IDENTIFIER.equals(type.getLogicalType().getIdentifier())) {
OneOfType oneOfType = type.getLogicalType(OneOfType.class);
OneOfType.Value oneOfValue = oneOfType.toInputType((Row) value);
FieldValueTypeInformation oneOfFieldValueTypeInformation =
checkNotNull(
fieldValueTypeInformation.getOneOfTypes().get(oneOfValue.getCaseType().toString()));
Object fromValue =
fromValue(
oneOfValue.getFieldType(),
oneOfValue.getValue(),
oneOfFieldValueTypeInformation.getRawType(),
oneOfFieldValueTypeInformation,
typeFactory);
return (ValueT) oneOfType.createValue(oneOfValue.getCaseType(), fromValue);
}
return value;
}
}
Expand Down Expand Up @@ -156,9 +168,7 @@ private <ElementT> Collection fromCollectionValue(
elementType,
element,
elementTypeInformation.getType().getType(),
elementTypeInformation.getElementType(),
elementTypeInformation.getMapKeyType(),
elementTypeInformation.getMapValueType(),
elementTypeInformation,
typeFactory));
}

Expand All @@ -175,9 +185,7 @@ private <ElementT> Iterable fromIterableValue(
elementType,
element,
elementTypeInformation.getType().getType(),
elementTypeInformation.getElementType(),
elementTypeInformation.getMapKeyType(),
elementTypeInformation.getMapValueType(),
elementTypeInformation,
typeFactory));
}

Expand All @@ -196,18 +204,14 @@ private <ElementT> Iterable fromIterableValue(
keyType,
entry.getKey(),
keyTypeInformation.getType().getType(),
keyTypeInformation.getElementType(),
keyTypeInformation.getMapKeyType(),
keyTypeInformation.getMapValueType(),
keyTypeInformation,
typeFactory);
Object value =
fromValue(
valueType,
entry.getValue(),
valueTypeInformation.getType().getType(),
valueTypeInformation.getElementType(),
valueTypeInformation.getMapKeyType(),
valueTypeInformation.getMapValueType(),
valueTypeInformation,
typeFactory);
newMap.put(key, value);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ public class Convert {
* Convert a {@link PCollection}{@literal <InputT>} into a {@link PCollection}{@literal <Row>}.
*
* <p>The input {@link PCollection} must have a schema attached. The output collection will have
* the same schema as the iput.
* the same schema as the input.
*/
public static <InputT> PTransform<PCollection<InputT>, PCollection<Row>> toRows() {
return to(Row.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class AvroByteBuddyUtils {
static <T extends SpecificRecord> SchemaUserTypeCreator getCreator(
Class<T> clazz, Schema schema) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> createCreator(clazz, schema));
ClassWithSchema.create(clazz, schema), c -> createCreator(clazz, schema));
}

private static <T> SchemaUserTypeCreator createCreator(Class<T> clazz, Schema schema) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@ public class ByteBuddyUtils {
new ForLoadedType(ReadableInstant.class);
private static final ForLoadedType READABLE_PARTIAL_TYPE =
new ForLoadedType(ReadablePartial.class);
private static final ForLoadedType OBJECT_TYPE = new ForLoadedType(Object.class);
private static final ForLoadedType INTEGER_TYPE = new ForLoadedType(Integer.class);
private static final ForLoadedType ENUM_TYPE = new ForLoadedType(Enum.class);
private static final ForLoadedType BYTE_BUDDY_UTILS_TYPE =
Expand Down Expand Up @@ -134,7 +133,7 @@ protected String name(TypeDescription superClass) {

// Create a new FieldValueGetter subclass.
@SuppressWarnings("unchecked")
static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
public static DynamicType.Builder<FieldValueGetter> subclassGetterInterface(
ByteBuddy byteBuddy, Type objectType, Type fieldType) {
TypeDescription.Generic getterGenericType =
TypeDescription.Generic.Builder.parameterizedType(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ public static void validateJavaBean(
public static List<FieldValueTypeInformation> getFieldTypes(
Class<?> clazz, Schema schema, FieldValueTypeSupplier fieldValueTypeSupplier) {
return CACHED_FIELD_TYPES.computeIfAbsent(
new ClassWithSchema(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
ClassWithSchema.create(clazz, schema), c -> fieldValueTypeSupplier.get(clazz, schema));
}

// The list of getters for a class is cached, so we only create the classes the first time
Expand All @@ -121,7 +121,7 @@ public static List<FieldValueGetter> getGetters(
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_GETTERS.computeIfAbsent(
new ClassWithSchema(clazz, schema),
ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return types.stream()
Expand All @@ -130,7 +130,7 @@ public static List<FieldValueGetter> getGetters(
});
}

private static <T> FieldValueGetter createGetter(
public static <T> FieldValueGetter createGetter(
FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
DynamicType.Builder<FieldValueGetter> builder =
ByteBuddyUtils.subclassGetterInterface(
Expand Down Expand Up @@ -184,7 +184,7 @@ public static List<FieldValueSetter> getSetters(
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_SETTERS.computeIfAbsent(
new ClassWithSchema(clazz, schema),
ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return types.stream()
Expand All @@ -193,14 +193,14 @@ public static List<FieldValueSetter> getSetters(
});
}

private static FieldValueSetter createSetter(
public static FieldValueSetter createSetter(
FieldValueTypeInformation typeInformation, TypeConversionsFactory typeConversionsFactory) {
DynamicType.Builder<FieldValueSetter> builder =
ByteBuddyUtils.subclassSetterInterface(
BYTE_BUDDY,
typeInformation.getMethod().getDeclaringClass(),
typeConversionsFactory.createTypeConversion(false).convert(typeInformation.getType()));
builder = implementSetterMethods(builder, typeInformation.getMethod(), typeConversionsFactory);
builder = implementSetterMethods(builder, typeInformation, typeConversionsFactory);
try {
return builder
.make()
Expand All @@ -222,14 +222,13 @@ private static FieldValueSetter createSetter(

private static DynamicType.Builder<FieldValueSetter> implementSetterMethods(
DynamicType.Builder<FieldValueSetter> builder,
Method method,
FieldValueTypeInformation fieldValueTypeInformation,
TypeConversionsFactory typeConversionsFactory) {
FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
return builder
.method(ElementMatchers.named("name"))
.intercept(FixedValue.reference(javaTypeInformation.getName()))
.intercept(FixedValue.reference(fieldValueTypeInformation.getName()))
.method(ElementMatchers.named("set"))
.intercept(new InvokeSetterInstruction(method, typeConversionsFactory));
.intercept(new InvokeSetterInstruction(fieldValueTypeInformation, typeConversionsFactory));
}

// The list of constructors for a class is cached, so we only create the classes the first time
Expand All @@ -244,7 +243,7 @@ public static SchemaUserTypeCreator getConstructorCreator(
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema),
ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createConstructorCreator(
Expand Down Expand Up @@ -291,7 +290,7 @@ public static SchemaUserTypeCreator getStaticCreator(
FieldValueTypeSupplier fieldValueTypeSupplier,
TypeConversionsFactory typeConversionsFactory) {
return CACHED_CREATORS.computeIfAbsent(
new ClassWithSchema(clazz, schema),
ClassWithSchema.create(clazz, schema),
c -> {
List<FieldValueTypeInformation> types = fieldValueTypeSupplier.get(clazz, schema);
return createStaticCreator(clazz, creator, schema, types, typeConversionsFactory);
Expand Down Expand Up @@ -377,11 +376,13 @@ public ByteCodeAppender appender(final Target implementationTarget) {
// Implements a method to write a public set out on an object.
private static class InvokeSetterInstruction implements Implementation {
// Setter method that will be invoked
private Method method;
private FieldValueTypeInformation fieldValueTypeInformation;
private final TypeConversionsFactory typeConversionsFactory;

InvokeSetterInstruction(Method method, TypeConversionsFactory typeConversionsFactory) {
this.method = method;
InvokeSetterInstruction(
FieldValueTypeInformation fieldValueTypeInformation,
TypeConversionsFactory typeConversionsFactory) {
this.fieldValueTypeInformation = fieldValueTypeInformation;
this.typeConversionsFactory = typeConversionsFactory;
}

Expand All @@ -393,13 +394,13 @@ public InstrumentedType prepare(InstrumentedType instrumentedType) {
@Override
public ByteCodeAppender appender(final Target implementationTarget) {
return (methodVisitor, implementationContext, instrumentedMethod) -> {
FieldValueTypeInformation javaTypeInformation = FieldValueTypeInformation.forSetter(method);
// this + method parameters.
int numLocals = 1 + instrumentedMethod.getParameters().size();

// The instruction to read the field.
StackManipulation readField = MethodVariableAccess.REFERENCE.loadFrom(2);

Method method = fieldValueTypeInformation.getMethod();
boolean setterMethodReturnsVoid = method.getReturnType().equals(Void.TYPE);
// Read the object onto the stack.
StackManipulation stackManipulation =
Expand All @@ -409,7 +410,7 @@ public ByteCodeAppender appender(final Target implementationTarget) {
// Do any conversions necessary.
typeConversionsFactory
.createSetterConversions(readField)
.convert(javaTypeInformation.getType()),
.convert(fieldValueTypeInformation.getType()),
// Now update the field and return void.
MethodInvocation.invoke(new ForLoadedMethod(method)));
if (!setterMethodReturnsVoid) {
Expand Down
Loading

0 comments on commit bb3c63b

Please sign in to comment.