[BEAM-7174] Add schema modification transforms #8425

Merged
merged 6 commits into from
May 15, 2019
@@ -47,6 +47,7 @@
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ArrayListMultimap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Lists;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Maps;
import org.apache.beam.vendor.guava.v20_0.com.google.common.collect.Multimap;
@@ -228,7 +229,7 @@ public static FieldAccessDescriptor withFields(Iterable<FieldDescriptor> fields)
return builder().setFieldsAccessed(Sets.newLinkedHashSet(fields)).build();
}

- // Union a set of FieldAccessDescriptors. This function currenty only supports descriptors with
+ // Union a set of FieldAccessDescriptors. This function currently only supports descriptors with
// containing named fields, not those containing ids.
private static FieldAccessDescriptor union(
Iterable<FieldAccessDescriptor> fieldAccessDescriptors) {
@@ -323,6 +324,15 @@ public Set<Integer> fieldIdsAccessed() {
.collect(Collectors.toSet());
}

/**
* Return the field names accessed. Should not be called until after {@link #resolve} is called.
*/
public Set<String> fieldNamesAccessed() {
Contributor

Consider adding some unit tests for these new functions?

return getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toSet());
}

/**
* Return the nested fields keyed by field ids. Should not be called until after {@link #resolve}
* is called.
@@ -332,6 +342,32 @@ public Map<Integer, FieldAccessDescriptor> nestedFieldsById() {
.collect(Collectors.toMap(f -> f.getKey().getFieldId(), f -> f.getValue()));
}

/**
* Return the nested fields keyed by field name. Should not be called until after {@link #resolve}
* is called.
*/
public Map<String, FieldAccessDescriptor> nestedFieldsByName() {
return getNestedFieldsAccessed().entrySet().stream()
.collect(Collectors.toMap(f -> f.getKey().getFieldName(), f -> f.getValue()));
}

/** Returns true if this descriptor references only a single, non-wildcard field. */
public boolean referencesSingleField() {
if (getAllFields()) {
return false;
}

if (getFieldsAccessed().size() == 1 && getNestedFieldsAccessed().isEmpty()) {
return true;
}

if (getFieldsAccessed().isEmpty() && getNestedFieldsAccessed().size() == 1) {
return getNestedFieldsAccessed().values().iterator().next().referencesSingleField();
}

return false;
}
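The three cases in referencesSingleField() can be exercised on a simplified model. The sketch below is not Beam's FieldAccessDescriptor: the class SingleFieldSketch and its helpers of(), wildcard(), and nest() are hypothetical stand-ins that keep only a wildcard flag, a list of directly selected names, and a map of nested selections. It may also serve as a starting point for the unit tests requested earlier in the review.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FieldAccessDescriptor; NOT Beam's actual class.
final class SingleFieldSketch {
  boolean allFields; // true for the wildcard selector "*"
  final List<String> fields = new ArrayList<>(); // directly selected field names
  final Map<String, SingleFieldSketch> nested = new LinkedHashMap<>(); // parent -> sub-selection

  static SingleFieldSketch of(String... names) {
    SingleFieldSketch d = new SingleFieldSketch();
    for (String n : names) {
      d.fields.add(n);
    }
    return d;
  }

  static SingleFieldSketch wildcard() {
    SingleFieldSketch d = new SingleFieldSketch();
    d.allFields = true;
    return d;
  }

  SingleFieldSketch nest(String parent, SingleFieldSketch child) {
    nested.put(parent, child);
    return this;
  }

  // Same three cases as the method in the diff.
  boolean referencesSingleField() {
    if (allFields) {
      return false; // a wildcard never counts as a single field
    }
    if (fields.size() == 1 && nested.isEmpty()) {
      return true; // exactly one top-level field, e.g. "a"
    }
    if (fields.isEmpty() && nested.size() == 1) {
      // exactly one nested path, e.g. "a.b": defer to the sub-selection
      return nested.values().iterator().next().referencesSingleField();
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(of("a").referencesSingleField());                    // true
    System.out.println(of("a", "b").referencesSingleField());               // false
    System.out.println(of().nest("a", of("b")).referencesSingleField());    // true
    System.out.println(of("x").nest("a", of("b")).referencesSingleField()); // false
    System.out.println(wildcard().referencesSingleField());                 // false
  }
}
```

Note that in this model an empty selection (no fields, no nested entries, no wildcard) also yields false, because neither size check equals 1.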

/**
* Resolve the {@link FieldAccessDescriptor} against a schema.
*
@@ -513,4 +549,22 @@ private static void validateFieldDescriptor(Schema schema, FieldDescriptor field
}
}
}

@Override
public String toString() {
if (getAllFields()) {
return "*";
}

List<String> singleSelectors =
getFieldsAccessed().stream()
.map(FieldDescriptor::getFieldName)
.collect(Collectors.toList());
List<String> nestedSelectors =
getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
Contributor

If there are multiple sub-descriptors under the same field (e.g. a.b and a.c), won't this return something like "a.b, c"?
To make the string look prettier, how about we simply add brackets around a sub-descriptor when referencesSingleField() is false for it (i.e. return something like "a.[b, c]")?

Contributor Author

Currently, if you have a.b and a.c, this will return a.b, a.c.
We could make it return a.[b, c], but it might get confusing for multiple nested descriptors. For example, a.b.c.d, a.b.c.e, a.c.f, a.g

would be printed as a.[b.[c.[d, e], f], g]

where it's probably more readable as simply a.b.c.d, a.b.c.e, a.c.f, a.g

.collect(Collectors.toList());
;
return String.join(", ", Iterables.concat(singleSelectors, nestedSelectors));
}
}
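The question raised in the review thread on toString() can be explored with a simplified model. The sketch below is an assumption-laden stand-in, not Beam's class: ToStringSketch and its helpers of(), wildcard(), and nest() are hypothetical, and it assumes that a.b and a.c end up as one nested entry for a whose sub-selection holds b and c. Whether Beam's FieldAccessDescriptor stores them that way after union or resolve is not modeled here.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for FieldAccessDescriptor; NOT Beam's actual class.
final class ToStringSketch {
  boolean allFields; // true for the wildcard selector "*"
  final List<String> fields = new ArrayList<>(); // directly selected field names
  final Map<String, ToStringSketch> nested = new LinkedHashMap<>(); // parent -> sub-selection

  static ToStringSketch of(String... names) {
    ToStringSketch d = new ToStringSketch();
    for (String n : names) {
      d.fields.add(n);
    }
    return d;
  }

  static ToStringSketch wildcard() {
    ToStringSketch d = new ToStringSketch();
    d.allFields = true;
    return d;
  }

  ToStringSketch nest(String parent, ToStringSketch child) {
    nested.put(parent, child);
    return this;
  }

  // Same shape as the toString() in the diff: top-level names first,
  // then "parent." + child.toString() for each nested entry.
  @Override
  public String toString() {
    if (allFields) {
      return "*";
    }
    List<String> selectors = new ArrayList<>(fields);
    for (Map.Entry<String, ToStringSketch> e : nested.entrySet()) {
      selectors.add(e.getKey() + "." + e.getValue());
    }
    return String.join(", ", selectors);
  }

  public static void main(String[] args) {
    // One nested entry "a" whose sub-selection is {b, c}.
    System.out.println(of().nest("a", of("b", "c"))); // a.b, c
    // A top-level field plus a single nested path.
    System.out.println(of("x").nest("a", of("b")));   // x, a.b
    // A wildcard under a nested field.
    System.out.println(of().nest("a", wildcard()));   // a.*
  }
}
```

Under that storage assumption the parent prefix is applied once to the joined child string, so the model prints "a.b, c" rather than "a.b, a.c". Producing fully qualified paths would require prefixing each child selector individually.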
@@ -196,6 +196,10 @@ public Builder addMapField(String name, FieldType keyType, FieldType valueType)
return this;
}

public int getLastFieldId() {
return fields.size() - 1;
}

public Schema build() {
return new Schema(fields);
}
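The new getLastFieldId() returns the index of the most recently added field. A minimal sketch of that arithmetic follows, using a hypothetical SchemaBuilderSketch that stores plain field names instead of Beam's Field objects; the addField(String) helper here is an assumption made for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for Schema.Builder; NOT Beam's actual class.
final class SchemaBuilderSketch {
  private final List<String> fields = new ArrayList<>();

  SchemaBuilderSketch addField(String name) {
    fields.add(name);
    return this;
  }

  // Same expression as in the diff: zero-based index of the last field added.
  int getLastFieldId() {
    return fields.size() - 1;
  }

  public static void main(String[] args) {
    SchemaBuilderSketch builder = new SchemaBuilderSketch();
    System.out.println(builder.getLastFieldId()); // -1, no fields yet
    builder.addField("id").addField("name");
    System.out.println(builder.getLastFieldId()); // 1
  }
}
```

With this expression an empty builder reports -1, so callers would need to add at least one field before treating the result as a valid field id.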