Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-758] Add count operator to DataSet #63

Closed
wants to merge 6 commits into from

Conversation

uce
Copy link
Contributor

@uce uce commented Jul 7, 2014

  • Adds a count operator to DataSet:
DataSet<String> text = env.fromElements(
    "Who's there?",
    "I think I hear them. Stand, ho! Who's there?");
DataSet<Long> count = text.count(); // 2
  • Adds an all (ungrouped) reduce variant, which allows to specify an initial value to the reduce function
  • Adds an utility method to InstantiationUtil to serialize a record to a byte array
  • Fixes some javadocs warnings in DataSet

I've tested the count operator on a cluster with DOP 1, 200 and verified the results with cat | wc -l.

@hsaputra
Copy link
Contributor

hsaputra commented Jul 8, 2014

Does it need corresponding Scala API counterpart?

@uce
Copy link
Contributor Author

uce commented Jul 9, 2014

It already has the corresponding counterpart. But after your question I realized that the Java API is lacking the count on grouped datasets, which the Scala API supports.

input.count() // works
input.groupBy(...).count() // does not work

I will add it to this PR.

@uce
Copy link
Contributor Author

uce commented Jul 9, 2014

Should be in line with Scala now.

@aljoscha: expression key grouping did not work wih group reduce. after looking into the reduce operator, where it does work, I figured that the small change in 3ccb500 should be enough. Can you confirm this?

@aljoscha
Copy link
Contributor

aljoscha commented Jul 9, 2014

I think it should work, yes.

uce added 5 commits July 21, 2014 14:02
Adds an all (ungrouped) reduce variant to DataSet, which allows to
specify an initial value for the ReduceFunction. The initial value is
an extra input element to the ReduceFuntion and needs to be of the same
type as the ReduceFunction input type. The initial value is not used
with the combiners.

In cases, where there is no input to the reduce function, for example
after a filter operator, which filters all elements, the ReduceFunction
will be called with the initial value only.
Adds a count method to DataSet, which translates to a map-reduce. The
map operator maps each element to a 1 and the reduce operator sums up
all the 1s, resulting in the total count of elements.
The GroupReduceOperator did not work with DataSets, which have been
grouped by expression keys like groupBy("myField").
Adds support for count() on grouped DataSets, which will be translated
to a non-combinable GroupReduceFunction. If the grouping is done on an
empty DataSet, e.g. after a filter-all, the result of the count will be
empty (since there are no groups to count).
@uce
Copy link
Contributor Author

uce commented Jul 21, 2014

I've rebased this PR on the renamed master, but it is just a single commit (not the most elegant way to do this... if someone complains I'll fix it).

return countReduceOp;
}
else {
return new ReduceGroupOperator<IN, Long>(grouping, new CountingGroupReduceUdf<IN>())
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using a non-combinable GroupReduceFunction for counting is unnecessarily inefficient.
We could extract the key fields using a Mapper and add a count-1 and use a ReduceFunction as well.
This requires a few cases due to different key types but should be the way to go.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

@fhueske
Copy link
Contributor

fhueske commented Sep 7, 2014

I had a look at this PR and found a few issues:

  • it contains changes for several independent features
    • Initial value for ReduceFunction
    • Count operator
    • many cosmetic changes / documentation improvements
  • my gut feeling is, that rebasing this PR onto the current master will cause many merge conflicts. It might be worthwhile to separate these issues into independent PRs to make the merging easier.
  • counting for grouped datasets is done with a non-combinable GroupReduceFunction which is not vey efficient
  • An initial value for ReduceFunction is only supported for AllReduce. I see that the original motivation for this (a 0-valued count for empty datasets) does not make sense for grouped ReduceFunctions, but this is not the only way an initial value could be used.

@uce
Copy link
Contributor Author

uce commented Sep 7, 2014

Thanks for the review. The initial value for the reduce function and the count operator are tightly connected. The reduce with initial value is the general solution, of which the count operator is a special case. Therefore, I wouldn't say that these are independent features. The refactorings are also limited to files related to the initial value reduce/count operator.

The counting for grouped data sets was a quick fix after @hsaputra's comment. We can either fix it with this PR or open a seperate issue if we want to merge it.

I think the limitation to AllReduce was the result of a discussion with you and @StephanEwen.


All in all, I think that we should wait for the upcoming changes to the runtime and scheduler to support the more intuitive API of simply returning the count to the user program. As you said, we might move some of the changes (like initial value reduce) to a separate issue if we find them useful.

@fhueske
Copy link
Contributor

fhueske commented Sep 8, 2014

Sure, the count op requires the init value, but not vice versa. I meant we could do the init value change first (which also has a separate JIRA I think) and put the count op on top.

If we wait for the runtime changes, should we just close this PR for now and extract the init value changes for a separate PR?

@StephanEwen
Copy link
Contributor

Bear in mind that the count will be given for free also when the first-class handling of intermediate results is merged.

@uce
Copy link
Contributor Author

uce commented Nov 18, 2014

See #210.

@uce
Copy link
Contributor Author

uce commented Nov 19, 2014

I'll think it's best to close this PR. It's superceded by the upcoming intermediate result changes and #210. Nobody complained about not having a fold-style reduce with an initial value, so I think it's OK to discard these features as well. I will only merge the byte array serialization commit (1b893a1) of this PR as I think it might be useful in general.

@uce uce closed this Nov 19, 2014
@uce uce deleted the FLINK-758-count_operator branch February 23, 2015 09:18
tzulitai pushed a commit to tzulitai/flink that referenced this pull request Jan 15, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
6 participants