-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-7174] Add schema modification transforms #8425
[BEAM-7174] Add schema modification transforms #8425
Conversation
ba95f57
to
ad66a04
Compare
R: @robinyqiu |
.collect(Collectors.toList()); | ||
List<String> nestedSelectors = | ||
getNestedFieldsAccessed().entrySet().stream() | ||
.map(e -> e.getKey().getFieldName() + "." + e.getValue().toString()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If there are multiple sub-descriptors under a same field (e.g. a.b and a.c), won't this return something like "a.b, c"?
To make the string looks prettier, how about we simply add a bracket around sub-descriptor if it does not referencesSingleField (i.e. return something like "a.[b, c]")?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Current if you have a.b and a.c, this with return a.b, a.c.
We could make it return a.[b,c], but it might get confusing for multiple nested descriptors? For example then a.b.c.d, a.b.c.e, a.c.f, a.g
will be printed as a.[b.[c.[d, e] , f], g]
Where it's probably more readable as simply a.b.c.d, a.b.c.e, a.c.f, a.g
/** | ||
* Return the field names accessed. Should not be called until after {@link #resolve} is called. | ||
*/ | ||
public Set<String> fieldNamesAccessed() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Consider adding some unit tests for these new functions?
* .field("userDetails.isSpecialUser", "FieldType.BOOLEAN", false)); | ||
* }</pre> | ||
*/ | ||
public class AddFields { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Add @Experiment
(same for the other transforms).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. done.
FieldAccessDescriptor selectDescriptor = | ||
complement(inputSchema, fieldsToDrop.resolve(inputSchema)); | ||
|
||
return Select.<T>fieldAccess(selectDescriptor).expand(input); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is there a reason why we call the expand
function here explicitly, instead of doing
return input.apply(Select.<T>fieldAccess(selectDescriptor));
as usual?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, input.apply is the preferred way. Changed.
@Category(NeedsRunner.class) | ||
public void testDropNestedField() { | ||
Schema expectedSchema = | ||
Schema.builder().addStringField("string").addStringField("field2").build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The original schema here is:
nested: ROW[field1: int, field2: string], string: string
After dropping nested.field1
, the output schema becomes:
string: string, field2: string
.
I have two questions here:
-
The order of the fields has changed (
string
comes beforefield2
now). Is this the intended behavior? I see this is an implementation detail in theSelectHelpers.union
function (It merges direct fields accessed first and then nested fields accessed). -
nested.field2
is "unwrapped" tofield2
. I agree to makenested.field2
a top-level field in the output schema, but I think we need to do some work to properly name the field (e.g. name itnested.field2
instead of onlyfield2
). Otherwise this may result in unexpected behaviors: e.g. if in the example schema the second top-level field is also namedfield2
, then there will be a naming conflict and the output schema construction will fail.)
Either way, I think we need a clear documentation (and better unit tests) on the intended behavior in the javadoc on Select
, in addition to SelectHelpers
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
-
You're absolutely correct. The problem actually is that FieldAccessDescriptor stores top-level fields and nested fields separately so the transforms can't tell the original order. This is something we need to fix, and will require redoing FieldAccessDescriptor a bit. I'll file a JIRA as it's out of scope of this PR, and also affects the Select transform.
-
This is a current weakness of the Select transform. select("a.b", "c.b") currently doesn't work. We need to add select(x).as(y) functionality. I think there might already be a JIRA for this, if not I'll add one.
* PCollection<Row> renamedEvents = | ||
* events.apply(RenameFields.<Event>create() | ||
* .rename("userName", "userId") | ||
* .rename("location.country", "location.countryCode")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The implementation seems to be different as described here. I think this line should be:
.rename("location.country", "countryCode")
It would be good if we can add a line of comment here to make it clear that to rename a nested field, users don't need to specify the prefix for the new name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fixed
public static class Inner<T> extends PTransform<PCollection<T>, PCollection<Row>> { | ||
/** Internal object representing a new field added. */ | ||
@AutoValue | ||
abstract static class NewField implements Serializable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we putting NewField
and AddFieldsInformation
inside Inner
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No specific reason. these are effectively private classes so it doesn't change the interface (we can't make them actually private due to AutoValue).
return new Inner<>(fields); | ||
} | ||
|
||
private AddFieldsInformation getAddFieldsInformation( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I believe these functions can be static.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done.
builder.addField(field); | ||
} | ||
|
||
// Add any new fields at this level. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
By reading the implementation code here, I see that top-level fields are added before nested fields. Can we document this in the class level javadoc? Otherwise this may be confusing to users, e.g. after calling
pc.apply(AddFields.create().field("new1.nested", FieldType.STRING).field("new2", FieldType.INT32)
users may expect a new schema in which new1
appears before new2
, which is not the case.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Will talk about this more generally below.
// If there are brand new simple (i.e. have no nested values) fields at this level, then add | ||
// the default values for all of them. | ||
newValues.addAll(addFieldsInformation.getDefaultValues()); | ||
// If we are creating new recursive fields, populate new values for the here. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo: them
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Hi Reuven, thank you for your patience. Reviewing this PR took some time. I added some comments in here. A major problem I see here is that the order of the fields in the output schema of |
About your concern w.r.t. field ordering: For the AddFields transform it requires a bit more thought. If the user calls AddFields("a.b", "c", "a.d"), then we cannot preserve the exact order, as a is specified both as the first and the third field. IMO we should let the first one win and add in the order [a.[b, d], c]. |
LGTM. Thanks for the fix. BEAM-7301 is tracking the remaining field ordering issue, and I am happy to work on the fix for that once I can find some time. |
Add three transforms for modifying schemas:
AddFields Add new fields to a schema. Existing rows are padded with null values in the position of these new fields (or alternatively the user can specify a default value)
DropFields Drop fields from a schema.
RenameFields Rename schema fields.