Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-3943] Add support for EXCEPT operator #2169

Closed
wants to merge 6 commits into from

Conversation

mushketyk
Copy link
Contributor

Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.

  • General
    • The pull request references the related JIRA issue ("[FLINK-XXX] Jira title text")
    • The pull request addresses only one issue
    • Each commit in the PR has a meaningful commit message (including the JIRA id)
  • Documentation
    • Documentation has been added for new functionality
    • Old documentation affected by the pull request has been updated
    • JavaDoc for public methods has been added
  • Tests & Build
    • Functionality added by the pull request is covered by tests
    • mvn clean verify has been executed successfully locally or a Travis build has passed

@mushketyk
Copy link
Contributor Author

Renamed UnionITCase to SetOperationsITCase as suggested here: #2159


val minusRes = leftDataSet.minus(rightDataSet)
if (!all) {
minusRes.distinct()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to remove duplicate records in CoGroup, emit only one record instead of every record from left data set when it is a minus without all. There is no need a distinct afterwards. It's the more robust choice because it won't create a huge intermediate result in case of many duplicate records.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sense. Will update.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On the second hand "union" method in DataSet.java preserves duplicates and used to implement UNION ALL. UNION is implemented by using "distinct" operation on top of UNION ALL.
It seems that what you suggest will only add code duplication since it will basically implement a specialized version of "distinct" operator specifically for the "minus" method.

I have no strong preferences though. @fhueske what do you think?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to move the code of DataSet.minus() here. However, I think the semantics of EXCEPT ALL are a bit different than in your implementation. It is not simply checking if there is a match in the second input and forwarding everything if there is none. It basically removes for each match in the second input one matching record from the first input (see also the PostgreSQL docs).

I would be in favor of an implementation that is similar to @wuchong's implementation of INTERSECT / INTERSECT ALL in PR #2159.

@wuchong
Copy link
Member

wuchong commented Jun 28, 2016

Hi @mushketyk, I think we should remove duplicate records in CoGroup instead of using distinct. Others looks good to me.

@mushketyk
Copy link
Contributor Author

Updated documentation as suggested.

* @param other The other DataSet which is set minus with the current DataSet.
* @return The resulting DataSet.
*/
public CoGroupOperator<T, T, T> minus(DataSet<T> other){
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This issue is about adding EXCEPT to the Table. The DataSet API which is touched here is a rather low level API and we are quite careful about adding new operators. Therefore, changes to the DataSet API should go through a separate JIRA issue. Please move this code to DataSetMinus class and revert the changes to this file. You can open a JIRA issue to discuss adding a minus operator to the DataSet API. Thank you.

@fhueske
Copy link
Contributor

fhueske commented Jun 29, 2016

Hi @mushketyk, thanks for the PR! I added a few comments inline.

Best, Fabian
@wuchong, thanks for reviewing!

@mushketyk
Copy link
Contributor Author

@fhueske Thank you for the detailed review! I've updated my code according to your comments.

I noticed that @wuchong is performing some type conversions in his INTERSECT implementation: https://github.com/apache/flink/pull/2159/files#diff-a6c2112ca46d26fcf49f1edba1c73f75R121

Should I do something similar in the EXCEPT case? If yes, does it mean that my test coverage is not sufficient and does not cover some particular case?

@mushketyk
Copy link
Contributor Author

@fhueske Hey. Sorry for bothering you. Do I need to change something else in this PR?

@@ -536,6 +536,29 @@ Table result = left.unionAll(right);
</tr>

<tr>
<td><strong>Minus</strong></td>
<td>
<p>Similar to a SQL EXCEPT clause. Except returns records from the first table that do not exist in the second table. Duplicate records in the first table are returned exactly once, i.e., duplicates are removed. Both tables must have identical schema, i.e., field names and types.</p>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

"Except returns records" should be "Minus returns records" to be consistent. I would also use "left/right" table instead of first and second according to your example code.

import scala.collection.JavaConversions._

/**
* Flink RelNode which matches along with DataSetOperator.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This description does not make much sense.

@twalthr
Copy link
Contributor

twalthr commented Jul 8, 2016

@mushketyk Thanks for the PR.
I also reviewed the current status. I think it's good to compare your code with #2159 before you rework it.

@mushketyk
Copy link
Contributor Author

@twalthr Thank you for your review! I've update the PR according to your comments.

@mushketyk
Copy link
Contributor Author

@twalthr Updated PR according to your commits and rebased on top of the master branch to avoid merge conflicts.

@twalthr
Copy link
Contributor

twalthr commented Jul 11, 2016

Thanks for updating the PR. I reviewed the code again. There were still some issues regarding the documentation and also expectedType handling in DataSetMinus, I fixed them myself and will merge now...

@asfgit asfgit closed this in 9753393 Jul 11, 2016
@mushketyk
Copy link
Contributor Author

@twalthr Thank you for accepting my changes!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants