[FLINK-1622][java-api][scala-api] add a GroupCombine operator #466
import java.util.List;

/**
 *
@mxm, could you add a class description explaining why the base class needs to be created and how it relates to the existing ones?
We would love to add more documentation on the code to help new contributors get comfortable with the code flow.
Thanks for spotting the missing doc in this class. I'll add some.
I like the implementation, except for my comments on groupReducePartial() on grouped DataSets. Also, the tests seem a bit shady because of all the grouping and regular reduceGroup operations. I would suggest partitioning the data using a manual partition operation and then applying a GroupReducePartial.
I assume this will be a new operator, but I do not see updates to the documentation files.
@aljoscha Thanks for the comments. I agree, the tests are a bit shady because they test the operator by first performing a partial, then a full reduce. Using a custom partitioner would make more sense. @hsaputra Yes, we absolutely have to add docs apart from the Java/Scala docs. What do you think about the name? @fhueske suggested to expose the operator as
8eb4dfe
to
e655d22
Yes, I've got a couple of comments as well. First of all, as @mxm said, I would propose to call this operator I think @aljoscha is right. Apart from documentation, a few Scala tests should also be added.
 * This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
 * locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values
 * into an intermediate representation before applying a proper reduce operation.
 *
Isn't the partial GroupReduce only working on subsets of a local partition (as much as fits in the sort buffer)?
That is correct.
I have to correct myself. A combiner should of course be called on groups of records. Therefore, calling it on a Grouping makes absolute sense. However, the semantics of the Grouping are different for Combine and Reduce. For Reduce, a full Grouping is established with repartitioning and a full sort, whereas for Combine only a partial, local grouping should be created.
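To make the difference concrete, here is a minimal plain-Java sketch. It is not Flink code: `PartialCombineSketch`, `combinePartition`, `reduceAll` and the `bufferSize` parameter are hypothetical names, and the fixed-size buffer is only an assumption standing in for "as much as fits in the sort buffer". Each buffer-full is grouped and summed locally, so the same key can show up in several partial records, and a later full reduce merges them.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of a partial, local grouping. Not Flink API. */
final class PartialCombineSketch {

    /**
     * Combines one partition in chunks of at most bufferSize records.
     * Every chunk is grouped by key and counted on its own, so a key may
     * appear in more than one output record (a partial result).
     */
    public static List<Map.Entry<String, Integer>> combinePartition(
            List<String> partition, int bufferSize) {
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (int start = 0; start < partition.size(); start += bufferSize) {
            int end = Math.min(start + bufferSize, partition.size());
            Map<String, Integer> chunkCounts = new TreeMap<>();
            for (String word : partition.subList(start, end)) {
                chunkCounts.merge(word, 1, Integer::sum);
            }
            for (Map.Entry<String, Integer> e : chunkCounts.entrySet()) {
                out.add(new AbstractMap.SimpleImmutableEntry<>(e.getKey(), e.getValue()));
            }
        }
        return out;
    }

    /** Full reduce: merges the partial records of all partitions per key. */
    public static Map<String, Integer> reduceAll(
            List<List<Map.Entry<String, Integer>>> combinedPartitions) {
        Map<String, Integer> totals = new TreeMap<>();
        for (List<Map.Entry<String, Integer>> partition : combinedPartitions) {
            for (Map.Entry<String, Integer> e : partition) {
                totals.merge(e.getKey(), e.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "b", "a", "a", "b");
        // With a buffer of 2 records the chunks are [a, b], [a, a], [b].
        List<Map.Entry<String, Integer>> partial = combinePartition(partition, 2);
        System.out.println(partial);  // [a=1, b=1, a=2, b=1]
        System.out.println(reduceAll(Collections.singletonList(partial)));  // {a=3, b=2}
    }
}
```

The point of the sketch is only that the combine output is not a complete grouping; correctness of the final result still depends on the reduce step that follows.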
@aljoscha @fhueske For a general combine, the operator can be used without grouping. When we want to combine elements before performing a proper groupReduce with a groupBy, we also need to use groupBy for the combine. Otherwise, we wouldn't know in the combiner which keys belong together. However, there are cases where a combiner without a groupBy would be appropriate. That's why the new operator is exposed in
Sorry, I completely blanked. Of course you still need the grouping; only the shuffle step is not needed. So I suggest only better tests, using a combination of partitionByHash() and groupReducePartial().
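The test idea suggested here can be sketched in plain Java. This is a simulation, not the test that ended up in the pull request, and it does not call Flink: `hashPartition`, `countGroups` and `combinePerPartition` are hypothetical helpers. It assumes every partition fits in memory, so that after hash partitioning each key lives in exactly one partition and the union of the per-partition combine results must equal a global group count.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Plain-Java simulation of "partition by hash, then combine". Not Flink API. */
final class HashPartitionCombineTestSketch {

    /** Sends each record to the partition chosen by its hash code. */
    public static List<List<String>> hashPartition(List<String> records, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String record : records) {
            partitions.get(Math.floorMod(record.hashCode(), numPartitions)).add(record);
        }
        return partitions;
    }

    /** Groups the records by value and counts each group. */
    public static Map<String, Integer> countGroups(List<String> records) {
        Map<String, Integer> counts = new TreeMap<>();
        for (String record : records) {
            counts.merge(record, 1, Integer::sum);
        }
        return counts;
    }

    /**
     * Combines every partition on its own and collects the results.
     * Fails if a key is produced by more than one partition, which would
     * mean the partitioning did not keep equal keys together.
     */
    public static Map<String, Integer> combinePerPartition(List<List<String>> partitions) {
        Map<String, Integer> union = new TreeMap<>();
        for (List<String> partition : partitions) {
            for (Map.Entry<String, Integer> e : countGroups(partition).entrySet()) {
                if (union.put(e.getKey(), e.getValue()) != null) {
                    throw new IllegalStateException("key in more than one partition: " + e.getKey());
                }
            }
        }
        return union;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("x", "y", "x", "z", "y", "x");
        Map<String, Integer> expected = countGroups(records);
        Map<String, Integer> actual = combinePerPartition(hashPartition(records, 3));
        System.out.println(expected.equals(actual));
    }
}
```

Because the partitioning is done explicitly, the check exercises the combine step alone instead of hiding it behind a subsequent full reduce.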
@aljoscha @fhueske @hsaputra Thanks for the feedback. Some people suggested that the name is confusing and that my pull request involved too much code duplication. I propose to call the new operator @hsaputra I added documentation for the operator in the code and the official documentation. Some Scala tests for the API were added as well. When merging this pull request, I would squash the two commits and keep the commit message of the latest one. I simply wanted to keep the first one to show the development process.
8b60fbb
to
11d0581
import java.util.Collections;
import java.util.List;

public final class GroupCombineProperties extends OperatorDescriptorSingle {
Could you add JavaDoc for this class? I know the other classes derived from OperatorDescriptorSingle do not have JavaDoc yet, but I am trying to get newly added code to have at least a simple explanation of why the class should be created.
Totally agree with you. We should not accept undocumented classes. I added some doc, also for the base class.
Thanks @mxm! I added a couple more comments about documentation. I am trying to promote the habit of adding JavaDoc comments for new classes to help indicate why they have to be created. Reducing code is harder than adding new code.
11d0581
to
f3aaa31
The partial GroupReduce operator acts like a regular GroupReduce operator but does not perform a full group reduce. Instead, it performs the GroupReduce only on the individual partitions. This may lead to a partial GroupReduce result. The operator can be used to pre-combine elements into an intermediate output format before applying a proper groupReduce to produce the final output format.
f3aaa31
to
2df89c9
The GroupCombine operator acts like the optional combine step in the GroupReduceFunction. It is more general because it combines from an input to an arbitrary output type. Combining is performed on the partitions with as much data in memory as possible. This may lead to partial results. The operator can be used to pre-combine elements into an intermediate output format before applying a proper groupReduce to produce the final output format.

* rename GroupReducePartial to GroupCombine
* make Combine and FlatCombine generic by adding an output type
* add documentation
* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver
* add Java tests
* add Scala test
2df89c9
to
5080602
@hsaputra Thanks for the feedback. Promoting the habit of documenting every class is a very good thing.
Hi @mxm, appreciate the ACK. Just trying to keep everyone informed as more code comes in.
Any further comments?
I would say it's good to go now.
The GroupCombine operator acts like the optional combine step in the GroupReduceFunction. It is more general because it combines from an input to an arbitrary output type. Combining is performed on the partitions with as much data in memory as possible. This may lead to partial results. The operator can be used to pre-combine elements into an intermediate output format before applying a proper groupReduce to produce the final output format.

* make Combine and FlatCombine generic by adding an output type
* add documentation
* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver
* add Java tests
* add Scala test

This closes apache#466
The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.
The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.
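
As an illustration of combining "from an input to an arbitrary output type", the following plain-Java sketch computes an average in two steps. It does not use the Flink API: `IntermediateTypeSketch`, `SumCount`, `combine` and `reduce` are hypothetical names chosen for this example. The combine step turns the `Integer` records of one partition into an intermediate (sum, count) pair, and the final reduce turns those pairs into a `double`.

```java
import java.util.Arrays;
import java.util.List;

/** Plain-Java illustration of input -> intermediate -> final type. Not Flink API. */
final class IntermediateTypeSketch {

    /** Intermediate representation emitted by the combine step. */
    public static final class SumCount {
        public final long sum;
        public final long count;

        public SumCount(long sum, long count) {
            this.sum = sum;
            this.count = count;
        }
    }

    /** Combine: Integer records of one partition -> one partial SumCount. */
    public static SumCount combine(List<Integer> partition) {
        long sum = 0;
        for (int value : partition) {
            sum += value;
        }
        return new SumCount(sum, partition.size());
    }

    /** Reduce: merges all partial SumCounts into the final average. */
    public static double reduce(List<SumCount> partials) {
        long sum = 0;
        long count = 0;
        for (SumCount partial : partials) {
            sum += partial.sum;
            count += partial.count;
        }
        return (double) sum / count;
    }

    public static void main(String[] args) {
        SumCount first = combine(Arrays.asList(1, 2, 3));   // partial result (6, 3)
        SumCount second = combine(Arrays.asList(4, 5));     // partial result (9, 2)
        System.out.println(reduce(Arrays.asList(first, second)));  // 3.0
    }
}
```

An average is a case where the output of the combine step cannot have the input type, since averaging partial averages would weight the partitions incorrectly.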