-
Notifications
You must be signed in to change notification settings - Fork 13.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-758] Add count operator to DataSet #63
Conversation
Does it need corresponding Scala API counterpart? |
It already has the corresponding counterpart. But after your question I realized that the Java API is lacking the count on grouped datasets, which the Scala API supports.
I will add it to this PR. |
I think it should work, yes. |
Adds an all (ungrouped) reduce variant to DataSet, which allows to specify an initial value for the ReduceFunction. The initial value is an extra input element to the ReduceFuntion and needs to be of the same type as the ReduceFunction input type. The initial value is not used with the combiners. In cases, where there is no input to the reduce function, for example after a filter operator, which filters all elements, the ReduceFunction will be called with the initial value only.
Adds a count method to DataSet, which translates to a map-reduce. The map operator maps each element to a 1 and the reduce operator sums up all the 1s, resulting in the total count of elements.
The GroupReduceOperator did not work with DataSets, which have been grouped by expression keys like groupBy("myField").
Adds support for count() on grouped DataSets, which will be translated to a non-combinable GroupReduceFunction. If the grouping is done on an empty DataSet, e.g. after a filter-all, the result of the count will be empty (since there are no groups to count).
I've rebased this PR on the renamed master, but it is just a single commit (not the most elegant way to do this... if someone complains I'll fix it). |
return countReduceOp; | ||
} | ||
else { | ||
return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Using a non-combinable GroupReduceFunction for counting is unnecessarily inefficient.
We could extract the key fields using a Mapper and add a count-1 and use a ReduceFunction as well.
This requires a few cases due to different key types but should be the way to go.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
+1
I had a look at this PR and found a few issues:
|
Thanks for the review. The initial value for the reduce function and the count operator are tightly connected. The reduce with initial value is the general solution, of which the count operator is a special case. Therefore, I wouldn't say that these are independent features. The refactorings are also limited to files related to the initial value reduce/count operator. The counting for grouped data sets was a quick fix after @hsaputra's comment. We can either fix it with this PR or open a seperate issue if we want to merge it. I think the limitation to AllReduce was the result of a discussion with you and @StephanEwen. All in all, I think that we should wait for the upcoming changes to the runtime and scheduler to support the more intuitive API of simply returning the count to the user program. As you said, we might move some of the changes (like initial value reduce) to a separate issue if we find them useful. |
Sure, the count op requires the init value, but not vice versa. I meant we could do the init value change first (which also has a separate JIRA I think) and put the count op on top. If we wait for the runtime changes, should we just close this PR for now and extract the init value changes for a separate PR? |
Bear in mind that the count will be given for free also when the first-class handling of intermediate results is merged. |
See #210. |
I'll think it's best to close this PR. It's superceded by the upcoming intermediate result changes and #210. Nobody complained about not having a fold-style reduce with an initial value, so I think it's OK to discard these features as well. I will only merge the byte array serialization commit (1b893a1) of this PR as I think it might be useful in general. |
…Stateful Functions documentation This closes apache#63.
I've tested the count operator on a cluster with DOP 1, 200 and verified the results with
cat | wc -l
.