Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4461] A transform to perform binary joins of PCollections with schemas #8273

Merged
merged 4 commits into from
Apr 17, 2019

Conversation

reuvenlax
Copy link
Contributor

This supports inner, outer, left outer, and right outer joins.

Example usage:

pCollection1.apply(Join.innerJoin(pCollection2).using("user", "country"));

R: @robinyqiu

Copy link
Contributor

@robinyqiu robinyqiu left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Some idea on the API design of CoGroup and Join:
Currently we have

  1. pcTuple.apply(CoGroup.join(...).join(...)) output a PCollection<KV<Row, Row>>
  2. pcTuple.apply(CoGroup.join(...).join(...).crossProductJoin(...)) output a PCollection<Row>
  3. pc1.apply(Join.innerJoin(pc2).on(...)) output a PCollection<Row>

From a user's perspective, I would say 2) and 3) share more commonality than 1) and 2), because they both output the schema of a cross product of all/both inputs; whereas 1) output a KV<K, map from tag to Iterable>. So how about we move the implementation of 2) under the Join class?

Also, with the crossProductJoin() removed, CoGroup will always output PCollection<KV<Row, Row>>, which makes its usage closer to Group.ByFields. (They both do aggregation on one table/multiple tables, instead of cross product.)

Please let me know what you think. I am also happy to help with making some changes if needed.

@@ -159,7 +159,7 @@
*
* <pre>{@code
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation())
* .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: update doc at L152 as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -244,7 +244,7 @@ public static By fieldAccessDescriptor(FieldAccessDescriptor fieldAccessDescript
*
* <p>This only affects the results of expandCrossProduct.
*/
public By withOuterJoinParticipation() {
public By withOptionalParticipation() {
return toBuilder().setOuterJoinParticipation(true).build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should be changed to get/setOptionalParticipation()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

*
* <p>Full outer joins, left outer joins, and right outer joins are also supported.
*/
public class Join {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we mark this class and CoGroup with @Experimental(Kind.SCHEMAS)? I think we may be able to make some improvement on the API design later.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good catch! done.


@Test
@Category(NeedsRunner.class)
public void testRightOuterJoin() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: rename to testRightOuterJoinSameKeys?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done


@Test
@Category(NeedsRunner.class)
public void testLeftOuterJoin() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: rename to testLeftOuterJoinSameKeys?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

};

/** Implementation class . */
public static class Inner<LhsT, RhsT> extends PTransform<PCollection<LhsT>, PCollection<Row>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I personally find the name Inner a bit misleading because we also have "inner" joins here, but I saw this name is used throughout the package. Maybe something like "Impl" is more unambiguous.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I used Inner, because that's the name traditionally used in BeamJava transforms. However I agree it is confusing in this context; changed to Impl.

}

@Override
public PCollection<Row> expand(PCollection lhs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why don't use the more specific type here: PCollection<LhsT>

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because the output schema here is the joined schema, so it would not match the schema of either LhsT or RhsT.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was asking about the input parameter PCollection lhs, not the return type.

@robinyqiu
Copy link
Contributor

LGTM. Also please let me know what's your opinion on my comments here: #8273 (review)

@reuvenlax
Copy link
Contributor Author

@robinyqiu I think the fundamental difference between CoGroup and Join isn't the cross product, it's that CoGroup is a general grouping/join on N inputs while Join is something closer to a standard binary join. Moving the cross-product into Join would mean that we would lose it for the N-input case where N > 2. I don't think we want the Join transform to start dealing with PCollectionTuples - the whole point of the binary join transform is simply to be syntactic sugar that makes common use cases easy.

You are .correct that standard CoGroup is closer to Group. That's the same with standard Beam too - CoGroupByKey is just the multi-input version of GroupByKey. I think that it would make sense to allow similar aggregations in CoGroup as we do in Group

@reuvenlax
Copy link
Contributor Author

@robinyqiu I also considered calling this transform BinaryJoin to emphasize that, but opted for the shorter name (since SQL joins are always binary joins anyway).

@reuvenlax reuvenlax merged commit a00ba2b into apache:master Apr 17, 2019
@robinyqiu
Copy link
Contributor

robinyqiu commented Apr 17, 2019

@robinyqiu I think the fundamental difference between CoGroup and Join isn't the cross product, it's that CoGroup is a general grouping/join on N inputs while Join is something closer to a standard binary join.

Yes I agree that's the fundamental difference between the current design of CoGroup and Join

Moving the cross-product into Join would mean that we would lose it for the N-input case where N > 2.

I was thinking of providing two different flavors of APIs and put them both under Join, one for the general N case (deal with PCollectionTuple) and one for N=2 case (syntactic sugar for the first case).

I don't think we want the Join transform to start dealing with PCollectionTuples - the whole point of the binary join transform is simply to be syntactic sugar that makes common use cases easy.

Ok I see your point. If binary join is a much more common use case than N>2 join, then I agree that the current API is easier to use.

@reuvenlax
Copy link
Contributor Author

reuvenlax commented Apr 18, 2019 via email

ibzib pushed a commit to ibzib/beam that referenced this pull request Apr 22, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants