
Implement the convenience methods count and collect in DataSet #210

Closed · wants to merge 2 commits

Conversation

@mxm (Contributor) commented Nov 17, 2014

These methods make it convenient for the API user to get intermediate results back into the driver program.
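For illustration, a minimal usage sketch assuming the method shapes described here (count() returning a long, collect() returning a List&lt;T&gt;; the sample data is made up):

```java
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;

import java.util.List;

public class CountCollectExample {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        DataSet<Integer> numbers = env.fromElements(1, 2, 3, 4, 5);

        // Each call triggers an execution of the plan and ships the
        // result back to the driver program via an accumulator.
        long howMany = numbers.count();
        List<Integer> elements = numbers.collect();

        System.out.println(howMany + " elements: " + elements);
    }
}
```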

@zentol (Contributor) commented Nov 17, 2014

Does this work when using count() or collect() multiple times in the same plan?

You use this.getExecutionEnvironment().getIdString() to identify the accumulators, and as far as I can see this value is the same for all operators in a plan. As such, multiple usages of count() will return the summed count of all DataSets, and collect() will throw an exception, since the same key is used when calling getRuntimeContext().addAccumulator(id, accumulator).

@zentol (Contributor) commented Nov 17, 2014

Well, never mind: I just saw that you're calling this.getExecutionEnvironment().execute() within these methods, so when using them, count() or collect() would be the last method called. I find that very odd behaviour; I can't think of another method that executes the plan right away.

@zentol (Contributor) commented Nov 17, 2014

Why didn't you implement these as an identity (map) function that saves the information to unique accumulators (maybe identified by a user-supplied ID)? You could insert them into any plan, at any point, any number of times; I can see that being useful.

@mxm (Contributor, Author) commented Nov 18, 2014

> Why didn't you implement these as an identity (map) function that saves the information to unique accumulators (maybe identified by a user-supplied ID)?

Thank you for your feedback. The idea is to have intermediate results available to the user program within the execution of a Flink program. Currently, this is only possible through RichFunctions, which provide access to the RuntimeContext.
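For context, a hedged sketch of that status quo: a RichFunction registers an accumulator in open(), and the driver fetches it from the JobExecutionResult after execute(). The accumulator name "manual-count" is made up for illustration.

```java
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;

// Identity map that only counts: the boilerplate the new convenience
// methods are meant to hide from the API user.
public class CountingMapper<T> extends RichMapFunction<T, T> {
    private final LongCounter counter = new LongCounter();

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator("manual-count", counter);
    }

    @Override
    public T map(T value) {
        counter.add(1L);
        return value;
    }
}

// Driver side, after the job has run:
//   JobExecutionResult result = env.execute();
//   long count = result.getAccumulatorResult("manual-count");
```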

@StephanEwen (Contributor) commented:

I think executing right away makes sense, because we need the long value or the List<T> immediately. This is a new type of method, where you fetch data back to the driver program.

As for IDs, I agree that a randomly generated ID per method call makes sense.
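A hedged sketch of how a per-call ID could be wired in (assumed shape; the name CountHelper and the details are illustrative, not necessarily what the PR does). The driver generates a fresh random ID per count() call and passes it in, so several count()/collect() calls in one plan no longer collide on a shared accumulator key:

```java
import org.apache.flink.api.common.accumulators.LongCounter;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

public class CountHelper<T> extends RichFlatMapFunction<T, T> {
    private final String id; // fresh per count() call, e.g. a random UUID string
    private final LongCounter counter = new LongCounter();

    public CountHelper(String id) {
        this.id = id;
    }

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator(id, counter);
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        counter.add(1L);
        // intentionally no out.collect(value): this operator acts as a sink
    }
}
```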

@uce (Contributor) commented Nov 18, 2014

Very cool! I think this is a great addition, and we will improve on the performance soon. :)

@zentol, as @StephanEwen said, it makes sense to execute right away. You can think of these as "actions" from Spark, which bring results back to the user. He can't use a map function here, because it doesn't emit any records; it just adds them to the accumulator.

I've tried this out locally and it works, but you have to add the random IDs, as you otherwise run into problems with multiple actions in the same program.

With this change, we should also allow programs without sinks. Currently, you get a warning if you only have "actions" like collect/count without a sink.

Before we can merge this, you should also add test cases for both operators and make sure that it works with multiple actions per program.

@zentol (Contributor) commented Nov 18, 2014

Hmm... alright, I can see the point.

Doesn't executing right away carry the risk of being inefficient when these methods are used multiple times, though? Since it effectively means executing multiple jobs within the same program (I think...), any common part of the jobs is executed an extra time. (If I'm wrong here, skip the rest.)

Example:

```
List l1 = A.map(X).map(Y).collect();
List l2 = A.map(X).map(Z).collect();

// <some user code using l1 & l2>
```

This would result in 2 jobs being executed, with map(X) being executed twice, whereas

```
B = A.map(X);
B.map(Y).collect("c1");
B.map(Z).collect("c2");

JobExecutionResult jre = env.execute();
List l1 = jre.getAccumulatorResult("c1");
List l2 = jre.getAccumulatorResult("c2");

// <some user code using l1 & l2>
```

would be only 1 job, with map(X) done only once. It is not as pretty (by a fair margin, I admit), but it is in line with the current API.

@mxm (Contributor, Author) commented Nov 18, 2014

I changed the code to generate a unique identifier for each call of count or collect.

@zentol Executing in a lazy fashion could be implemented, but it would require additional changes to the API. The methods would then have to return something like a LocalDataSet, which lets the user retrieve the accumulator result once the job has been executed.
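Purely as a sketch of that idea (hypothetical type, not part of this PR): a deferred handle that remembers the accumulator ID and resolves it once the job has run.

```java
import org.apache.flink.api.common.JobExecutionResult;

import java.util.List;

// Hypothetical handle for a lazy collect(): the plan has not been
// executed yet; the result is pulled from the JobExecutionResult
// later, using the accumulator ID that collect() registered.
public class LocalDataSet<T> {
    private final String accumulatorId;

    public LocalDataSet(String accumulatorId) {
        this.accumulatorId = accumulatorId;
    }

    public List<T> get(JobExecutionResult result) {
        return result.getAccumulatorResult(accumulatorId);
    }
}
```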

@zentol (Contributor) commented Nov 18, 2014

Why additional changes? You can already retrieve the accumulators by using the JobExecutionResult.

The only necessary changes I see are removing the execute() inside collect() and count(), and, now that you generate the IDs yourself, returning the ID so the accumulators can be accessed later:

```
B = A.map(X);
String id1 = B.map(Y).collect();
String id2 = B.map(Z).collect();

JobExecutionResult jer = env.execute();
List l1 = jer.getAccumulatorResult(id1);
List l2 = jer.getAccumulatorResult(id2);

// <some user code using l1 & l2>
```

Why would this not be sufficient?

```java
public void write(DataOutputView out) throws IOException {
    ObjectOutputStream outStream = new ObjectOutputStream(new DataOutputViewStream(out));
    outStream.writeObject(typeSerializer);
    outStream.flush();
```

Contributor (inline review comment):

Let us change this to close().
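The suggested change, sketched against the snippet above (ObjectOutputStream.close() flushes before closing, per the JDK contract):

```java
public void write(DataOutputView out) throws IOException {
    ObjectOutputStream outStream = new ObjectOutputStream(new DataOutputViewStream(out));
    outStream.writeObject(typeSerializer);
    outStream.close(); // flushes and releases the stream in one call
}
```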

@StephanEwen (Contributor) commented:

With the scheduler and intermediate data set enhancements coming up for 0.9 soon, this is now quite feasible to use. I suggest merging it once the inline comments are addressed.

@mxm (Contributor, Author) commented Jan 19, 2015

I applied the proposed changes and rebased onto the current master. The changes are only reflected in the Java API so far and need to be added to the Scala API as well.

@mxm (Contributor, Author) commented Jan 20, 2015

I've implemented count and collect in the Scala API. There is still a problem with the ListAccumulator for non-primitive objects (e.g. anything other than Integer or Long), probably due to object reuse.
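For illustration, a hedged sketch of the usual defense against object reuse (assumed cause and fix; the actual bug may lie elsewhere): copy each record via its TypeSerializer before storing it, instead of keeping the possibly reused instance.

```java
import org.apache.flink.api.common.accumulators.ListAccumulator;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Illustrative collect() helper: the runtime may hand the same record
// instance to flatMap() on every invocation, so adding the reference
// directly would fill the list with copies of the last element.
public class CollectHelper<T> extends RichFlatMapFunction<T, T> {
    private final String id;
    private final TypeSerializer<T> serializer;
    private final ListAccumulator<T> accumulator = new ListAccumulator<>();

    public CollectHelper(String id, TypeSerializer<T> serializer) {
        this.id = id;
        this.serializer = serializer;
    }

    @Override
    public void open(Configuration parameters) {
        getRuntimeContext().addAccumulator(id, accumulator);
    }

    @Override
    public void flatMap(T value, Collector<T> out) {
        accumulator.add(serializer.copy(value)); // defensive copy
    }
}
```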

@mxm (Contributor, Author) commented Jan 27, 2015

Looks like this is now ready to merge.

@zentol I understand your concern. However, I think that it is much easier to execute this way. Most of the time, the user probably wants just one accumulator result, not multiple. This is supposed to be a convenience function.

@StephanEwen (Contributor) commented:

@zentol You are right that, for the time being, this results in parts of the plan being executed repeatedly. While not totally avoidable in all cases, the code going in soon for caching intermediate results will help there big time.

@mxm (Contributor, Author) commented Feb 26, 2015

I squashed the commits and rebased onto the current master. Any objections to merging this?

```
@@ -58,6 +58,12 @@ under the License.
    <version>0.5.1</version>
  </dependency>

  <dependency>
```

Contributor (inline review comment):

Why are you moving netty to flink-core?

@mxm (Contributor, Author) replied:

Simply because AbstractID depends on io.netty.buffer.ByteBuf.

Contributor (inline review comment):

Alright, that makes sense.

Contributor (inline review comment):

Actually, only a single convenience method in AbstractID depends on this. I would propose refactoring that method into one of the existing utility classes in flink-runtime and keeping the Netty dependency in flink-runtime.

@StephanEwen (Contributor) commented:

Let us not move Netty to flink-core. Let us rather pull that method out of AbstractID, or not use AbstractID in your code at all, but the Java UUID.
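A tiny sketch of the java.util.UUID variant (assuming the ID only needs to serve as a unique accumulator name; the class name is made up):

```java
import java.util.UUID;

public final class AccumulatorIds {
    // Unlike new AbstractID().toString(), this pulls in no Netty types,
    // so the netty dependency can stay in flink-runtime.
    public static String next() {
        return UUID.randomUUID().toString();
    }
}
```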

Maximilian Michels and others added 2 commits on February 27, 2015, 13:04:
… DataSet to the client

- this implements two convenience methods on DataSet for the Java and Scala API
- appropriate tests have been added

count(): returns the number of elements in a DataSet
collect(): returns a List<T> with the actual elements of a DataSet<T>

- both methods use accumulators to get the results back to the client
- both methods force an execution of the job to generate the results
@mxm (Contributor, Author) commented Feb 27, 2015

I moved the writeInto(ByteBuf buf) method to the subclasses of AbstractID used by NettyMessage. This is the only context in which the method is used, and those subclasses already have a corresponding fromByteBuf(ByteBuf buf) method.

@mxm (Contributor, Author) commented Feb 27, 2015

If there are no objections, I will merge this once Travis passes.

asfgit closed this in 3dc2fe1 on Feb 27, 2015.