-
Notifications
You must be signed in to change notification settings - Fork 4.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BEAM-4461] Support inner and outer style joins in CoGroup. #7353
Conversation
@akedin any comments on this PR? |
@kanterov do you have any time to help review this PR? |
@reuvenlax I'm going on vacation, but I review on the week of 25th February |
@kanterov thank you very much! |
…eneralizes inner and outer joins to multiple input PCollections.
e2a191a
to
984ea49
Compare
I'm looking into it. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code looks good. Would be useful to abstract over tuple cardinality in SQL transforms, and fixing issues, for instance BEAM-5049.
I've carefully gone through the Javadoc, and it's great, was mostly nitpicking there.
One case I didn't find covered is PCollectionTuple
having not all tags from JoinArguments
. It would be interesting to see performance comparing to join-library
, however, as I see it, at the moment the focus is to stabilize APIs.
I find TODO for doing lazy Row implementation for iteration over CoGbkResult worth JIRA ticket.
* identical between input1 and input2. | ||
* | ||
* <p>This transform also supports outer-join semantics. By default, all input PCollections must | ||
* participate fully in the join, providing inner-join semantics. This means that if if all input |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s/if if/if/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* | ||
* <p>This transform also supports outer-join semantics. By default, all input PCollections must | ||
* participate fully in the join, providing inner-join semantics. This means that if if all input | ||
* save one have values for a given user "Bob" the join will produce no values for "Bob." However, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: looks like it was phrased as "if all inputs have values for a given user" or similar
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* | ||
* <pre>{@code | ||
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) | ||
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing closing bracket in join(
for code examples, if I get it correctly, it should be:
PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
.apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation())
.join("input2", By.fieldNames("user"))
.crossProductJoin();
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* | ||
* <pre>{@code | ||
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) | ||
* .apply(CoGroup.join("input1", By.fieldNames("user") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing bracket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
* | ||
* <pre>{@code | ||
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2) | ||
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: missing bracket
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
String tag = sortedTags.get(tagIndex); | ||
SerializableFunction<Object, Row> toRow = toRows.get(tag); | ||
Iterable items = gbkResult.getAll(tagToTupleTag.get(tag)); | ||
if (!items.iterator().hasNext() && joinArgs.getOuterJoinParticipation(tag)) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm wondering if it worth to preserve iterator for the next loop because creating a new one and checking hasNext
could be expensive in the case of UnionValueIterator
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
private final Schema outputSchema; | ||
private final Map<String, String> tagToTupleTag; | ||
|
||
public ExpandToRows( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: What do you think about splitting into two parts:
- dealing with tags, gbkResult and creating iterators with a single
null
element in case of outer join participation - doing cross-product for
List<Iterable>
From what I see, it could save us from doing lookups in sortedTags
and tagToTupleTag
for each output row, and simplify the code.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is an excellent point. Split this out so we create the iterators ahead of time. Also changed maps to key off of tag index so that we don't have to hash the string on every single element.
@@ -190,6 +201,12 @@ public String toString() { | |||
return innerGetOnly(tag, defaultValue, true); | |||
} | |||
|
|||
/** Like {@link #getOnly(TupleTag, Object)} but uisng a String instead of a TupleTag. */ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: s/uisng/using/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -123,6 +200,10 @@ public static PCollectionTuple empty(Pipeline pipeline) { | |||
return pcollectionMap.containsKey(tag); | |||
} | |||
|
|||
public <T> boolean has(String tag) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we copy Javadoc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
@@ -137,6 +218,10 @@ public static PCollectionTuple empty(Pipeline pipeline) { | |||
return pcollection; | |||
} | |||
|
|||
public <T> PCollection<T> get(String tag) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should we copy Javadoc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
done
Also filed [BEAM-6756] |
Run Java PreCommit |
return byFieldAccessDescriptor(FieldAccessDescriptor.withFieldIds(fieldIds)); | ||
static { | ||
NULL_LIST = Lists.newArrayList(); | ||
NULL_LIST.add(null); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not sure it's going to do safe publishing because list is modified after final value is assigned. What about NULL_LIST = Arrays.asList((Object) null)
LGTM |
Run Java PreCommit |
Multiple improvements to the schema CoGroup transform:
Allow the user to use strings instead of TupleTags. TupleTags existed to make Java type inference work, and this is not needed with the schema-based join as the types are in the schema. This also allows a simpler builder for PCollectionTuple.
Instead of multiple CoGroup.byFieldNames, byFieldIds, etc. the new syntax is CoGroup.join(By.fieldNames), CoGroup.join(By.fieldIds), etc. This shrinks the API surface area, and also provides a place to provide per-input options (used for outer joins).
Add a .crossProductJoin. This expands the iterables into an inner-product. For example:
PCollection innerJoined = inputs.apply(
CoGroup.join("input1", By.fieldNames("user"))
.join("input2", By.fieldNames("user"))
.crossProductJoin();
Each input can be marked for "outer-join" participation semantics. This means that if no records for that input are present for a join key, an output is still generated from the cross product with the value for that input replaced by a null. This generalizes normal left/right/full outer joins to N inputs. For example with two inputs:
PCollection leftOuterJoined = inputs.apply(
CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation())
.join("input2", By.fieldNames("user"))
.crossProductJoin();
R: @dpmills
R: @akedin