-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4461] A transform to perform binary joins of PCollections with schemas #8273
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some idea on the API design of CoGroup
and Join
:
Currently we have
pcTuple.apply(CoGroup.join(...).join(...))
output aPCollection<KV<Row, Row>>
pcTuple.apply(CoGroup.join(...).join(...).crossProductJoin(...))
output aPCollection<Row>
pc1.apply(Join.innerJoin(pc2).on(...))
output aPCollection<Row>
From a user's perspective, I would say 2) and 3) share more commonality than 1) and 2), because they both output the schema of a cross product of all/both inputs; whereas 1) output a KV<K, map from tag to Iterable>. So how about we move the implementation of 2) under the Join
class?
Also, with the crossProductJoin()
removed, CoGroup
will always output PCollection<KV<Row, Row>>
, which makes its usage closer to Group.ByFields
. (They both do aggregation on one table/multiple tables, instead of cross product.)
Please let me know what you think. I am also happy to help with making some changes if needed.
@@ -159,7 +159,7 @@ | |||
* | |||
* <pre>{@code | |||
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) | |||
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation()) | |||
* .apply(CoGroup.join("input1", By.fieldNames("user").withOptionalParticipation()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: update doc at L152 as well?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -244,7 +244,7 @@ public static By fieldAccessDescriptor(FieldAccessDescriptor fieldAccessDescript | |||
* | |||
* <p>This only affects the results of expandCrossProduct. | |||
*/ | |||
public By withOuterJoinParticipation() { | |||
public By withOptionalParticipation() { | |||
return toBuilder().setOuterJoinParticipation(true).build(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this should be changed to get/setOptionalParticipation()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
* | ||
* <p>Full outer joins, left outer joins, and right outer joins are also supported. | ||
*/ | ||
public class Join { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't we mark this class and CoGroup with @Experimental(Kind.SCHEMAS)
? I think we may be able to make some improvement on the API design later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch! done.
|
||
@Test | ||
@Category(NeedsRunner.class) | ||
public void testRightOuterJoin() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rename to testRightOuterJoinSameKeys?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
|
||
@Test | ||
@Category(NeedsRunner.class) | ||
public void testLeftOuterJoin() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: rename to testLeftOuterJoinSameKeys?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
}; | ||
|
||
/** Implementation class . */ | ||
public static class Inner<LhsT, RhsT> extends PTransform<PCollection<LhsT>, PCollection<Row>> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I personally find the name Inner a bit misleading because we also have "inner" joins here, but I saw this name is used throughout the package. Maybe something like "Impl" is more unambiguous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I used Inner, because that's the name traditionally used in BeamJava transforms. However I agree it is confusing in this context; changed to Impl.
} | ||
|
||
@Override | ||
public PCollection<Row> expand(PCollection lhs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't use the more specific type here: PCollection<LhsT>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because the output schema here is the joined schema, so it would not match the schema of either LhsT or RhsT.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was asking about the input parameter PCollection lhs
, not the return type.
LGTM. Also please let me know what's your opinion on my comments here: #8273 (review) |
@robinyqiu I think the fundamental difference between CoGroup and Join isn't the cross product, it's that CoGroup is a general grouping/join on N inputs while Join is something closer to a standard binary join. Moving the cross-product into Join would mean that we would lose it for the N-input case where N > 2. I don't think we want the Join transform to start dealing with PCollectionTuples - the whole point of the binary join transform is simply to be syntactic sugar that makes common use cases easy. You are .correct that standard CoGroup is closer to Group. That's the same with standard Beam too - CoGroupByKey is just the multi-input version of GroupByKey. I think that it would make sense to allow similar aggregations in CoGroup as we do in Group |
@robinyqiu I also considered calling this transform BinaryJoin to emphasize that, but opted for the shorter name (since SQL joins are always binary joins anyway). |
Yes I agree that's the fundamental difference between the current design of CoGroup and Join
I was thinking of providing two different flavors of APIs and put them both under
Ok I see your point. If binary join is a much more common use case than N>2 join, then I agree that the current API is easier to use. |
Ah sorry I missed that! I'll send another PR to fix the lhs parameter.
…On Wed, Apr 17, 2019 at 5:04 PM Yueyang Qiu ***@***.***> wrote:
***@***.**** commented on this pull request.
------------------------------
In
sdks/java/core/src/main/java/org/apache/beam/sdk/schemas/transforms/Join.java
<#8273 (comment)>:
> + /**
+ * Perform a natural join between the PCollections. The fields are expected to exist in both
+ * PCollections
+ */
+ public Inner<LhsT, RhsT> using(FieldAccessDescriptor fieldAccessDescriptor) {
+ return new Inner<>(
+ joinType, rhs, FieldsEqual.left(fieldAccessDescriptor).right(fieldAccessDescriptor));
+ }
+
+ /** Join the PCollections using the provided predicate. */
+ public Inner<LhsT, RhsT> on(FieldsEqual.Inner predicate) {
+ return new Inner<>(joinType, rhs, predicate);
+ }
+
+ @OverRide
+ public PCollection<Row> expand(PCollection lhs) {
I was asking about the input parameter PCollection lhs, not the return
type.
—
You are receiving this because you modified the open/close state.
Reply to this email directly, view it on GitHub
<#8273 (comment)>, or mute
the thread
<https://github.com/notifications/unsubscribe-auth/AFAYJVKAK4MQ4IJNN5MMZCTPQ63KRANCNFSM4HFB7O6A>
.
|
…nary joins of PCollections with schemas
This supports inner, outer, left outer, and right outer joins.
Example usage:
pCollection1.apply(Join.innerJoin(pCollection2).using("user", "country"));
R: @robinyqiu