@catch, retry, partition, asyncmap and pmap #15409

Merged
merged 2 commits into from
Apr 14, 2016

samoconnor
Contributor

Third step in Simplifying and generalising pmap (as per #14843).

See also #15058 StreamMapIterator (was asyncmap) and #15073 WorkerPool and remote

@catch

@catch takes a function and returns a lambda that catches any exceptions thrown by the function and returns the exception object.

@test map(typeof, map(@catch(i->[1,2,3][i]), 1:6)) ==
      [Int, Int, Int, BoundsError, BoundsError, BoundsError]
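The same behaviour can be sketched as an ordinary higher-order function. This is only an illustration written for current Julia 1.x; the name `catching` is hypothetical and is not what the PR exports.

```julia
# Hypothetical helper: `catching` is an illustrative name, not part of Base.
# It wraps `f` so that a thrown exception is returned as a value.
catching(f) = (args...) -> try
    f(args...)
catch ex
    ex  # hand the exception object back instead of rethrowing it
end

results = map(catching(i -> [1, 2, 3][i]), 1:6)
kinds = map(typeof, results)
```

Indexes 1 to 3 yield integers; indexes 4 to 6 yield `BoundsError` objects in the same positions.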

retry

retry takes a function and returns a lambda that retries the function 3 times if an error occurs

#pmap(f, v; err_retry=true)
pmap(retry(f), v)

retry(http_get, e->e.status == "503")(url)

retry(read, UVError)(io)

retry(read, UVError, n=10)(io) # retry 10 times...
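A minimal sketch of the idea, without any delay between attempts. The name `retrying`, the predicate argument, and the reading of `n` as the total number of attempts are all assumptions made for illustration, not the PR's implementation.

```julia
# Hypothetical sketch: `retrying` and `should_retry` are illustrative names.
# Assumes `n` counts total attempts, which the description above leaves open.
function retrying(f, should_retry = ex -> true; n = 3)
    return function (args...)
        for attempt in 1:n
            try
                return f(args...)
            catch ex
                # Give up on the last attempt or when the predicate rejects the error.
                (attempt == n || !should_retry(ex)) && rethrow()
            end
        end
    end
end

# A function that fails twice and then succeeds.
calls = Ref(0)
flaky() = (calls[] += 1; calls[] < 3 ? error("transient") : "ok")
result = retrying(flaky)()
```

The predicate form mirrors the `e->e.status == "503"` example: errors the predicate rejects are rethrown immediately.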

partition(collection, n)

partition(collection, n) returns an iterator over collection that yields n elements at a time; the final batch may be shorter.

julia> collect(partition([1,2,3,4,5], 2))
3-element Array{Array{Int64,1},1}:
 [1,2]
 [3,4]
 [5]
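In current Julia 1.x an iterator with this behaviour is available as `Iterators.partition`. A short check of the batching, including the shorter final batch:

```julia
# Iterators.partition yields batches of up to 2 elements; the last one is shorter.
batches = collect(Iterators.partition([1, 2, 3, 4, 5], 2))
batch_lengths = map(length, batches)
```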

batchsplit(c; min_batch_count=1, max_batch_size=100)

Split a collection into at least min_batch_count batches.

Equivalent to partition(c, max_batch_size) when length(c) >> max_batch_size.
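One way to read that contract, sketched for collections whose length is known up front. `batchsplit_sketch` is a hypothetical name and this eager version is an assumption for illustration; the PR's own function works on iterators.

```julia
# Hypothetical eager sketch of the batching rule, for illustration only.
function batchsplit_sketch(c; min_batch_count = 1, max_batch_size = 100)
    items = collect(c)
    # Shrink the batch size when max_batch_size alone would give too few batches.
    batch_size = min(max_batch_size, max(1, div(length(items), min_batch_count)))
    return collect(Iterators.partition(items, batch_size))
end

small = batchsplit_sketch(1:10; min_batch_count = 3)
large = batchsplit_sketch(1:1000; min_batch_count = 3)
```

With 10 items the batch size drops to 3 so that at least 3 batches exist; with 1000 items the cap of 100 applies, matching `partition(c, max_batch_size)`.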

asyncgenerate and asyncmap

Generate or map using parallel tasks.

asyncgenerate(f, c) = StreamGenerator(f, c)
asyncmap(f, c) = collect(asyncgenerate(f, c))

pgenerate and pmap

Generate or map using parallel workers and tasks.

function pgenerate(f, c)
    batches = batchsplit(c, min_batch_count = nworkers() * 3)
    return flatten(asyncgenerate(remote(b -> asyncmap(f, b)), batches))
end

pmap(f, c) = collect(pgenerate(f, c))

@@ -0,0 +1,72 @@
# This file is a part of Julia. License is MIT: http://julialang.org/license
Contributor

this needs to be added to test/choosetests.jl to actually run

Contributor Author

true that 🙄

@samoconnor
Contributor Author

Note: this commit adds a small default delay to retry with exponential back-off and some random jitter.

I'm happy to back this out if it's seen as adding too much at once.

However, it seems to me that it is probably a good thing to have a small delay by default when retrying something that caused an exception (or at the very least to yield()). retry is only useful for non-deterministic calls. It seems reasonable to assume that the primary source of non-determinism is likely to be an external or remote file or process or network or database. In all these cases exponential back-off is usually a good thing for overall system performance.
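A delay schedule of this kind might look like the sketch below. The function name and every constant are illustrative assumptions; they are not the values used in the commit.

```julia
# Hypothetical delay schedule: exponential growth, capped, with random jitter.
# All constants are illustrative, not the PR's defaults.
backoff_delay(attempt; base = 0.05, factor = 5, max_delay = 10.0, jitter = 0.1) =
    min(max_delay, base * factor^(attempt - 1)) * (1 + jitter * rand())

# Delays (in seconds) that would precede retries 1 through 5.
delays = [backoff_delay(k) for k in 1:5]
```

The jitter term spreads out retries from many clients that failed at the same moment, so they do not all hit the remote resource again in lockstep.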

@samoconnor samoconnor force-pushed the retry_branch branch 3 times, most recently from acbe45e to 91bde90 Compare March 9, 2016 05:02
@samoconnor
Contributor Author

rebased to ec6f886

@samoconnor
Contributor Author

tests now ok.
default constructor for WorkerPool added per #15073 (comment)

@samoconnor samoconnor changed the title @catch and retry @catch, retry and collect(n, c) Mar 11, 2016
@tkelman
Contributor

tkelman commented Mar 11, 2016

collect(n, collection) returning an iterator doesn't mesh very well with all other methods of collect. That should possibly go by a different name.

@samoconnor
Contributor Author

@JeffBezanson any suggestions on what this thing should be called if not collect : samoconnor@0483198#diff-83c91f45204e96e58535f5491cca8cd8R347

eachchunk ?
eachn ?

@@ -3,6 +3,9 @@

type WorkerPool
channel::RemoteChannel{Channel{Int}}

# Create a shared queue of available workers...
WorkerPool() = new(RemoteChannel(()->Channel{Int}(128)))
Contributor

This should be changed to RemoteChannel(()->Channel(typemax(Int))), else WorkerPool(workers::Vector{Int}) will block when trying to add more than 128 workers.

Contributor Author

will do

@amitmurthy
Contributor

split? On strings it returns the split strings. But since an iterator is being returned here, maybe just chunk?

@samoconnor
Contributor Author

There is some precedent for things that return an iterator having an each prefix.

Then there are a bunch of things that have no hint of iterator-ness in their names:
enumerate, zip, filter, cycle, rest, countfrom, take, drop, repeated, keys, values, graphemes

split would be consistent with enumerate, cycle, filter and zip
chunks would be consistent with graphemes, keys and values...

batch ?, partition ?, slice ?, divide ?, subdivide ?, segment ?, eachblock ?, blocks ?

@samoconnor samoconnor changed the title @catch, retry and collect(n, c) @catch, retry and split(c, n) Mar 12, 2016
@samoconnor
Contributor Author

On reflection, @amitmurthy's suggestion of split(c, n) is good. Changed.

It would have been very handy for this JuliaCloud/AWSCore.jl@9330a7b#diff-c33c63b7ba6d9fce289488d1cf3dfcaeR64 just yesterday.

@nalimilan
Member

Isn't collect what Iterators.jl calls partition? There's precedent of moving functions from there into Base.

@samoconnor
Contributor Author

Isn't collect what Iterators.jl calls partition?

Thanks for pointing that out @nalimilan, it's good to know about Iterators.jl.
However, while partition is similar, it isn't equivalent to split(c, n) (previously collect(n, c)).
Partition always produces Tuples and seems to ignore the tail of the collection...

julia> collect(partition("Hello World!", 5))
2-element Array{Tuple{Char,Char,Char,Char,Char},1}:
 ('H','e','l','l','o')
 (' ','W','o','r','l')

julia> collect(split("Hello World!", 5))
3-element Array{ASCIIString,1}:
 "Hello"
 " Worl"
 "d!"

julia> collect(partition([1,2,3,4,5], 3))
1-element Array{Tuple{Int64,Int64,Int64},1}:
 (1,2,3)

julia> collect(split([1,2,3,4,5], 3))
2-element Array{Array{Int64,1},1}:
 [1,2,3]
 [4,5]

@amitmurthy
Contributor

Sorry for the delay reviewing this.

@samoconnor, I am thinking it will be easier to just have the complete new pmap in this PR with minimal exports. Separate smaller PRs are usually better, but here it appears that we are losing the bigger context of the changes being made to generalize pmap, and of the need to add new exports, if any.

Avoiding new exports makes the PR less controversial since we are not adding something that needs to be supported over the long term.

I'll push a fix separately for the WorkerPool and mapiterator corrections (since they do not really belong here).

@amitmurthy
Contributor

I'll push a fix separately for the WorkerPool and mapiterator corrections (since they do not really belong here).

Unless you would rather submit a PR for these corrections. I should have asked.

@nalimilan
Member

@samoconnor Looks like you should add a partial::Bool=false argument to partition then. As regards tuples vs. arrays, we cannot duplicate all functions for which this choice exists, so we have to find a solution.

@samoconnor
Contributor Author

@amitmurthy, please go ahead and make the WorkerPool and mapiterator corrections on master. Thanks. I'll rebase this PR when that's done.

I'll also add pmap to this PR as you suggest.

My current intention is for pmap to do something like this by default:

asyncmap(f, c...) = collect(StreamMapIterator(f, c...))

pmap(f, c...) = vcat(asyncmap(remote(chunk->asyncmap(a->f(a...), chunk)), split(zip(c...), 100))...)

i.e. run batches of 100 (or whatever a good default is) async maps on each worker.
For IO-bound code, the async-ness on each worker is a big win.
For compute-bound code, the overhead is probably minimal. I assume that if the mapped function never yields then the async map is almost the same as normal map.
Of course this can be made tuneable with kw args.
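The batching plan can be tried in a single process with current Julia 1.x. In this sketch the outer `asyncmap` merely stands in for the remote dispatch to workers, which is an assumption made so the example runs without extra processes; `batched_map` is a hypothetical name.

```julia
# Single-process sketch of "batches of async maps"; no workers are involved.
function batched_map(f, c; batch_size = 100)
    batches = collect(Iterators.partition(c, batch_size))
    # Outer asyncmap: stands in for sending each batch to a worker.
    # Inner asyncmap: one task per element within the batch.
    nested = asyncmap(batch -> asyncmap(f, batch), batches)
    return collect(Iterators.flatten(nested))
end

squares = batched_map(x -> x^2, 1:10; batch_size = 3)
```

Results come back in input order, because each `asyncmap` preserves the order of its collection and the batches are flattened in sequence.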

@samoconnor samoconnor changed the title @catch, retry and split(c, n) WIP: @catch, retry and split(c, n) [ci skip] Mar 22, 2016
@samoconnor
Contributor Author

@amitmurthy

I've just committed a WIP pmap() implementation.
It's not really ready for review, needs doc, tests etc, but it might be useful for you to see the direction I'm going in.

I've also included flatten from #14805

@amitmurthy
Contributor

Thanks. Could you squash the commits? Will merge in a day or two if there are no major concerns.

@samoconnor
Contributor Author

squashed

end

function next(itr::AsyncGenerator, state::AsyncGeneratorState)

Member

I don't think a line break is in order here, even if we allowed the exception of blocks starting with a comment.

@amitmurthy amitmurthy merged commit cf52a34 into JuliaLang:master Apr 14, 2016
@amitmurthy
Contributor

Thanks @samoconnor for this, and everyone for reviewing it. I'll submit a PR for the remaining cosmetic changes discussed here.

@samoconnor
Contributor Author

Thanks Amit

@JeffBezanson
Sponsor Member

Looking at #15661 made me notice the @catch macro. We might not want macros with the same names as keywords. Happily, I also notice it doesn't really need to be a macro, since it just expands to a function call on the same argument. I think this functional style, where possible, is far preferable to macros. Can we just have the function and not the macro? I would maybe call it caught(f).

@samoconnor
Contributor Author

samoconnor commented Apr 16, 2016

@JeffBezanson I thought that someone might notice that @catch is not really a macro.
I was a little surprised that none of the reviewers mentioned it given that the review was thorough enough to get all the way down to discussing positioning of whitespace :)

I chose @catch because:

  • I really wanted catch(f) = ..., but catch is a keyword.
  • I thought that the @ doesn't hurt, because it conveys a bit of "something not quite normal might happen here". i.e. the return value might be an ordinary value, or it might be an Exception.
  • I couldn't think of an alternative name that retained a strong association with the metaphor of throwing and catching.

I'm not sure that caught is quite right. The thing that is caught is the exception. The function does not return an exception, it returns a function. The returned function does not always return an exception either, it mostly returns an ordinary value. The name catch seems logical because the returned function does something close to what a catch block does (sometimes nothing, sometimes catching).

My preference would be to remove @catch completely.
The only reason for including it was as a way to extract the old exception-as-value feature from pmap. e.g. pmap(f, c, error_stop = false) is replaced by pmap(@catch(f), c).
However, the user could just be asked to write pmap(x->try f(x) catch ex; ex end, c) instead.

Also, why not make try f(x) catch end return the exception? (it currently returns Void).

Then you would just have pmap(x->try f(x) catch end, c).

... or even try f(x) end could return the exception.

Note: Midori has v = try Foo() else catch.

@nalimilan
Member

You could call it trycatch, but I agree we could as well do without it.

@tkelman
Contributor

tkelman commented Apr 17, 2016

Could also call it except(f) if it needs to have a name. Making try-catch return the exception by default feels like a pretty arbitrary thing to do.

@amitmurthy
Contributor

+1 for except(f)

@nalimilan
Member

What is "except" supposed to mean? "exception" or "except"?

@JeffBezanson
Sponsor Member

Taking this in a slightly different direction, it makes me nervous to have an API that returns either a value or an exception that was thrown, without leaving any indication of which it was.

@tkelman
Contributor

tkelman commented Apr 19, 2016

Something more Nullable-like, a la ExceptionOr would be easier to make type stable and less ambiguous if the non error branch happens to want to return an exception object.
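A rough sketch of how a wrapper could remove that ambiguity. The names `Failure` and `attempt` are hypothetical, and this version only tags thrown exceptions; it does not attempt the type stability mentioned above.

```julia
# Hypothetical wrapper: only exceptions that were actually thrown get tagged.
struct Failure
    exception  # untyped, since Julia allows throwing any object
end

attempt(f) = (args...) -> try
    f(args...)
catch ex
    Failure(ex)
end

returned = attempt(() -> ErrorException("as a value"))()  # ordinary return value
thrown = attempt(() -> error("thrown"))()                 # caught and wrapped
```

A function that legitimately returns an exception object is now distinguishable from one that threw, because only the latter produces a `Failure`.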

@amitmurthy
Contributor

This is useful in situations where the programmer is OK with a certain number of errors. For example, a search engine implementing a distributed map-reduce may be OK displaying the results (instead of an error) if the number of failed calls is below a certain threshold. Or you might want a really long-running pmap to complete and just log any exceptions. Again, you may throw an error only if the number of exceptions crosses a certain threshold.

As another example, there is an open issue for always completing the entire CI test suite even when we encounter a failure.

This is a convenience wrapper for a functionality that users can always code into the mapping function.

@tkelman
Contributor

tkelman commented Apr 20, 2016

These cases probably shouldn't be returning exceptions inline among the results though. Maybe a NullableArray for results with an auxiliary Any-array to hold any exceptions that occur would have more predictable behavior.

@amitmurthy
Contributor

The exceptions array will have to be the same length as the NullableArray - in order to maintain the same indexes as the jobs array and easily track failed jobs - or we return an array of tuples (idx, excep). How would one enable this, since it is preferable to keep the default behavior the same as map? I am OK with bringing back a keyword arg for this functionality, say catch_exceptions=true, which would enable this behavior. Default is false.

Only if catch_exceptions=true, we could either return exceptions inline or return a tuple (results, exceptions) where results is a Nullable Array and exceptions an array of (idx, excep).
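The (results, exceptions) shape can be sketched with a serial loop standing in for pmap. The function name is hypothetical, and `nothing` is used here in place of a Nullable slot, which is an assumption for current Julia 1.x.

```julia
# Hypothetical serial stand-in for a pmap that collects failures separately.
function map_collecting_errors(f, c)
    results = Vector{Any}(nothing, length(c))  # `nothing` marks a failed job
    failures = Tuple{Int, Any}[]               # (index, exception) pairs
    for (idx, x) in enumerate(c)
        try
            results[idx] = f(x)
        catch ex
            push!(failures, (idx, ex))
        end
    end
    return results, failures
end

results, failures = map_collecting_errors(i -> [1, 2, 3][i], 1:5)
failed_indexes = first.(failures)
```

Keeping the index alongside each exception preserves the link back to the jobs array without padding the exceptions array to full length.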

@JeffBezanson
Sponsor Member

Yes, while you can test isa(x, Exception), this does not actually determine whether an exception happened. However I suppose we could require this by convention.

@samoconnor
Contributor Author

I don't think there is a compelling reason to re-introduce catch_exceptions=true to pmap.

If doing pmap(x -> try f(x) catch ex; ex end, c) and isa(x, Exception) is not enough for some use-case, then something like this could be done (and be useful in places other than pmap):

witherr(f) = (args...) -> try (f(args...), nothing) catch ex; (nothing, ex) end

julia> v, err = witherr(string)(7)
("7",nothing)

julia> v, err = witherr(error)(7)
(nothing,ErrorException("7"))

pmap(witherr(f), c)

@amitmurthy
Contributor

The scenarios I described above require pmap to be robust against network errors or workers dying too. I'll put a PR together and we can continue discussion on the interface there.

@amitmurthy
Contributor

amitmurthy commented Apr 21, 2016

See #15975
