[FLINK-1622][java-api][scala-api] add a GroupCombine operator #466

Closed
wants to merge 2 commits into from


mxm
Contributor

@mxm mxm commented Mar 9, 2015

The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.
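
A minimal sketch of this idea in plain Java, not Flink's API: the class and method names (`CombineSketch`, `combinePartition`, `reduceAll`) are hypothetical and only illustrate the data flow. Each partition is combined locally from `String` words into `(word, count)` entries, so the output type differs from the input type, and a final reduce merges the partial results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CombineSketch {

    // Combine step: sees one partition only and turns String input into
    // (word, partialCount) entries. Hypothetical helper, not a Flink call.
    static List<Map.Entry<String, Integer>> combinePartition(List<String> partition) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String word : partition) {
            counts.merge(word, 1, Integer::sum);
        }
        return new ArrayList<>(counts.entrySet());
    }

    // Final reduce: merges the partial counts of all partitions per key.
    static Map<String, Integer> reduceAll(List<List<Map.Entry<String, Integer>>> partials) {
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (List<Map.Entry<String, Integer>> partial : partials) {
            for (Map.Entry<String, Integer> entry : partial) {
                totals.merge(entry.getKey(), entry.getValue(), Integer::sum);
            }
        }
        return totals;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "a"),
                Arrays.asList("b", "b", "c"));

        List<List<Map.Entry<String, Integer>>> partials = new ArrayList<>();
        for (List<String> partition : partitions) {
            partials.add(combinePartition(partition));
        }
        System.out.println(reduceAll(partials)); // prints {a=2, b=3, c=1}
    }
}
```

The partial results for `b` (1 from the first partition, 2 from the second) only become the final count after the reduce step, which is why the combine output alone may be incomplete.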

import java.util.List;

/**
*
Contributor

@mxm, could you add a class description explaining why the base class needs to be created and how it relates to the existing ones?
We would love to add more documentation to the code to help new contributors get comfortable with the code flow.

Contributor Author

Thanks for spotting the missing doc in this class. I'll add some.

@aljoscha
Contributor

I like the implementation, except for my comments on groupReducePartial() on grouped DataSets. Also, the tests seem a bit shady because of all the grouping and regular reduceGroup operations. I would suggest partitioning the data using a manual partition operation and then applying a GroupReducePartial.

@hsaputra
Contributor

I assume this will be a new operator, but I do not see updates to the documentation files.

@mxm
Contributor Author

mxm commented Mar 10, 2015

@aljoscha Thanks for the comments. I agree, the tests are a bit shady because they test the operator by first performing a partial, then a full reduce. Using a custom partitioner would make more sense.

@hsaputra Yes, we absolutely have to add docs apart from the Java/Scala docs.

What do you think about the name? @fhueske suggested exposing the operator as combine because it is essentially a user-accessible combiner.

@fhueske
Contributor

fhueske commented Mar 10, 2015

Yes, I've got a couple of comments as well.

First of all, as @mxm said, I would propose to call this operator combine because it is a generalized combiner (the output type may differ from the input type). The current combiner could be implemented as a special case of this one (input type = output type).

I think @aljoscha is right. partialGroupReduce (or combine) should be applied on a DataSet, not a GroupedDataSet. Otherwise, you lose most of the benefits (local processing) and would ship all data, which is exactly what you want to avoid when using a combiner. If you want to run a reduce afterwards, you simply do groupBy(x).reduceGroup().

Apart from documentation, also a few Scala tests should be added.
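
The "generalized combiner" point above can be sketched in plain Java. This is an illustration only: `Combiner` and `SameTypeCombiner` are hypothetical interfaces, not Flink's actual types. A combiner with separate input and output type parameters covers the classic combiner as the case where both parameters are the same.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class GeneralizedCombinerSketch {

    // Hypothetical generalized combiner: IN and OUT may differ.
    public interface Combiner<IN, OUT> {
        List<OUT> combine(List<IN> values);
    }

    // Classic combiner as a special case: output type equals input type.
    public interface SameTypeCombiner<T> extends Combiner<T, T> { }

    // Integer in, Integer out: a partial sum.
    public static final SameTypeCombiner<Integer> PARTIAL_SUM = values -> {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return Collections.singletonList(sum);
    };

    // String in, Integer out: only expressible with the generalized form.
    public static final Combiner<String, Integer> TOTAL_LENGTH = values -> {
        int length = 0;
        for (String s : values) {
            length += s.length();
        }
        return Collections.singletonList(length);
    };

    public static void main(String[] args) {
        System.out.println(PARTIAL_SUM.combine(Arrays.asList(1, 2, 3)));    // prints [6]
        System.out.println(TOTAL_LENGTH.combine(Arrays.asList("ab", "cde"))); // prints [5]
    }
}
```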

* This operator behaves like the GroupReduceOperator with Combine but only runs the Combine part which reduces all data
* locally in their partitions. The combine part can return an arbitrary data type. This is useful to pre-combine values
* into an intermediate representation before applying a proper reduce operation.
*
Contributor

Isn't the partial GroupReduce only working on subsets of a local partition (as much as fits in the sort buffer)?

Contributor Author

That is correct.
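
To make the consequence concrete, here is a simplified plain-Java sketch, assuming the sort buffer can be modeled as a fixed number of records (`bufferSize` and the class name are hypothetical, and the real runtime sorts by key and measures memory rather than counting records). Because each buffer-full is combined on its own, a key that spans several chunks yields several partial results even within a single partition.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BufferedCombineSketch {

    // Combines one partition in chunks of at most bufferSize records.
    // Each chunk is combined independently, so the same key can appear
    // in the output once per chunk it occurs in.
    static List<Map.Entry<String, Integer>> combineWithBuffer(List<String> partition, int bufferSize) {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        List<Map.Entry<String, Integer>> out = new ArrayList<>();
        for (int start = 0; start < partition.size(); start += bufferSize) {
            int end = Math.min(start + bufferSize, partition.size());
            Map<String, Integer> counts = new LinkedHashMap<>();
            for (String word : partition.subList(start, end)) {
                counts.merge(word, 1, Integer::sum);
            }
            out.addAll(counts.entrySet());
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> partition = Arrays.asList("a", "a", "a", "b", "a");
        // Buffer of 2 records: chunks [a, a], [a, b], [a].
        System.out.println(combineWithBuffer(partition, 2));  // prints [a=2, a=1, b=1, a=1]
        // Buffer large enough for the whole partition: one result per key.
        System.out.println(combineWithBuffer(partition, 10)); // prints [a=4, b=1]
    }
}
```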

@fhueske
Contributor

fhueske commented Mar 10, 2015

I have to correct myself. A combiner should of course be called on groups of records. Therefore, calling it on a Grouping makes absolute sense. However, the semantics of the Grouping are different for Combine and Reduce. For Reduce a full Grouping is established with repartitioning and full sort, whereas for Combine only a partial local grouping should be created.

@mxm
Contributor Author

mxm commented Mar 10, 2015

@aljoscha @fhueske For a general combine, the operator can be used without grouping. When we want to combine elements before performing a proper groupReduce with a groupBy, we also need to use groupBy for the combine. Otherwise, the combiner wouldn't know which keys belong together. However, there are cases where a combiner without a groupBy would be appropriate. That's why the new operator is exposed in DataSet, GroupedDataSet, and SortedDataSet. We currently have the same behavior for a normal GroupReduce, despite the fact that the combine of the GroupReduce cannot change the input type.

@aljoscha
Contributor

Sorry, I completely blanked. Of course you still need the grouping; it is only the shuffle step that you don't need.

So, I suggest only better tests, using a combination of partitionByHash() and groupReducePartial().
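
A rough plain-Java sketch of why such a test is easier to reason about, assuming records are routed by key hash (`partitionByKeyHash` and `combineLocally` are hypothetical helpers, not Flink's `partitionByHash()` implementation): once all records of a key sit in the same partition, a purely local combine over that partition sees the whole group, so the expected output can be asserted without a follow-up reduce. This ignores the sort-buffer limit discussed earlier.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class HashPartitionSketch {

    // Routes each record to a partition chosen by the hash of its key
    // (here the record itself is the key).
    static List<List<String>> partitionByKeyHash(List<String> records, int numPartitions) {
        List<List<String>> partitions = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
        for (String record : records) {
            int target = Math.floorMod(record.hashCode(), numPartitions);
            partitions.get(target).add(record);
        }
        return partitions;
    }

    // Local combine: counts occurrences per key within one partition only.
    static Map<String, Integer> combineLocally(List<String> partition) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String key : partition) {
            counts.merge(key, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> records = Arrays.asList("a", "b", "a", "c", "b", "a");
        for (List<String> partition : partitionByKeyHash(records, 3)) {
            // Every key shows up in exactly one partition's output.
            System.out.println(combineLocally(partition));
        }
    }
}
```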

@mxm
Contributor Author

mxm commented Mar 13, 2015

@aljoscha @fhueske @hsaputra Thanks for the feedback. Some people suggested that the name is confusing and that my pull request involved too much code duplication. I propose to call the new operator combineGroup because it is a combiner that works on Groups, just like the existing combiners in GroupReduceCombineDriver and AllGroupReduceDriver. I refactored the code to reuse Flink's existing logic for combiners.

@hsaputra I added documentation for the operator in the code and the official documentation. Some Scala tests for the API were added as well.

When merging this pull request, I would squash the two commits and keep the commit message of the latest one. I simply wanted to keep the first one to show the development process.

@mxm mxm changed the title from "[FLINK-1622][java-api][scala-api] add a partial GroupReduce operator" to "[FLINK-1622][java-api][scala-api] add a GroupCombine operator" on Mar 13, 2015

import java.util.Collections;
import java.util.List;

public final class GroupCombineProperties extends OperatorDescriptorSingle {
Contributor

Could you add JavaDoc for this class? I know the other classes derived from OperatorDescriptorSingle do not have JavaDoc yet, but I am trying to get newly added code to have at least a simple explanation of why the class should be created.

Contributor Author

Totally agree with you. We should not accept undocumented classes. I added some doc, also for the base class.

@hsaputra
Contributor

Thanks @mxm! I added a couple more comments about documentation. I am trying to promote the habit of adding a JavaDoc comment to new classes to indicate why they had to be created. Reducing code is harder than adding new code.

The partial GroupReduce operator acts like a regular GroupReduce
operator but does not perform a full group reduce. Instead, it performs
the GroupReduce only on the individual partitions. This may lead to a
partial GroupReduce result.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.

The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.

* rename GroupReducePartial to GroupCombine

* make Combine and FlatCombine generic by adding an output type

* add documentation

* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver

* add Java tests
* add Scala test

@mxm
Contributor Author

mxm commented Mar 16, 2015

@hsaputra Thanks for the feedback. Promoting the habit of documenting every class is a very good thing.

@hsaputra
Contributor

Hi @mxm, appreciate the ACK. Just trying to keep everyone informed as more code comes in.
It is harder to read other people's code and flow, so I was just trying to ease the process =)

@mxm
Contributor Author

mxm commented Mar 18, 2015

Any further comments?

@aljoscha
Contributor

I would say it's good to go now.

@asfgit asfgit closed this in e93e0cb Mar 18, 2015
mafernandez-stratio pushed a commit to mafernandez-stratio/flink that referenced this pull request Mar 18, 2015
The GroupCombine operator acts like the optional combine step in the
GroupReduceFunction. It is more general because it combines from an
input to an arbitrary output type. Combining is performed on the
partitions with as much data in memory as possible. This may lead to
partial results.

The operator can be used to pre-combine elements into an intermediate
output format before applying a proper groupReduce to produce the final
output format.

* make Combine and FlatCombine generic by adding an output type

* add documentation

* Reuse GroupReduceCombineDriver and SynchronousChainedCombineDriver for GroupCombine operator
** make them more generic by specifying input and output type
** implement AllCombineDriver

* add Java tests
* add Scala test

This closes apache#466
bhatsachin pushed a commit to bhatsachin/flink that referenced this pull request May 5, 2015
marthavk pushed a commit to marthavk/flink that referenced this pull request Jun 9, 2015
6 participants