Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BEAM-4461] Support inner and outer style joins in CoGroup. #7353

Merged
merged 9 commits into from
Mar 1, 2019

Conversation

reuvenlax
Copy link
Contributor

@reuvenlax reuvenlax commented Dec 26, 2018

Multiple improvements to the schema CoGroup transform:

  • Allow the user to use strings instead of TupleTags. TupleTags existed to make Java type inference work, and this is not needed with the schema-based join as the types are in the schema. This also allows a simpler builder for PCollectionTuple.

  • Instead of multiple CoGroup.byFieldNames, byFieldIds, etc. the new syntax is CoGroup.join(By.fieldNames), CoGroup.join(By.fieldIds), etc. This shrinks the API surface area, and also provides a place to provide per-input options (used for outer joins).

  • Add a .crossProductJoin. This expands the iterables into an inner-product. For example:
    PCollection innerJoined = inputs.apply(
    CoGroup.join("input1", By.fieldNames("user"))
    .join("input2", By.fieldNames("user"))
    .crossProductJoin();

  • Each input can be marked for "outer-join" participation semantics. This means that if no records for that input are present for a join key, an output is still generated from the cross product with the value for that input replaced by a null. This generalizes normal left/right/full outer joins to N inputs. For example with two inputs:
    PCollection leftOuterJoined = inputs.apply(
    CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation())
    .join("input2", By.fieldNames("user"))
    .crossProductJoin();
    R: @dpmills
    R: @akedin

@reuvenlax
Copy link
Contributor Author

@akedin any comments on this PR?

@reuvenlax
Copy link
Contributor Author

@kanterov do you have any time to help review this PR?

@kanterov
Copy link
Member

@reuvenlax I'm going on vacation, but I review on the week of 25th February

@reuvenlax
Copy link
Contributor Author

@kanterov thank you very much!

@kanterov
Copy link
Member

I'm looking into it.

Copy link
Member

@kanterov kanterov left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The code looks good. Would be useful to abstract over tuple cardinality in SQL transforms, and fixing issues, for instance BEAM-5049.

I've carefully gone through the Javadoc, and it's great, was mostly nitpicking there.

One case I didn't find covered is PCollectionTuple having not all tags from JoinArguments. It would be interesting to see performance comparing to join-library, however, as I see it, at the moment the focus is to stabilize APIs.

I find TODO for doing lazy Row implementation for iteration over CoGbkResult worth JIRA ticket.

* identical between input1 and input2.
*
* <p>This transform also supports outer-join semantics. By default, all input PCollections must
* participate fully in the join, providing inner-join semantics. This means that if if all input
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/if if/if/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* <p>This transform also supports outer-join semantics. By default, all input PCollections must
* participate fully in the join, providing inner-join semantics. This means that if if all input
* save one have values for a given user "Bob" the join will produce no values for "Bob." However,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: looks like it was phrased as "if all inputs have values for a given user" or similar

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* <pre>{@code
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing closing bracket in join( for code examples, if I get it correctly, it should be:

 PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
   .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation())
                 .join("input2", By.fieldNames("user"))
                 .crossProductJoin();

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* <pre>{@code
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
* .apply(CoGroup.join("input1", By.fieldNames("user")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing bracket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

*
* <pre>{@code
* PCollection<Row> joined = PCollectionTuple.of("input1", input1, "input2", input2)
* .apply(CoGroup.join("input1", By.fieldNames("user").withOuterJoinParticipation()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: missing bracket

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fixed

String tag = sortedTags.get(tagIndex);
SerializableFunction<Object, Row> toRow = toRows.get(tag);
Iterable items = gbkResult.getAll(tagToTupleTag.get(tag));
if (!items.iterator().hasNext() && joinArgs.getOuterJoinParticipation(tag)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering if it worth to preserve iterator for the next loop because creating a new one and checking hasNext could be expensive in the case of UnionValueIterator.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

private final Schema outputSchema;
private final Map<String, String> tagToTupleTag;

public ExpandToRows(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: What do you think about splitting into two parts:

  • dealing with tags, gbkResult and creating iterators with a single null element in case of outer join participation
  • doing cross-product for List<Iterable>

From what I see, it could save us from doing lookups in sortedTags and tagToTupleTag for each output row, and simplify the code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is an excellent point. Split this out so we create the iterators ahead of time. Also changed maps to key off of tag index so that we don't have to hash the string on every single element.

@@ -190,6 +201,12 @@ public String toString() {
return innerGetOnly(tag, defaultValue, true);
}

/** Like {@link #getOnly(TupleTag, Object)} but uisng a String instead of a TupleTag. */
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: s/uisng/using/

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -123,6 +200,10 @@ public static PCollectionTuple empty(Pipeline pipeline) {
return pcollectionMap.containsKey(tag);
}

public <T> boolean has(String tag) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we copy Javadoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@@ -137,6 +218,10 @@ public static PCollectionTuple empty(Pipeline pipeline) {
return pcollection;
}

public <T> PCollection<T> get(String tag) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: should we copy Javadoc?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

@reuvenlax
Copy link
Contributor Author

Also filed [BEAM-6756]

@reuvenlax
Copy link
Contributor Author

Run Java PreCommit

return byFieldAccessDescriptor(FieldAccessDescriptor.withFieldIds(fieldIds));
static {
NULL_LIST = Lists.newArrayList();
NULL_LIST.add(null);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure it's going to do safe publishing because list is modified after final value is assigned. What about NULL_LIST = Arrays.asList((Object) null)

@kanterov
Copy link
Member

kanterov commented Mar 1, 2019

LGTM

@kanterov
Copy link
Member

kanterov commented Mar 1, 2019

Run Java PreCommit

@reuvenlax reuvenlax merged commit c41b3c0 into apache:master Mar 1, 2019
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants