Allow Schema field selections in DoFn using NewDoFn injection #8311

Merged

reuvenlax
Contributor

This change enables selecting fields out of input schemas directly in a DoFn. Fields can automatically be converted to user types if the types match.

For example, if an input schema contained a latitude and a longitude field, you could write:

ParDo.of(new DoFn<>() {
  @ProcessElement
  public void process(
      @FieldAccess("latitude") double latitude,
      @FieldAccess("longitude") double longitude) {
  }
});

Beam will automatically extract those fields from the input schema and inject only them into the DoFn.

Beam will also automatically convert to user types. If the input schema has a nested row that matches a custom user type's schema, the user can use that Java type in their DoFn parameter and Beam will convert the field to that type.
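
The conversion described above can be pictured with a small plain-Java model. This is only an illustrative sketch, not Beam's implementation: rows are modeled as `Map<String, Object>`, and `Location`, `RowModel`, `select`, and `toLocation` are hypothetical names introduced here. It also assumes that "matches" means the nested row has exactly the same field names and types as the user type.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical user type whose fields mirror a nested row's schema.
final class Location {
  final double latitude;
  final double longitude;

  Location(double latitude, double longitude) {
    this.latitude = latitude;
    this.longitude = longitude;
  }
}

// Toy stand-in for schema-aware rows; Beam's real Row and Schema classes are not used.
final class RowModel {
  private RowModel() {}

  // Select one top-level field by name, failing loudly if it is absent.
  static Object select(Map<String, Object> row, String field) {
    if (!row.containsKey(field)) {
      throw new IllegalArgumentException("No such field: " + field);
    }
    return row.get(field);
  }

  // Convert a nested row to the user type when its fields match by name and type.
  static Location toLocation(Object nested) {
    if (!(nested instanceof Map)) {
      throw new IllegalArgumentException("Expected a nested row");
    }
    Map<?, ?> fields = (Map<?, ?>) nested;
    Object lat = fields.get("latitude");
    Object lon = fields.get("longitude");
    if (fields.size() != 2 || !(lat instanceof Double) || !(lon instanceof Double)) {
      throw new IllegalArgumentException("Nested row does not match Location's schema");
    }
    return new Location((Double) lat, (Double) lon);
  }

  public static void main(String[] args) {
    Map<String, Object> position = new LinkedHashMap<>();
    position.put("latitude", 47.6);
    position.put("longitude", -122.3);
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("user", "alice");
    row.put("position", position);

    // Only the selected field is extracted and converted; "user" is never touched.
    Location loc = toLocation(select(row, "position"));
    System.out.println(loc.latitude + ", " + loc.longitude);
  }
}
```

In this model the caller names the field it wants and the type it expects, which is the role the annotated DoFn parameter plays in the description above.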

R: @TheNeuralBit

@reuvenlax
Contributor Author

Run Java_Examples_Dataflow PreCommit

@reuvenlax
Contributor Author

Run Python PreCommit

Member

@TheNeuralBit TheNeuralBit left a comment

Looks great! Thanks for taking some time to go over this with me in person. Just a few relatively minor comments.

FieldAccessDescriptor.withFieldNames("integerField");

@FieldAccess("intsSelector")
final FieldAccessDescriptor stringsSelector =
Member

nit: should be intsSelector

Contributor Author

don't understand

Member

I was referring to the variable name. It looks like a copy-paste error, since the FieldAccess string, intsSelector, doesn't match the variable name, stringsSelector.

Contributor Author

Fixed variable name

@Override
public int hashCode() {
return Objects.hash(integerField, stringField);
}
Member

Would it make sense to use AutoValue here instead of implementing equals and hashCode?

Contributor Author

Very good suggestion, done. Note that this uncovered a bug in schema inference that I fixed.
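
For context, the hand-written pattern being replaced looks roughly like the sketch below. Only the hashCode body appears in the snippet under review; the class name `ExamplePojo`, the constructor, and the field types are assumptions made for illustration. AutoValue generates equivalent equals and hashCode methods from abstract accessors, which is what removes this boilerplate.

```java
import java.util.Objects;

// Hypothetical value class; the name and the field types are assumed.
final class ExamplePojo {
  final int integerField;
  final String stringField;

  ExamplePojo(int integerField, String stringField) {
    this.integerField = integerField;
    this.stringField = stringField;
  }

  // Value equality: two instances are equal when every field is equal.
  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof ExamplePojo)) {
      return false;
    }
    ExamplePojo other = (ExamplePojo) o;
    return integerField == other.integerField
        && Objects.equals(stringField, other.stringField);
  }

  // Must hash the same fields that equals compares.
  @Override
  public int hashCode() {
    return Objects.hash(integerField, stringField);
  }
}
```

Every field added to such a class has to be threaded through both methods by hand, which is the maintenance cost the suggestion avoids.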

public void process(
@FieldAccess("inner.*") PojoForExtraction pojo,
@FieldAccess("inner") PojoForExtraction pojo2,
@FieldAccess("inner.stringField") String stringField,
Member

What happens if a FieldAccess has a comma-separated list of selectors? like "inner.stringField, inner.integerField"?

Contributor Author

Actually, the grammar for our expression parser doesn't include commas, so it will fail with a parse error.
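
To illustrate the behavior discussed in this thread, here is a toy resolver for dotted selectors such as `inner.stringField`, `inner`, and `inner.*`. It is a simplified model, not Beam's parser: rows are nested maps, `SelectorSketch` and `resolve` are hypothetical names, and treating `inner.*` the same as `inner` is an assumption made for the sketch. The only point taken from the discussion is that a comma is not part of the grammar, so a comma-separated list is rejected.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy model of dotted field selectors over nested maps; not Beam's expression parser.
final class SelectorSketch {
  private SelectorSketch() {}

  static Object resolve(Map<String, Object> row, String selector) {
    // Assumed grammar: identifiers, dots, and '*'. Commas and spaces are rejected.
    if (!selector.matches("[A-Za-z0-9_.*]+")) {
      throw new IllegalArgumentException("Parse error in selector: " + selector);
    }
    Object current = row;
    for (String part : selector.split("\\.")) {
      if (part.equals("*")) {
        // Assumption: a wildcard selects the whole row reached so far.
        return current;
      }
      if (!(current instanceof Map) || !((Map<?, ?>) current).containsKey(part)) {
        throw new IllegalArgumentException("No such field: " + part);
      }
      current = ((Map<?, ?>) current).get(part);
    }
    return current;
  }

  public static void main(String[] args) {
    Map<String, Object> inner = new LinkedHashMap<>();
    inner.put("stringField", "abc");
    inner.put("integerField", 42);
    Map<String, Object> row = new LinkedHashMap<>();
    row.put("inner", inner);

    System.out.println(resolve(row, "inner.stringField"));
    System.out.println(resolve(row, "inner.*"));
    try {
      resolve(row, "inner.stringField, inner.integerField");
    } catch (IllegalArgumentException e) {
      System.out.println(e.getMessage());
    }
  }
}
```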

@@ -136,7 +136,7 @@
InputT element(DoFn<InputT, OutputT> doFn);

/** Provide a link to the input element. */
Member

Can you update the docstring here to explain what index is? It might also be useful to update the element docstring: "Provide a link to the input element." seems misleading, since from what I can tell the implementations should just return a reference to the element.

Contributor Author

Done


abstract DoFnSchemaInformation build();
}

public abstract Builder toBuilder();

public <T> DoFnSchemaInformation withElementParameterSchema(SchemaCoder<T> schemaCoder) {
return toBuilder().setElementParameterSchema(schemaCoder).build();
DoFnSchemaInformation withElementParameterSchema(
Member

Would you mind adding a docstring here and on withUnboxParameter? I think that would help a lot with readability. It doesn't need to be exhaustive, but there are a couple of questions I had to work through by reading the code that I think could be clarified here:

  • Can elementCoder be considered outputCoder (i.e. the coder to use for selectOutputSchema), or is it something else?
  • What's the difference between withElementParameterSchema with unbox = true and withUnboxParameter?

Contributor Author

Done

@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Dataflow Runner Nexmark Tests

@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Dataflow Runner Nexmark Tests

@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Dataflow Runner Nexmark Tests

@reuvenlax reuvenlax force-pushed the pardo_convert_and_select_schema_input branch from 3b6db54 to bca3798 Compare April 25, 2019 19:18
@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Dataflow ValidatesRunner

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

Run Java PreCommit

@reuvenlax
Contributor Author

@TheNeuralBit I believe I've addressed all comments.

@TheNeuralBit
Member

Run Java PreCommit

@TheNeuralBit
Member

Yep! LGTM

It looks like the last test failure was a timeout, so I guess this is probably still safe to merge?

@reuvenlax
Contributor Author

@TheNeuralBit I already got a green Java PreCommit run, and as you said the rerun was a timeout.

@reuvenlax reuvenlax merged commit 77b295b into apache:master Apr 27, 2019
@reuvenlax
Contributor Author

Run Dataflow Runner Nexmark Tests
