Skip to content

Commit

Permalink
Merge pull request apache#11041 from reuvenlax/use_beam_join_api_in_sql
Browse files Browse the repository at this point in the history
[BEAM-4076] Use beam join api in sql
  • Loading branch information
reuvenlax committed Mar 23, 2020
2 parents 0351b49 + 37b1fba commit b564239
Show file tree
Hide file tree
Showing 17 changed files with 937 additions and 672 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,10 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.annotation.Nullable;
Expand All @@ -45,10 +43,10 @@
import org.apache.beam.sdk.schemas.Schema.FieldType;
import org.apache.beam.sdk.schemas.Schema.TypeName;
import org.apache.beam.sdk.schemas.parser.FieldAccessDescriptorParser;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.LinkedListMultimap;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Multimap;
Expand Down Expand Up @@ -142,8 +140,6 @@ abstract static class Builder {
abstract Builder setNestedFieldsAccessed(
Map<FieldDescriptor, FieldAccessDescriptor> nestedFieldsAccessedById);

abstract Builder setFieldInsertionOrder(boolean insertionOrder);

abstract FieldAccessDescriptor build();
}

Expand All @@ -154,14 +150,11 @@ abstract Builder setNestedFieldsAccessed(

public abstract Map<FieldDescriptor, FieldAccessDescriptor> getNestedFieldsAccessed();

public abstract boolean getFieldInsertionOrder();

abstract Builder toBuilder();

static Builder builder() {
return new AutoValue_FieldAccessDescriptor.Builder()
.setAllFields(false)
.setFieldInsertionOrder(false)
.setFieldsAccessed(Collections.emptyList())
.setNestedFieldsAccessed(Collections.emptyMap());
}
Expand Down Expand Up @@ -199,6 +192,7 @@ public static FieldAccessDescriptor withFieldNames(Iterable<String> fieldNames)
return union(fields);
}

/** Return a descriptor that accesses the specified fields, renaming those fields. */
public static FieldAccessDescriptor withFieldNamesAs(Map<String, String> fieldNamesAs) {
List<FieldAccessDescriptor> fields = Lists.newArrayListWithCapacity(fieldNamesAs.size());
for (Map.Entry<String, String> entry : fieldNamesAs.entrySet()) {
Expand All @@ -207,6 +201,84 @@ public static FieldAccessDescriptor withFieldNamesAs(Map<String, String> fieldNa
return union(fields);
}

/**
* Return a descriptor that accesses the specified field names as nested subfields of the
* baseDescriptor.
*
* <p>This is only supported when baseDescriptor refers to a single field.
*/
public static FieldAccessDescriptor withFieldNames(
FieldAccessDescriptor baseDescriptor, String... fieldNames) {
return withFieldNames(baseDescriptor, Arrays.asList(fieldNames));
}

/**
* Return a descriptor that accesses the specified field names as nested subfields of the
* baseDescriptor.
*
* <p>This is only supported when baseDescriptor refers to a single field.
*/
public static FieldAccessDescriptor withFieldNames(
FieldAccessDescriptor baseDescriptor, Iterable<String> fieldNames) {
if (baseDescriptor.getFieldsAccessed().isEmpty()
&& baseDescriptor.getNestedFieldsAccessed().isEmpty()) {
// If baseDescriptor is empty, this is no different than calling
// withFieldNames(Iterable<String>);
return withFieldNames(fieldNames);
}
if (!baseDescriptor.getFieldsAccessed().isEmpty()) {
checkArgument(baseDescriptor.getNestedFieldsAccessed().isEmpty());
FieldDescriptor fieldDescriptor =
Iterables.getOnlyElement(baseDescriptor.getFieldsAccessed());
return FieldAccessDescriptor.create()
.withNestedField(fieldDescriptor, FieldAccessDescriptor.withFieldNames(fieldNames));
} else {
checkArgument(baseDescriptor.getFieldsAccessed().isEmpty());
Map.Entry<FieldDescriptor, FieldAccessDescriptor> entry =
Iterables.getOnlyElement(baseDescriptor.getNestedFieldsAccessed().entrySet());
return FieldAccessDescriptor.create()
.withNestedField(entry.getKey(), withFieldNames(entry.getValue(), fieldNames));
}
}

/**
* Return a descriptor that accesses the specified field ids as nested subfields of the
* baseDescriptor.
*
* <p>This is only supported when baseDescriptor refers to a single field.
*/
public static FieldAccessDescriptor withFieldIds(
FieldAccessDescriptor baseDescriptor, Integer... fieldIds) {
return withFieldIds(baseDescriptor, Arrays.asList(fieldIds));
}

/**
* Return a descriptor that accesses the specified field ids as nested subfields of the
* baseDescriptor.
*
* <p>This is only supported when baseDescriptor refers to a single field.
*/
public static FieldAccessDescriptor withFieldIds(
FieldAccessDescriptor baseDescriptor, Iterable<Integer> fieldIds) {
if (baseDescriptor.getFieldsAccessed().isEmpty()
&& baseDescriptor.getNestedFieldsAccessed().isEmpty()) {
return withFieldIds(fieldIds);
}
if (!baseDescriptor.getFieldsAccessed().isEmpty()) {
checkArgument(baseDescriptor.getNestedFieldsAccessed().isEmpty());
FieldDescriptor fieldDescriptor =
Iterables.getOnlyElement(baseDescriptor.getFieldsAccessed());
return FieldAccessDescriptor.create()
.withNestedField(fieldDescriptor, FieldAccessDescriptor.withFieldIds(fieldIds));
} else {
checkArgument(baseDescriptor.getFieldsAccessed().isEmpty());
Map.Entry<FieldDescriptor, FieldAccessDescriptor> entry =
Iterables.getOnlyElement(baseDescriptor.getNestedFieldsAccessed().entrySet());
return FieldAccessDescriptor.create()
.withNestedField(entry.getKey(), withFieldIds(entry.getValue(), fieldIds));
}
}

/**
* Return a descriptor that access the specified fields.
*
Expand Down Expand Up @@ -243,13 +315,14 @@ public static FieldAccessDescriptor withFields(Iterable<FieldDescriptor> fields)
return builder().setFieldsAccessed(Lists.newArrayList(fields)).build();
}

// Union a set of FieldAccessDescriptors. This function currently only supports descriptors with
// containing named fields, not those containing ids.
private static FieldAccessDescriptor union(
// Union a set of FieldAccessDescriptors.
// This should generally be used only on resolved descriptors.
public static FieldAccessDescriptor union(
Iterable<FieldAccessDescriptor> fieldAccessDescriptors) {
// Use linked sets and maps to ensure that we union fields in the order specified.
Set<FieldDescriptor> fieldsAccessed = Sets.newLinkedHashSet();
Multimap<FieldDescriptor, FieldAccessDescriptor> nestedFieldsAccessed =
ArrayListMultimap.create();
LinkedListMultimap.create();
for (FieldAccessDescriptor fieldAccessDescriptor : fieldAccessDescriptors) {
if (fieldAccessDescriptor.getAllFields()) {
// If one of the descriptors is a wildcard, we can short circuit and return a wildcard.
Expand Down Expand Up @@ -354,21 +427,11 @@ public FieldAccessDescriptor withNestedFieldAs(
public FieldAccessDescriptor withNestedField(
FieldDescriptor field, FieldAccessDescriptor fieldAccess) {
Map<FieldDescriptor, FieldAccessDescriptor> newNestedFieldAccess =
ImmutableMap.<FieldDescriptor, FieldAccessDescriptor>builder()
.putAll(getNestedFieldsAccessed())
.put(field, fieldAccess)
.build();
Maps.newLinkedHashMap(getNestedFieldsAccessed());
newNestedFieldAccess.put(field, fieldAccess);
return toBuilder().setNestedFieldsAccessed(newNestedFieldAccess).build();
}

/**
* By default, fields are sorted by name. If this is set, they will instead be sorted by insertion
* order. All sorting happens in the {@link #resolve(Schema)} method.
*/
public FieldAccessDescriptor withOrderByFieldInsertionOrder() {
return toBuilder().setFieldInsertionOrder(true).build();
}

/**
* Return the field ids accessed. Should not be called until after {@link #resolve} is called.
* Iteration order is consistent with {@link #getFieldsAccessed}.
Expand Down Expand Up @@ -463,23 +526,11 @@ private List<FieldDescriptor> resolveDirectFieldsAccessed(Schema schema) {
field = fillInMissingQualifiers(field, schema);
fields.add(field);
}

if (!getFieldInsertionOrder()) {
// Re-order fields based on field ID, rather than keeping them in insertion order
Collections.sort(fields, Comparator.comparing(FieldDescriptor::getFieldId));
}
return fields;
}

private Map<FieldDescriptor, FieldAccessDescriptor> resolveNestedFieldsAccessed(Schema schema) {
Map<FieldDescriptor, FieldAccessDescriptor> nestedFields;
if (getFieldInsertionOrder()) {
nestedFields = Maps.newLinkedHashMap();
} else {
Function<FieldDescriptor, Integer> extract =
(Function<FieldDescriptor, Integer> & Serializable) FieldDescriptor::getFieldId;
nestedFields = Maps.newTreeMap(Comparator.comparing(extract));
}
Map<FieldDescriptor, FieldAccessDescriptor> nestedFields = Maps.newLinkedHashMap();

for (Map.Entry<FieldDescriptor, FieldAccessDescriptor> entry :
getNestedFieldsAccessed().entrySet()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,9 @@ public boolean equals(Object o) {
Schema other = (Schema) o;
// If both schemas have a UUID set, we can simply compare the UUIDs.
if (uuid != null && other.uuid != null) {
return Objects.equals(uuid, other.uuid);
if (Objects.equals(uuid, other.uuid)) {
return true;
}
}
return Objects.equals(fieldIndices, other.fieldIndices)
&& Objects.equals(getFields(), other.getFields())
Expand Down Expand Up @@ -591,9 +593,9 @@ public abstract static class FieldType implements Serializable {

/** Returns optional extra metadata. */
@SuppressWarnings("mutable")
protected abstract Map<String, ByteArrayWrapper> getMetadata();
abstract Map<String, ByteArrayWrapper> getMetadata();

abstract FieldType.Builder toBuilder();
public abstract FieldType.Builder toBuilder();

public boolean isLogicalType(String logicalTypeIdentifier) {
return getTypeName().isLogicalType()
Expand Down Expand Up @@ -740,6 +742,11 @@ public FieldType withMetadata(String key, String metadata) {
return withMetadata(key, metadata.getBytes(StandardCharsets.UTF_8));
}

public Map<String, byte[]> getAllMetadata() {
return getMetadata().entrySet().stream()
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().array));
}

@Nullable
public byte[] getMetadata(String key) {
ByteArrayWrapper metadata = getMetadata().get(key);
Expand Down
Loading

0 comments on commit b564239

Please sign in to comment.