-
Notifications
You must be signed in to change notification settings - Fork 13.3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-3943] Add support for EXCEPT operator #2169
Conversation
Renamed UnionITCase to SetOperationsITCase as suggested here: #2159 |
|
||
val minusRes = leftDataSet.minus(rightDataSet) | ||
if (!all) { | ||
minusRes.distinct() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's better to remove duplicate records in CoGroup, emit only one record instead of every record from left data set when it is a minus without all. There is no need a distinct afterwards. It's the more robust choice because it won't create a huge intermediate result in case of many duplicate records.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense. Will update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
On the second hand "union" method in DataSet.java preserves duplicates and used to implement UNION ALL. UNION is implemented by using "distinct" operation on top of UNION ALL.
It seems that what you suggest will only add code duplication since it will basically implement a specialized version of "distinct" operator specifically for the "minus" method.
I have no strong preferences though. @fhueske what do you think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would like to move the code of DataSet.minus()
here. However, I think the semantics of EXCEPT ALL
are a bit different than in your implementation. It is not simply checking if there is a match in the second input and forwarding everything if there is none. It basically removes for each match in the second input one matching record from the first input (see also the PostgreSQL docs).
I would be in favor of an implementation that is similar to @wuchong's implementation of INTERSECT
/ INTERSECT ALL
in PR #2159.
Hi @mushketyk, I think we should remove duplicate records in CoGroup instead of using |
Updated documentation as suggested. |
* @param other The other DataSet which is set minus with the current DataSet. | ||
* @return The resulting DataSet. | ||
*/ | ||
public CoGroupOperator<T, T, T> minus(DataSet<T> other){ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This issue is about adding EXCEPT
to the Table. The DataSet API which is touched here is a rather low level API and we are quite careful about adding new operators. Therefore, changes to the DataSet API should go through a separate JIRA issue. Please move this code to DataSetMinus
class and revert the changes to this file. You can open a JIRA issue to discuss adding a minus
operator to the DataSet API. Thank you.
Hi @mushketyk, thanks for the PR! I added a few comments inline. Best, Fabian |
@fhueske Thank you for the detailed review! I've updated my code according to your comments. I noticed that @wuchong is performing some type conversions in his INTERSECT implementation: https://github.com/apache/flink/pull/2159/files#diff-a6c2112ca46d26fcf49f1edba1c73f75R121 Should I do something similar in the EXCEPT case? If yes, does it mean that my test coverage is not sufficient and does not cover some particular case? |
@fhueske Hey. Sorry for bothering you. Do I need to change something else in this PR? |
@@ -536,6 +536,29 @@ Table result = left.unionAll(right); | |||
</tr> | |||
|
|||
<tr> | |||
<td><strong>Minus</strong></td> | |||
<td> | |||
<p>Similar to a SQL EXCEPT clause. Except returns records from the first table that do not exist in the second table. Duplicate records in the first table are returned exactly once, i.e., duplicates are removed. Both tables must have identical schema, i.e., field names and types.</p> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"Except returns records" should be "Minus returns records" to be consistent. I would also use "left/right" table instead of first and second according to your example code.
import scala.collection.JavaConversions._ | ||
|
||
/** | ||
* Flink RelNode which matches along with DataSetOperator. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This description does not make much sense.
@mushketyk Thanks for the PR. |
@twalthr Thank you for your review! I've update the PR according to your comments. |
@twalthr Updated PR according to your commits and rebased on top of the master branch to avoid merge conflicts. |
Thanks for updating the PR. I reviewed the code again. There were still some issues regarding the documentation and also |
@twalthr Thank you for accepting my changes! |
Thanks for contributing to Apache Flink. Before you open your pull request, please take the following check list into consideration.
If your changes take all of the items into account, feel free to open your pull request. For more information and/or questions please refer to the How To Contribute guide.
In addition to going through the list, please provide a meaningful description of your changes.
mvn clean verify
has been executed successfully locally or a Travis build has passed