Skip to content

Commit

Permalink
Merge pull request apache#10278: [BEAM-7274] Support recursive type t…
Browse files Browse the repository at this point in the history
…ransformation in ByteBuddyUtils
  • Loading branch information
reuvenlax authored and Jozef Vilcek committed Feb 21, 2020
1 parent 63d4ba2 commit 699d232
Show file tree
Hide file tree
Showing 13 changed files with 719 additions and 183 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,11 @@
*/
package org.apache.beam.sdk.schemas;

import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.lang.reflect.ParameterizedType;
import java.lang.reflect.Type;
import java.util.Arrays;
import java.util.Collection;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.beam.sdk.schemas.utils.ReflectUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down Expand Up @@ -129,9 +123,13 @@ public static FieldValueTypeInformation forGetter(Method method) {
}

public static FieldValueTypeInformation forSetter(Method method) {
return forSetter(method, "set");
}

public static FieldValueTypeInformation forSetter(Method method, String setterPrefix) {
String name;
if (method.getName().startsWith("set")) {
name = ReflectUtils.stripPrefix(method.getName(), "set");
if (method.getName().startsWith(setterPrefix)) {
name = ReflectUtils.stripPrefix(method.getName(), setterPrefix);
} else {
throw new RuntimeException("Setter has wrong prefix " + method.getName());
}
Expand Down Expand Up @@ -162,25 +160,9 @@ private static FieldValueTypeInformation getIterableComponentType(Field field) {
}

@Nullable
private static FieldValueTypeInformation getIterableComponentType(TypeDescriptor valueType) {
static FieldValueTypeInformation getIterableComponentType(TypeDescriptor valueType) {
// TODO: Figure out nullable elements.
TypeDescriptor componentType = null;
if (valueType.isArray()) {
Type component = valueType.getComponentType().getType();
if (!component.equals(byte.class)) {
componentType = TypeDescriptor.of(component);
}
} else if (valueType.isSubtypeOf(TypeDescriptor.of(Iterable.class))) {
TypeDescriptor<Iterable<?>> collection = valueType.getSupertype(Iterable.class);
if (collection.getType() instanceof ParameterizedType) {
ParameterizedType ptype = (ParameterizedType) collection.getType();
java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
checkArgument(params.length == 1);
componentType = TypeDescriptor.of(params[0]);
} else {
throw new RuntimeException("Collection parameter is not parameterized!");
}
}
TypeDescriptor componentType = ReflectUtils.getIterableComponentType(valueType);
if (componentType == null) {
return null;
}
Expand Down Expand Up @@ -223,17 +205,7 @@ private static FieldValueTypeInformation getMapValueType(TypeDescriptor typeDesc
@SuppressWarnings("unchecked")
@Nullable
private static FieldValueTypeInformation getMapType(TypeDescriptor valueType, int index) {
TypeDescriptor mapType = null;
if (valueType.isSubtypeOf(TypeDescriptor.of(Map.class))) {
TypeDescriptor<Collection<?>> map = valueType.getSupertype(Map.class);
if (map.getType() instanceof ParameterizedType) {
ParameterizedType ptype = (ParameterizedType) map.getType();
java.lang.reflect.Type[] params = ptype.getActualTypeArguments();
mapType = TypeDescriptor.of(params[index]);
} else {
throw new RuntimeException("Map type is not parameterized! " + map);
}
}
TypeDescriptor mapType = ReflectUtils.getMapType(valueType, index);
if (mapType == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkState;

import java.lang.reflect.Type;
import java.util.Iterator;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;
Expand All @@ -31,6 +31,9 @@
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.RowWithGetters;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Function;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Collections2;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;

Expand Down Expand Up @@ -107,7 +110,8 @@ private <ValueT> ValueT fromValue(
return (ValueT) fromRow((Row) value, (Class) fieldType, typeFactory);
} else if (TypeName.ARRAY.equals(type.getTypeName())) {
return (ValueT)
fromListValue(type.getCollectionElementType(), (List) value, elementType, typeFactory);
fromCollectionValue(
type.getCollectionElementType(), (Collection) value, elementType, typeFactory);
} else if (TypeName.ITERABLE.equals(type.getTypeName())) {
return (ValueT)
fromIterableValue(
Expand All @@ -127,25 +131,35 @@ private <ValueT> ValueT fromValue(
}
}

private static <SourceT, DestT> Collection<DestT> transformCollection(
Collection<SourceT> collection, Function<SourceT, DestT> function) {
if (collection instanceof List) {
// For performance reasons if the input is a list, make sure that we produce a list. Otherwise
// Row unwrapping
// is forced to physically copy the collection into a new List object.
return Lists.transform((List) collection, function);
} else {
return Collections2.transform(collection, function);
}
}

@SuppressWarnings("unchecked")
private <ElementT> List fromListValue(
private <ElementT> Collection fromCollectionValue(
FieldType elementType,
List<ElementT> rowList,
Collection<ElementT> rowCollection,
FieldValueTypeInformation elementTypeInformation,
Factory<List<FieldValueTypeInformation>> typeFactory) {
List list = Lists.newArrayList();
for (ElementT element : rowList) {
list.add(
fromValue(
elementType,
element,
elementTypeInformation.getType().getType(),
elementTypeInformation.getElementType(),
elementTypeInformation.getMapKeyType(),
elementTypeInformation.getMapValueType(),
typeFactory));
}
return list;
return transformCollection(
rowCollection,
element ->
fromValue(
elementType,
element,
elementTypeInformation.getType().getType(),
elementTypeInformation.getElementType(),
elementTypeInformation.getMapKeyType(),
elementTypeInformation.getMapValueType(),
typeFactory));
}

@SuppressWarnings("unchecked")
Expand All @@ -154,32 +168,17 @@ private <ElementT> Iterable fromIterableValue(
Iterable<ElementT> rowIterable,
FieldValueTypeInformation elementTypeInformation,
Factory<List<FieldValueTypeInformation>> typeFactory) {
return new Iterable<ElementT>() {
@Override
public Iterator<ElementT> iterator() {
return new Iterator<ElementT>() {
Iterator<ElementT> innerIter = rowIterable.iterator();

@Override
public boolean hasNext() {
return innerIter.hasNext();
}

@Override
public ElementT next() {
ElementT element = innerIter.next();
return fromValue(
return Iterables.transform(
rowIterable,
element ->
fromValue(
elementType,
element,
elementTypeInformation.getType().getType(),
elementTypeInformation.getElementType(),
elementTypeInformation.getMapKeyType(),
elementTypeInformation.getMapValueType(),
typeFactory);
}
};
}
};
typeFactory));
}

@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ public boolean equals(Object other) {
public int hashCode() {
return Arrays.hashCode(array);
}

@Override
public String toString() {
return Arrays.toString(array);
}
}
// A mapping between field names an indices.
private final BiMap<String, Integer> fieldIndices = HashBiMap.create();
Expand Down
Loading

0 comments on commit 699d232

Please sign in to comment.