Skip to content

Commit

Permalink
Configure pmap behavior via keyword args (JuliaLang#15975)
Browse files Browse the repository at this point in the history
* configure pmap via keyword args
  • Loading branch information
amitmurthy committed May 19, 2016
1 parent 68a38d2 commit 55e3a39
Show file tree
Hide file tree
Showing 10 changed files with 323 additions and 168 deletions.
20 changes: 11 additions & 9 deletions base/deprecated.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1011,29 +1011,31 @@ export call
# and added to pmap.jl
# pmap(f, c...) = pmap(default_worker_pool(), f, c...)

# NOTE(review): rendered diff hunk — the pre-commit and post-commit versions of
# this deprecated-keyword `pmap` shim are interleaved; the second signature
# line (with trailing `kwargs...`) is the committed one.
function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing)
function pmap(f, c...; err_retry=nothing, err_stop=nothing, pids=nothing, kwargs...)
# Collect the remaining keywords so deprecated flags can be rewritten into them.
kwargs = Dict{Symbol, Any}(kwargs)

# err_retry=true used to retry failed elements; now spelled pmap(retry(f), c...).
if err_retry != nothing
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap)
if err_retry == true
f = retry(f)
end
end

# Pre-commit err_stop handling (removed by this commit): wrapped f in @catch.
if err_stop != nothing
depwarn("err_stop is deprecated, use pmap(@catch(f), c...).", :pmap)
if err_stop == false
f = @catch(f)
end
end

# Explicit pid lists are deprecated in favor of passing a WorkerPool.
if pids == nothing
p = default_worker_pool()
else
depwarn("pids is deprecated, use pmap(::WorkerPool, f, c...).", :pmap)
p = WorkerPool(pids)
end

# Pre-commit return (removed by this commit):
return pmap(p, f, c...)
# Post-commit err_stop handling: err_stop=false now maps to an identity
# on_error handler, which returns exceptions inline with the results.
if err_stop != nothing
depwarn("err_stop is deprecated, use pmap(f, c...; on_error = error_handling_func).", :pmap)
if err_stop == false
kwargs[:on_error] = e->e
end
end

pmap(p, f, c...; kwargs...)
end

# 15692
Expand Down
52 changes: 17 additions & 35 deletions base/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -50,61 +50,43 @@ macro assert(ex, msgs...)
:($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg)))
end

# Defaults for `retry`: one retry attempt, retry on any exception, and an
# upper bound of 10 seconds on the delay between attempts.
const DEFAULT_RETRY_N = 1
const DEFAULT_RETRY_ON = e->true          # predicate: retry regardless of exception type
const DEFAULT_RETRY_MAX_DELAY = 10.0      # seconds

"""
retry(f, [condition]; n=3; max_delay=10) -> Function
retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function
Returns a lambda that retries function `f` up to `n` times in the
event of an exception. If `condition` is a `Type` then retry only
for exceptions of that type. If `condition` is a function
`cond(::Exception) -> Bool` then retry only if it is true.
event of an exception. If `retry_on` is a `Type` then retry only
for exceptions of that type. If `retry_on` is a function
`test_error(::Exception) -> Bool` then retry only if it is true.
The first retry happens after a gap of 50 milliseconds or `max_delay`,
whichever is lower. Subsequently, the delays between retries are
exponentially increased with a random factor up to `max_delay`.
**Examples**
```julia
retry(http_get, e -> e.status == "503")(url)
retry(read, UVError)(io)
```
"""
# NOTE(review): rendered diff hunk — old and new lines are interleaved. The
# committed version renames `condition` to `retry_on`, sources its defaults
# from the DEFAULT_RETRY_* constants, and attempts the call n+1 times total
# (i.e. n retries) with randomized exponential backoff capped at `max_delay`.
function retry(f::Function, condition::Function=e->true;
n::Int=3, max_delay::Int=10)
function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY)
(args...) -> begin
delay = 0.05
for i = 1:n
# Committed: initial delay is 50ms, but never above max_delay.
delay = min(0.05, max_delay)
for i = 1:n+1
try
return f(args...)
catch e
# Give up when attempts are exhausted, or when the predicate does not
# return `true` for this exception. The catch-less `try retry_on(e) end`
# shields against a predicate that itself throws (it yields `nothing`,
# which is `!= true`, so we rethrow).
if i == n || try condition(e) end != true
if i > n || try retry_on(e) end != true
rethrow(e)
end
end
# Pre-commit backoff (removed): jittered sleep, then 5x growth.
sleep(delay * (0.8 + (rand() * 0.4)))
delay = min(max_delay, delay * 5)
# Committed backoff: sleep the current delay, then grow it ~5x with a
# 0.8–1.2 random jitter factor, capped at max_delay.
sleep(delay)
delay = min(max_delay, delay * (0.8 + (rand() * 0.4)) * 5)
end
end
end

# Type form: retry only when the exception is an instance of `t`.
retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...)


"""
@catch(f) -> Function
Returns a lambda that executes `f` and returns either the result of `f` or
an `Exception` thrown by `f`.
**Examples**
```julia
julia> r = @catch(length)([1,2,3])
3
julia> r = @catch(length)()
MethodError(length,())
julia> typeof(r)
MethodError
```
"""
# Returns a lambda that runs `f` and returns either f's result or the exception
# it threw. NOTE(review): removed by this commit (together with the `@catch`
# export) in favor of pmap's `on_error` keyword.
catchf(f) = (args...) -> try f(args...) catch ex; ex end
macro catch(f)
esc(:(Base.catchf($f)))
end
1 change: 0 additions & 1 deletion base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1036,7 +1036,6 @@ export
# errors
assert,
backtrace,
@catch,
catch_backtrace,
error,
rethrow,
Expand Down
168 changes: 152 additions & 16 deletions base/pmap.jl
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@
# This file is a part of Julia. License is MIT: http:https://julialang.org/license

# Marker wrapper used by batched pmap: pairs a single failed input element
# (`data`) with the exception raised while processing it (`ex`), so errors can
# be reported or retried per element after the batch completes.
type BatchProcessingError <: Exception
data    # the original input element that failed
ex      # the exception thrown while processing `data`
end

"""
pgenerate([::WorkerPool], f, c...) -> iterator
pgenerate([::WorkerPool], f, c...) -> (iterator, process_batch_errors)
Apply `f` to each element of `c` in parallel using available workers and tasks.
Expand All @@ -15,38 +19,170 @@ Note that `f` must be made available to all worker processes; see
and Loading Packages <man-parallel-computing-code-availability>`)
for details.
"""
# NOTE(review): rendered diff hunk — the first three lines are the pre-commit
# version; the committed signature adds distributed/batching/error keywords and
# returns a tuple of (result iterator, optional error post-processor).
function pgenerate(p::WorkerPool, f, c)
if length(p) == 0
return AsyncGenerator(f, c)
function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
retry_n=0,
retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
retry_on=DEFAULT_RETRY_ON)
# Don't do remote calls if there are no workers, or if the pool contains
# only the current process.
if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid())
distributed = false
end

# Don't do batching if not doing remote calls.
if !distributed
batch_size = 1
end

# If not batching, do simple remote call.
if batch_size == 1
if distributed
f = remote(p, f)
end

# Wrap retry inside on_error so the handler only sees the final failure.
if retry_n > 0
f = wrap_retry(f, retry_on, retry_n, retry_max_delay)
end
if on_error != nothing
f = wrap_on_error(f, on_error)
end
return (AsyncGenerator(f, c), nothing)
else
batches = batchsplit(c, min_batch_count = length(p) * 3,
max_batch_size = batch_size)

# During batch processing, we need to ensure that if on_error is set, it is called
# for each element in error, and that we return as many elements as the original list.
# retry, if set, has to be called element-wise and we will do a best effort
# to ensure that we do not call the mapped function on the same element more than retry_n times.
# This guarantee is not possible in case of worker death / network errors, wherein
# we will retry the entire batch on a new worker.
f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true)
f = wrap_batch(f, p, on_error)
return (flatten(AsyncGenerator(f, batches)),
(p, f, results)->process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay))
end
# Pre-commit unbatched remote path (removed by this commit):
batches = batchsplit(c, min_batch_count = length(p) * 3)
return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches))
end

# Multi-collection form: zip the collections and apply `f` elementwise.
# NOTE(review): first line is pre-commit; the committed line adds kwargs pass-through.
pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...))
pgenerate(p::WorkerPool, f, c1, c...; kwargs...) = pgenerate(p, a->f(a...), zip(c1, c...); kwargs...)

# Single-collection form: delegate to the default worker pool.
# Fix: pass the collection `c` through unsplatted. Splatting (`c...`) routed a
# call like `pgenerate(f, [1,2,3])` to the multi-collection WorkerPool method,
# which then zipped the individual elements together instead of mapping `f`
# over the collection.
pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c; kwargs...)
# Multi-collection, no-pool form: zip the collections and apply `f` elementwise
# via the single-collection method.
pgenerate(f, c1, c...; kwargs...) = pgenerate(a->f(a...), zip(c1, c...); kwargs...)

# Wrap `f` so that any exception it raises is routed to `on_error` instead of
# propagating; the handler's return value stands in for the result. When
# `capture_data` is true, the failing input is passed along with the error
# (handler signature `(x, e)`), otherwise only the error is passed (`(e,)`).
function wrap_on_error(f, on_error; capture_data=false)
    return x -> begin
        try
            return f(x)
        catch err
            return capture_data ? on_error(x, err) : on_error(err)
        end
    end
end

pgenerate(f, c) = pgenerate(default_worker_pool(), f, c...)
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))
# Adapter: map pmap's retry_* keyword names onto `retry`'s positional/keyword arguments.
wrap_retry(f, retry_on, n, max_delay) = retry(f, retry_on; n=n, max_delay=max_delay)

# Wrap `f` for whole-batch remote execution on pool `p`. If the remote call for
# a batch fails as a unit (e.g. worker death / network error), either map every
# element of the batch to a BatchProcessingError (when `on_error` is set, so
# per-element handling can proceed) or rethrow the batch-level error.
function wrap_batch(f, p, on_error)
    batch_f = asyncmap_batch(f)
    return batch -> begin
        try
            return remotecall_fetch(batch_f, p, batch)
        catch err
            if on_error == nothing
                rethrow(err)
            end
            return Any[BatchProcessingError(elem, err) for elem in batch]
        end
    end
end

# Lift `f` to operate on a whole batch, processing its elements concurrently via tasks.
asyncmap_batch(f) = batch -> asyncmap(f, batch)

"""
pmap([::WorkerPool], f, c...) -> collection
pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
Transform collection `c` by applying `f` to each element using available
workers and tasks.
For multiple collection arguments, apply f elementwise.
Note that `err_retry=true` and `err_stop=false` are deprecated,
use `pmap(retry(f), c)` or `pmap(@catch(f), c)` instead
(or to retry on a different worker, use `asyncmap(retry(remote(f)), c)`).
Note that `f` must be made available to all worker processes; see
[Code Availability and Loading Packages](:ref:`Code Availability
and Loading Packages <man-parallel-computing-code-availability>`)
for details.
If a worker pool is not specified, all available workers, i.e., the default worker pool
is used.
By default, `pmap` distributes the computation over all specified workers. To use only the
local process and distribute over tasks, specify `distributed=false`. This is equivalent to `asyncmap`.
`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
greater than 1, the collection is split into multiple batches, which are distributed across
workers. Each such batch is processed in parallel via tasks in each worker. The specified
`batch_size` is an upper limit, the actual size of batches may be smaller and is calculated
depending on the number of workers available and length of the collection.
Any error stops pmap from processing the remainder of the collection. To override this behavior
you can specify an error handling function via argument `on_error` which takes in a single argument, i.e.,
the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
which is then returned inline with the results to the caller.
Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through
to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails,
all items in the batch are retried.
The following are equivalent:
* `pmap(f, c; distributed=false)` and `asyncmap(f,c)`
* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)`
* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
"""
pmap(p::WorkerPool, f, c...) = collect(pgenerate(p, f, c...))
function pmap(p::WorkerPool, f, c...; kwargs...)
results_iter, process_errors! = pgenerate(p, f, c...; kwargs...)
results = collect(results_iter)
if isa(process_errors!, Function)
process_errors!(p, f, results)
end
results
end

# Resolve BatchProcessingError placeholders left in `results` by batched
# execution: rerun the failed elements individually (batch size 1) so that
# retry and on_error semantics apply per element, then splice the replacement
# values back into `results` in-place. Returns `nothing`.
function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay)
    # Nothing to do unless the caller asked for error handling or retries.
    if (on_error != nothing) || (retry_n > 0)
        failed_positions = Int[]
        failed_entries = []
        for pos in eachindex(results)
            entry = results[pos]
            if isa(entry, BatchProcessingError)
                push!(failed_positions, pos)
                push!(failed_entries, entry)
            end
        end

        if !isempty(failed_entries)
            exceptions = [entry.ex for entry in failed_entries]
            if (retry_n > 0) && all([retry_on(ex) for ex in exceptions])
                # Consume one retry round here; the recursive pmap call may use
                # the remaining rounds on elements that fail again.
                replacements = pmap(p, f, [entry.data for entry in failed_entries];
                                    on_error=on_error,
                                    retry_on=retry_on,
                                    retry_n=retry_n - 1,
                                    retry_max_delay=retry_max_delay)
            elseif on_error != nothing
                replacements = map(on_error, exceptions)
            else
                throw(CompositeException(exceptions))
            end

            for (k, v) in enumerate(replacements)
                results[failed_positions[k]] = v
            end
        end
    end
    nothing
end


"""
Expand All @@ -72,7 +208,7 @@ function batchsplit(c; min_batch_count=1, max_batch_size=100)
# If there are not enough batches, use a smaller batch size
if length(head) < min_batch_count
batch_size = max(1, div(sum(length, head), min_batch_count))
return partition(flatten(head), batch_size)
return partition(collect(flatten(head)), batch_size)
end

return flatten((head, tail))
Expand Down
1 change: 1 addition & 0 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ showerror(io::IO, ce::CapturedException) = showerror(io, ce.ex, ce.processed_bt,
# Container for multiple exceptions raised by concurrent tasks/workers.
# NOTE(review): this commit adds the `CompositeException(exceptions)` inner
# constructor so a pre-collected exception vector can be wrapped directly.
type CompositeException <: Exception
exceptions::Vector{Any}
CompositeException() = new(Any[])
CompositeException(exceptions) = new(exceptions)
end
# Collection-like helpers delegating to the wrapped vector.
length(c::CompositeException) = length(c.exceptions)
push!(c::CompositeException, ex) = push!(c.exceptions, ex)
Expand Down
25 changes: 4 additions & 21 deletions doc/stdlib/base.rst
Original file line number Diff line number Diff line change
Expand Up @@ -1220,11 +1220,13 @@ Errors
An error occurred when running a module's ``__init__`` function. The actual error thrown is available in the ``.error`` field.

.. function:: retry(f, [condition]; n=3; max_delay=10) -> Function
.. function:: retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function

.. Docstring generated from Julia source
Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``condition`` is a ``Type`` then retry only for exceptions of that type. If ``condition`` is a function ``cond(::Exception) -> Bool`` then retry only if it is true.
Returns a lambda that retries function ``f`` up to ``n`` times in the event of an exception. If ``retry_on`` is a ``Type`` then retry only for exceptions of that type. If ``retry_on`` is a function ``test_error(::Exception) -> Bool`` then retry only if it is true.

The first retry happens after a gap of 50 milliseconds or ``max_delay``\ , whichever is lower. Subsequently, the delays between retries are exponentially increased with a random factor up to ``max_delay``\ .

**Examples**

Expand All @@ -1233,25 +1235,6 @@ Errors
retry(http_get, e -> e.status == "503")(url)
retry(read, UVError)(io)
.. function:: @catch(f) -> Function

.. Docstring generated from Julia source
Returns a lambda that executes ``f`` and returns either the result of ``f`` or an ``Exception`` thrown by ``f``\ .

**Examples**

.. code-block:: julia
julia> r = @catch(length)([1,2,3])
3
julia> r = @catch(length)()
MethodError(length,())
julia> typeof(r)
MethodError
Events
------

Expand Down
Loading

0 comments on commit 55e3a39

Please sign in to comment.