Skip to content

Commit

Permalink
Merge pull request apache#8425: [BEAM-7174] Add schema modification t…
Browse files Browse the repository at this point in the history
…ransforms
  • Loading branch information
reuvenlax authored and charithe committed May 16, 2019
1 parent aa21fcf commit ab7da29
Show file tree
Hide file tree
Showing 8 changed files with 1,607 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
Expand Down Expand Up @@ -229,7 +230,7 @@ public static FieldAccessDescriptor withFields(Iterable<FieldDescriptor> fields)
return builder().setFieldsAccessed(Lists.newArrayList(fields)).build();
}

// Union a set of FieldAccessDescriptors. This function currenty only supports descriptors with
// Union a set of FieldAccessDescriptors. This function currently only supports descriptors with
// containing named fields, not those containing ids.
private static FieldAccessDescriptor union(
Iterable<FieldAccessDescriptor> fieldAccessDescriptors) {
Expand Down Expand Up @@ -327,6 +328,15 @@ public List<Integer> fieldIdsAccessed() {
.collect(Collectors.toList());
}

/**
* Return the field names accessed. Should not be called until after {@link #resolve} is called.
*/
public Set<String> fieldNamesAccessed() {
return getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toSet());
}

/**
* Return the nested fields keyed by field ids. Should not be called until after {@link #resolve}
* is called.
Expand All @@ -336,6 +346,32 @@ public Map<Integer, FieldAccessDescriptor> nestedFieldsById() {
.collect(Collectors.toMap(f -> f.getKey().getFieldId(), f -> f.getValue()));
}

/**
* Return the nested fields keyed by field name. Should not be called until after {@link #resolve}
* is called.
*/
public Map<String, FieldAccessDescriptor> nestedFieldsByName() {
return getNestedFieldsAccessed().entrySet().stream()
.collect(Collectors.toMap(f -> f.getKey().getFieldName(), f -> f.getValue()));
}

/** Returns true if this descriptor references only a single, non-wildcard field. */
public boolean referencesSingleField() {
if (getAllFields()) {
return false;
}

if (getFieldsAccessed().size() == 1 && getNestedFieldsAccessed().isEmpty()) {
return true;
}

if (getFieldsAccessed().isEmpty() && getNestedFieldsAccessed().size() == 1) {
return getNestedFieldsAccessed().values().iterator().next().referencesSingleField();
}

return false;
}

/**
* Resolve the {@link FieldAccessDescriptor} against a schema.
*
Expand Down Expand Up @@ -515,4 +551,22 @@ private static void validateFieldDescriptor(Schema schema, FieldDescriptor field
}
}
}

@Override
public String toString() {
if (getAllFields()) {
return "*";
}

List<String> singleSelectors =
getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toList());
List<String> nestedSelectors =
getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
.collect(Collectors.toList());
;
return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,10 @@ public Builder addMapField(String name, FieldType keyType, FieldType valueType)
return this;
}

public int getLastFieldId() {
return fields.size() - 1;
}

public Schema build() {
return new Schema(fields);
}
Expand Down
Loading

0 comments on commit ab7da29

Please sign in to comment.