[BEAM-7174] Add schema modification transforms #8425

Merged: 6 commits, May 15, 2019

@reuvenlax reuvenlax commented Apr 29, 2019

Add three transforms for modifying schemas:
AddFields: Add new fields to a schema. Existing rows are padded with null values in the positions of the new fields (alternatively, the user can specify a default value).
DropFields: Drop fields from a schema.
RenameFields: Rename schema fields.
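
As a rough illustration of the padding behavior described for AddFields, the sketch below uses plain Java lists in place of Beam rows. `RowPadding` and `padRow` are hypothetical names chosen for this example and are not part of the Beam API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a row: an ordered list of field values.
class RowPadding {
  // Appends one value per newly added field. A null entry models "pad with null";
  // a non-null entry models a user-specified default value.
  static List<Object> padRow(List<Object> existingValues, List<Object> defaultsForNewFields) {
    List<Object> padded = new ArrayList<>(existingValues);
    padded.addAll(defaultsForNewFields);
    return padded;
  }

  public static void main(String[] args) {
    List<Object> row = Arrays.<Object>asList("alice", 42);
    // Two new fields: one without a default (null) and one defaulting to false.
    List<Object> defaults = Arrays.<Object>asList(null, false);
    System.out.println(padRow(row, defaults));
  }
}
```

The existing values keep their positions, and the new values are appended after them.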

@reuvenlax reuvenlax force-pushed the add_schema_modification_transforms branch from ba95f57 to ad66a04 Compare April 29, 2019 21:04
@reuvenlax reuvenlax changed the title Add schema modification transforms [BEAM-7174] Add schema modification transforms Apr 29, 2019
Reuven Lax added 2 commits May 2, 2019 11:03
@reuvenlax
Contributor Author

R: @robinyqiu

.collect(Collectors.toList());
List<String> nestedSelectors =
getNestedFieldsAccessed().entrySet().stream()
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString())
Contributor

If there are multiple sub-descriptors under the same field (e.g. a.b and a.c), won't this return something like "a.b, c"?
To make the string look prettier, how about we simply add brackets around a sub-descriptor when its referencesSingleField is false (i.e. return something like "a.[b, c]")?

Contributor Author

Currently, if you have a.b and a.c, this will return a.b, a.c.
We could make it return a.[b, c], but that might get confusing with multiple nested descriptors. For example, a.b.c.d, a.b.c.e, a.c.f, a.g

will be printed as a.[b.[c.[d, e], f], g]

whereas it's probably more readable as simply a.b.c.d, a.b.c.e, a.c.f, a.g
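
To compare the two renderings side by side, here is a minimal sketch that builds a trie from dotted paths and prints both forms. `FieldPathRendering` and its methods are hypothetical helpers, not the FieldAccessDescriptor implementation, and the bracketed output depends on the grouping rule assumed here (brackets only where a prefix has more than one child).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical helper, not the FieldAccessDescriptor implementation.
class FieldPathRendering {
  // A trie node keyed by field name; TreeMap keeps siblings sorted for stable output.
  static final class Node {
    final Map<String, Node> children = new TreeMap<>();
  }

  static Node buildTrie(List<String> paths) {
    Node root = new Node();
    for (String path : paths) {
      Node current = root;
      for (String part : path.split("\\.")) {
        current = current.children.computeIfAbsent(part, k -> new Node());
      }
    }
    return root;
  }

  // Flat form: the fully qualified paths joined by commas.
  static String renderFlat(List<String> paths) {
    return String.join(", ", paths);
  }

  // Bracketed form: siblings are grouped in brackets under their shared prefix.
  static String renderBracketed(Node node) {
    List<String> rendered = new ArrayList<>();
    for (Map.Entry<String, Node> e : node.children.entrySet()) {
      Node child = e.getValue();
      if (child.children.isEmpty()) {
        rendered.add(e.getKey());
      } else {
        rendered.add(e.getKey() + "." + renderBracketed(child));
      }
    }
    return rendered.size() == 1 ? rendered.get(0) : "[" + String.join(", ", rendered) + "]";
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.b.c.d", "a.b.c.e", "a.c.f", "a.g");
    System.out.println(renderFlat(paths));                 // a.b.c.d, a.b.c.e, a.c.f, a.g
    System.out.println(renderBracketed(buildTrie(paths))); // a.[b.c.[d, e], c.f, g]
  }
}
```

With only two levels (a.b and a.c) the bracketed form stays compact as a.[b, c]; the nesting only becomes hard to scan once several prefixes branch.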

/**
* Return the field names accessed. Should not be called until after {@link #resolve} is called.
*/
public Set<String> fieldNamesAccessed() {
Contributor

Consider adding some unit tests for these new functions?

* .field("userDetails.isSpecialUser", "FieldType.BOOLEAN", false));
* }</pre>
*/
public class AddFields {
Contributor

Add @Experimental (same for the other transforms).

Contributor Author

Good catch. Done.

FieldAccessDescriptor selectDescriptor =
complement(inputSchema, fieldsToDrop.resolve(inputSchema));

return Select.<T>fieldAccess(selectDescriptor).expand(input);
Contributor

Is there a reason why we call the expand function here explicitly, instead of doing
return input.apply(Select.<T>fieldAccess(selectDescriptor));
as usual?

Contributor Author

No, input.apply is the preferred way. Changed.
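
One way to see why apply is preferred over calling expand directly: in a pipeline framework, apply typically does bookkeeping (such as registering the transform in the pipeline graph) before delegating to expand. The sketch below models that with miniature stand-in classes. `MiniCollection`, `MiniTransform`, and `UpperCase` are hypothetical and are not the Beam classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical miniature stand-ins; these are NOT the Beam classes.
class ApplyVersusExpand {
  interface MiniTransform {
    String name();

    MiniCollection expand(MiniCollection input);
  }

  static final class MiniCollection {
    final List<String> values;
    final List<String> graph; // shared registry of applied transform names

    MiniCollection(List<String> values, List<String> graph) {
      this.values = values;
      this.graph = graph;
    }

    // apply() records the transform first, then delegates to expand().
    MiniCollection apply(MiniTransform transform) {
      graph.add(transform.name());
      return transform.expand(this);
    }
  }

  static final class UpperCase implements MiniTransform {
    public String name() {
      return "UpperCase";
    }

    public MiniCollection expand(MiniCollection input) {
      List<String> out = new ArrayList<>();
      for (String v : input.values) {
        out.add(v.toUpperCase());
      }
      return new MiniCollection(out, input.graph);
    }
  }

  public static void main(String[] args) {
    List<String> graph = new ArrayList<>();
    MiniCollection input = new MiniCollection(Arrays.asList("a", "b"), graph);

    new UpperCase().expand(input); // bypasses registration
    System.out.println(graph);     // []

    input.apply(new UpperCase());  // registers, then expands
    System.out.println(graph);     // [UpperCase]
  }
}
```

Both calls compute the same output values; only the apply path leaves a record of the transform in the shared registry.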

@Category(NeedsRunner.class)
public void testDropNestedField() {
Schema expectedSchema =
Schema.builder().addStringField("string").addStringField("field2").build();
Contributor

The original schema here is:
nested: ROW[field1: int, field2: string], string: string
After dropping nested.field1, the output schema becomes:
string: string, field2: string.

I have two questions here:

  1. The order of the fields has changed (string comes before field2 now). Is this the intended behavior? I see this is an implementation detail in the SelectHelpers.union function (It merges direct fields accessed first and then nested fields accessed).

  2. nested.field2 is "unwrapped" to field2. I agree with making nested.field2 a top-level field in the output schema, but I think we need to do some work to name the field properly (e.g. name it nested.field2 instead of only field2). Otherwise this may result in unexpected behavior: e.g. if the second top-level field in the example schema were also named field2, there would be a naming conflict and the output schema construction would fail.

Either way, I think we need clear documentation (and better unit tests) of the intended behavior in the javadoc on Select, in addition to SelectHelpers.

Contributor Author

  1. You're absolutely correct. The problem actually is that FieldAccessDescriptor stores top-level fields and nested fields separately so the transforms can't tell the original order. This is something we need to fix, and will require redoing FieldAccessDescriptor a bit. I'll file a JIRA as it's out of scope of this PR, and also affects the Select transform.

  2. This is a current weakness of the Select transform. select("a.b", "c.b") currently doesn't work. We need to add select(x).as(y) functionality. I think there might already be a JIRA for this, if not I'll add one.
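
The naming conflict from point 2 can be reproduced without Beam. The sketch below flattens selected paths to output names under two rules: leaf-only (the behavior discussed above) and fully qualified (one possible fix, assumed here purely for illustration). `FlattenedNames` is a hypothetical helper.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper illustrating output-name conflicts; not part of Select.
class FlattenedNames {
  // Leaf-only naming: "nested.field2" becomes "field2".
  static String leafName(String path) {
    return path.substring(path.lastIndexOf('.') + 1);
  }

  // Returns the output names in selection order, or throws if two selected
  // paths map to the same output name.
  static List<String> outputNames(List<String> selectedPaths, boolean qualify) {
    Set<String> seen = new LinkedHashSet<>();
    for (String path : selectedPaths) {
      String name = qualify ? path : leafName(path);
      if (!seen.add(name)) {
        throw new IllegalArgumentException("Duplicate output field name: " + name);
      }
    }
    return new ArrayList<>(seen);
  }

  public static void main(String[] args) {
    List<String> paths = Arrays.asList("a.b", "c.b");
    System.out.println(outputNames(paths, true)); // [a.b, c.b]
    try {
      outputNames(paths, false); // both paths flatten to "b"
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage()); // Duplicate output field name: b
    }
  }
}
```

A select(x).as(y) style of API would let the caller resolve such conflicts explicitly instead of relying on either naming rule.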

* PCollection<Row> renamedEvents =
* events.apply(RenameFields.<Event>create()
* .rename("userName", "userId")
* .rename("location.country", "location.countryCode"));
Contributor

@robinyqiu robinyqiu May 10, 2019

The implementation seems to differ from what is described here. I think this line should be:
.rename("location.country", "countryCode")

It would be good to add a comment here to make it clear that, when renaming a nested field, users don't need to specify the prefix for the new name.

Contributor Author

Fixed
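
The rename semantics agreed on above (the new name is the bare leaf name, with no prefix) can be sketched on a nested map standing in for a schema. `NestedRename` is a hypothetical helper, the string "STRING" is just a placeholder for a field type, and the `city` field is invented for the example.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model: a schema is an ordered map from field name to either a
// type placeholder (String) or a nested schema (Map).
class NestedRename {
  // Renames the field at the dotted path to newName, where newName is the bare
  // leaf name. Field order within each level is preserved. A path that descends
  // into a non-nested field fails with a ClassCastException in this sketch.
  @SuppressWarnings("unchecked")
  static Map<String, Object> rename(Map<String, Object> schema, String path, String newName) {
    int dot = path.indexOf('.');
    String head = dot < 0 ? path : path.substring(0, dot);
    Map<String, Object> result = new LinkedHashMap<>();
    for (Map.Entry<String, Object> e : schema.entrySet()) {
      if (!e.getKey().equals(head)) {
        result.put(e.getKey(), e.getValue());
      } else if (dot < 0) {
        result.put(newName, e.getValue());
      } else {
        Map<String, Object> nested = (Map<String, Object>) e.getValue();
        result.put(head, rename(nested, path.substring(dot + 1), newName));
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Object> location = new LinkedHashMap<>();
    location.put("country", "STRING");
    location.put("city", "STRING"); // invented field, for illustration only
    Map<String, Object> schema = new LinkedHashMap<>();
    schema.put("userName", "STRING");
    schema.put("location", location);

    Map<String, Object> renamed =
        rename(rename(schema, "userName", "userId"), "location.country", "countryCode");
    System.out.println(renamed); // {userId=STRING, location={countryCode=STRING, city=STRING}}
  }
}
```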

public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> {
/** Internal object representing a new field added. */
@AutoValue
abstract static class NewField implements Serializable {
Contributor

Why are we putting NewField and AddFieldsInformation inside Inner?

Contributor Author

No specific reason. These are effectively private classes, so it doesn't change the interface (we can't make them actually private due to AutoValue).

return new Inner<>(fields);
}

private AddFieldsInformation getAddFieldsInformation(
Contributor

I believe these functions can be static.

Contributor Author

done.

builder.addField(field);
}

// Add any new fields at this level.
Contributor

Reading the implementation code here, I see that top-level fields are added before nested fields. Can we document this in the class-level javadoc? Otherwise this may be confusing to users, e.g. after calling
pc.apply(AddFields.create().field("new1.nested", FieldType.STRING).field("new2", FieldType.INT32))
users may expect a new schema in which new1 appears before new2, which is not the case.

Contributor Author

Will talk about this more generally below.
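
For reference, the ordering described in the review comment (simple top-level fields first, then the parents of nested fields) can be modeled as two passes over the requested paths. This is a sketch of that description only, not the AddFields code; `TopLevelFirstOrder` is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the "top-level first, then nested" ordering.
class TopLevelFirstOrder {
  static List<String> topLevelOrder(List<String> addedPaths) {
    Set<String> order = new LinkedHashSet<>();
    // Pass 1: simple (non-nested) fields, in the order requested.
    for (String path : addedPaths) {
      if (path.indexOf('.') < 0) {
        order.add(path);
      }
    }
    // Pass 2: the top-level parents of nested fields.
    for (String path : addedPaths) {
      int dot = path.indexOf('.');
      if (dot >= 0) {
        order.add(path.substring(0, dot));
      }
    }
    return new ArrayList<>(order);
  }

  public static void main(String[] args) {
    // The user lists new1.nested first, yet new2 ends up ahead of new1.
    System.out.println(topLevelOrder(Arrays.asList("new1.nested", "new2"))); // [new2, new1]
  }
}
```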

// If there are brand new simple (i.e. have no nested values) fields at this level, then add
// the default values for all of them.
newValues.addAll(addFieldsInformation.getDefaultValues());
// If we are creating new recursive fields, populate new values for the here.
Contributor

Typo: them

Contributor Author

done

@robinyqiu
Contributor

Hi Reuven, thank you for your patience. Reviewing this PR took some time. I have added some comments here.

A major problem I see is that the order of the fields in the output schema of AddFields, DropFields (and Select) is not well specified. It depends on various implementation details, such as whether the field is a nested field or a top-level field, so the order of fields in the output schema can sometimes be unexpected. Can we define the behavior of these transforms more precisely, and document it properly as well?

@reuvenlax
Contributor Author

About your concern w.r.t. field ordering:
In general I agree with you. Transforms like Select should return schemas with the fields in the order selected, and transforms like DropFields and RenameFields should preserve the original field order. This requires redoing FieldAccessDescriptor, as it currently doesn't have enough information to preserve this (since it stores top-level fields and nested fields separately). This is a general bug, and I just filed BEAM-4076 for us to fix it.

For the AddFields transform it requires a bit more thought. If the user calls AddFields("a.b", "c", "a.d"), then we cannot preserve the exact order, as a is specified both as the first and the third field. IMO we should let the first one win and add in the order [a.[b, d], c].
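
The "first one wins" rule proposed above amounts to grouping the requested paths by top-level name while keeping the order of first mention. A minimal sketch, handling a single level of nesting only; `FirstMentionWins` is a hypothetical name and this is not the AddFields implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of "first mention wins" ordering for added fields.
class FirstMentionWins {
  // Groups dotted paths by top-level name; LinkedHashMap keeps first-mention order.
  static String order(List<String> paths) {
    Map<String, List<String>> grouped = new LinkedHashMap<>();
    for (String path : paths) {
      int dot = path.indexOf('.');
      String head = dot < 0 ? path : path.substring(0, dot);
      List<String> nested = grouped.computeIfAbsent(head, k -> new ArrayList<>());
      if (dot >= 0) {
        nested.add(path.substring(dot + 1));
      }
    }
    List<String> rendered = new ArrayList<>();
    for (Map.Entry<String, List<String>> e : grouped.entrySet()) {
      rendered.add(e.getValue().isEmpty() ? e.getKey() : e.getKey() + "." + e.getValue());
    }
    return rendered.toString();
  }

  public static void main(String[] args) {
    System.out.println(order(Arrays.asList("a.b", "c", "a.d"))); // [a.[b, d], c]
  }
}
```

Because a is first mentioned before c, both of its nested fields are placed ahead of c, even though a.d was requested last.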

@robinyqiu
Contributor

LGTM. Thanks for the fix.

BEAM-7301 is tracking the remaining field ordering issue, and I am happy to work on the fix for that once I can find some time.

@reuvenlax reuvenlax merged commit c630217 into apache:master May 15, 2019
charithe pushed a commit to shehzaadn-vd/vend-beam that referenced this pull request May 16, 2019
ajamato pushed a commit to ajamato/beam that referenced this pull request May 18, 2019