Implement the convenience methods count and collect in DataSet #210
Conversation
Does this work when you use …?
Well, never mind, just saw that you're calling …
Why didn't you implement these as an identity (map) function and save the information to unique accumulators (maybe identified by a user-supplied ID)? You could insert them into any plan at any point, any number of times; I can see that being useful.
Thank you for your feedback. The idea is to have intermediate results available to the user program within the execution of a Flink program. Currently, this is only possible through RichFunctions, which provide access to the RuntimeContext.
I think executing right away makes sense, because we need the … As for IDs, I agree that a randomly generated ID per method call makes sense.
Very cool! I think this is a great addition and we will improve on the performance soon :) @zentol, as @StephanEwen said, it makes sense to execute right away. You can think of these as "actions" from Spark, which get results back to the user. He can't use a map function, because he doesn't emit any records, but just adds them to the accumulator.

I've tried this out locally and it works, but you have to add the random IDs, as you otherwise run into problems with multiple actions in the same program. With this change, we should also allow programs without sinks. Currently, you get a warning if you only have "actions" like collect/count without a sink.

Before we can merge this, you should also add test cases for both operators and make sure that it works with multiple actions per program.
hmm... alright, I can see the point. Doesn't executing right away carry the risk of being inefficient when using them multiple times, though? Since it effectively means executing multiple jobs within the same program (I think...), any common part of the jobs is done an extra time. (If I'm wrong here, skip the rest.) Example:
… this would result in 2 jobs being executed, with map(X) being executed twice, whereas
… would only be 1 job, with map(X) done only once. It is not as pretty (by a fair margin, I admit), but in line with the current API.
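The concern above can be made concrete with a plain-Java sketch (not Flink code — the class, `mapX`, and `runPipeline` are hypothetical stand-ins): each eager "action" re-executes the whole pipeline, so a shared map step runs once per action instead of once overall.

```java
import java.util.ArrayList;
import java.util.List;

public class EagerActions {
    // counts how often the shared map(X) step actually executes
    static int mapXRuns = 0;

    static int mapX(int x) {
        mapXRuns++;
        return x * 2;
    }

    // stand-in for one eagerly executed job: the whole pipeline runs again
    static List<Integer> runPipeline(List<Integer> input) {
        List<Integer> out = new ArrayList<>();
        for (int x : input) {
            out.add(mapX(x));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = List.of(1, 2, 3);
        long count = runPipeline(data).size();      // first "job" (like count())
        List<Integer> result = runPipeline(data);   // second "job" (like collect())
        System.out.println(count);                  // 3
        System.out.println(mapXRuns);               // 6: map(X) ran twice per element
    }
}
```

With a single lazily executed job, `mapXRuns` would be 3 instead of 6 — which is exactly the repeated-work cost being discussed here.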
I changed the code to generate a unique identifier for each call of count or collect. @zentol Executing in a lazy fashion could be implemented, but would require additional changes to the API. The methods would then have to return something like a LocalDataSet, which lets the user retrieve the accumulator result once the job has been executed.
Why additional changes? You can already retrieve the accumulators by using the JobExecutionResult. The only necessary change I see is removing the …
Why would this not be sufficient?
public void write(DataOutputView out) throws IOException {
    ObjectOutputStream outStream = new ObjectOutputStream(new DataOutputViewStream(out));
    outStream.writeObject(typeSerializer);
    outStream.flush();
Let us change this to close()
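The requested change swaps `flush()` for `close()`, which both flushes and releases the stream. A minimal pure-JDK sketch of the idea (plain byte-array streams stand in for Flink's DataOutputView/DataOutputViewStream; the class and method names here are hypothetical):

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class SerializerIo {
    // sketch of the write path: close() flushes the ObjectOutputStream
    // and releases its resources, instead of a bare flush()
    static byte[] write(Serializable typeSerializer) {
        try {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            ObjectOutputStream outStream = new ObjectOutputStream(buf);
            outStream.writeObject(typeSerializer);
            outStream.close(); // the change requested in the review
            return buf.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // matching read path, to show the round trip still works after close()
    static Object read(byte[] bytes) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new RuntimeException(e);
        }
    }
}
```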
With the scheduler and intermediate data set enhancements coming up for 0.9 soon, this is now quite feasible to use. I suggest merging it once the inline comments are addressed.
I applied the proposed changes and rebased onto the current master. The changes are only reflected in the Java API and still need to be added to the Scala API as well.
I've implemented count and collect in the Scala API. There is still a problem with the …
Looks like this is now ready to merge. @zentol I understand your concern. However, I think that it is much easier to execute this way. Most of the time, the user probably wants just one accumulator result, not multiple. This is supposed to be a convenience function.
@zentol You are right that, for the time being, this partly results in repeated execution. While not totally avoidable in all cases, the code going in soon for caching intermediate results will help there big time.
I squashed the commits and rebased onto the current master. Any objections against merging this?
@@ -58,6 +58,12 @@ under the License.
      <version>0.5.1</version>
    </dependency>

    <dependency>
Why are you moving netty to flink-core?
Simply because AbstractID depends on io.netty.buffer.ByteBuf.
Alright, that makes sense.
Actually, only a single convenience method in AbstractID depends on this. I would propose to refactor that method into one of the existing utility classes in flink-runtime and keep the Netty dependency in flink-runtime.
Let us not move netty to flink-core. Let us rather pull that method out of AbstractID, or not use AbstractID at all in your code, but the Java UUID.
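The UUID alternative suggested here needs no Netty at all. A rough sketch under that assumption (the class and method names below are hypothetical, not from the PR) of generating a unique accumulator name per count/collect call with `java.util.UUID`:

```java
import java.util.UUID;

public class UniqueAccumulatorIds {
    // hypothetical helper: a collision-free accumulator name per call,
    // built from java.util.UUID instead of Flink's AbstractID
    static String newAccumulatorName(String prefix) {
        return prefix + "-" + UUID.randomUUID();
    }
}
```

Since `UUID.randomUUID()` is in the JDK, flink-core would pick up no new dependency, which is the point of the comment above.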
… DataSet to the client

- this implements two convenience methods on DataSet for the Java and Scala API
- appropriate tests have been added
- count(): returns the number of elements in a DataSet
- collect(): returns a List<T> with the actual elements of a DataSet<T>
- both methods use accumulators to get the results back to the client
- both methods force an execution of the job to generate the results
I moved the …
If there are no objections, I will merge this once Travis passes.
This closes apache#210.
These methods make it convenient for the API user to get intermediate results back into the program.
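The mechanism the commit message describes — every element flows into a named accumulator during the job, and the client reads that accumulator from the execution result afterwards — can be modeled in a few lines of plain Java. This is a toy model, not Flink's real classes; a List over a Map stands in for the accumulator and the JobExecutionResult:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

public class AccumulatorActions {
    // toy model of collect(): run a "job" that adds each element to a
    // uniquely named accumulator, then read the accumulator back
    static <T> List<T> collect(List<T> dataSet) {
        // unique ID per call, so multiple actions in one program don't collide
        String accumulatorId = "collect-" + UUID.randomUUID();
        Map<String, List<T>> accumulators = new HashMap<>();
        accumulators.put(accumulatorId, new ArrayList<>());

        // "execute the job": nothing is emitted downstream, elements only
        // flow into the accumulator
        for (T element : dataSet) {
            accumulators.get(accumulatorId).add(element);
        }

        // the client retrieves the result from the job's accumulators
        return accumulators.get(accumulatorId);
    }

    // toy model of count(), built the same way
    static <T> long count(List<T> dataSet) {
        return collect(dataSet).size();
    }
}
```

The unique ID per call mirrors the fix discussed earlier in this thread: without it, two actions in the same program would write into the same accumulator name.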