Commit

Merge pull request JuliaLang#16663 from JuliaLang/amitm/pmap_fixup
minor reorganization of pmap code
amitmurthy committed May 31, 2016
2 parents f31723a + 730617a commit fb9f690
Showing 4 changed files with 77 additions and 69 deletions.
5 changes: 2 additions & 3 deletions NEWS.md
@@ -69,9 +69,8 @@ Breaking changes
in a `MethodError`. ([#6190])

* `pmap` keyword arguments `err_retry=true` and `err_stop=false` are deprecated.
-    `pmap` no longer retries or returns `Exception` objects in the result collection.
-    `pmap(retry(f), c)` or `pmap(@catch(f), c)` can be used instead.
-    ([#15409](https://github.com/JuliaLang/julia/pull/15409#discussion_r57494701)).
+    Action to be taken on errors can be specified via the `on_error` keyword argument.
+    Retry is specified via `retry_n`, `retry_on` and `retry_max_delay`.

* `reshape` is now defined to always share data with the original array.
If a reshaped copy is needed, use `copy(reshape(a))` or `copy!` to a new array of
8 changes: 5 additions & 3 deletions base/error.jl
@@ -50,12 +50,13 @@ macro assert(ex, msgs...)
:($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg)))
end

+# NOTE: Please keep the constant values specified below in sync with the doc string
const DEFAULT_RETRY_N = 1
const DEFAULT_RETRY_ON = e->true
const DEFAULT_RETRY_MAX_DELAY = 10.0

"""
-    retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function
+    retry(f, [retry_on]; n=1, max_delay=10.0) -> Function
Returns a lambda that retries function `f` up to `n` times in the
event of an exception. If `retry_on` is a `Type` then retry only
@@ -83,8 +84,9 @@ function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY
                     rethrow(e)
                 end
             end
-            sleep(delay)
-            delay = min(max_delay, delay * (0.8 + (rand() * 0.4)) * 5)
+            delay = min(max_delay, delay)
+            sleep(delay * (0.8 + (rand() * 0.2)))
+            delay = delay * 5
         end
     end
 end
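
The reordered loop above clamps `delay` to `max_delay` first, sleeps for a jittered 80% to 100% of it, and only then multiplies by 5 for the next attempt. A minimal sketch of the resulting un-jittered schedule follows. The helper name `backoff_schedule` and the starting delay of `0.05` are assumptions for illustration; the initial value is not visible in this hunk.

```julia
# Hypothetical helper, not part of Base: lists the base delay used before
# each of `n` retries, following the clamp / sleep / grow order of the new code.
function backoff_schedule(n; max_delay=10.0, initial=0.05)
    delays = Float64[]
    delay = initial                    # assumed starting value
    for _ in 1:n
        delay = min(max_delay, delay)  # clamp before sleeping
        push!(delays, delay)           # real code sleeps delay * (0.8 + rand() * 0.2)
        delay = delay * 5              # grow for the next attempt
    end
    return delays
end

schedule = backoff_schedule(5)         # roughly 0.05, 0.25, 1.25, 6.25, 10.0
```

Because the jitter factor now lies between 0.8 and 1.0 and is applied after clamping, an individual sleep should not exceed `max_delay`.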
131 changes: 69 additions & 62 deletions base/pmap.jl
@@ -6,7 +6,7 @@ type BatchProcessingError <: Exception
end

"""
-    pgenerate([::WorkerPool], f, c...) -> (iterator, process_batch_errors)
+    pgenerate([::WorkerPool], f, c...) -> iterator
Apply `f` to each element of `c` in parallel using available workers and tasks.
@@ -19,10 +19,62 @@ Note that `f` must be made available to all worker processes; see
and Loading Packages <man-parallel-computing-code-availability>`)
for details.
"""
-function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
-                                        retry_n=0,
-                                        retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
-                                        retry_on=DEFAULT_RETRY_ON)
+function pgenerate(p::WorkerPool, f, c)
+    if length(p) == 0
+        return AsyncGenerator(f, c)
+    end
+    batches = batchsplit(c, min_batch_count = length(p) * 3)
+    return flatten(AsyncGenerator(remote(p, b -> asyncmap(f, b)), batches))
+end
+pgenerate(p::WorkerPool, f, c1, c...) = pgenerate(p, a->f(a...), zip(c1, c...))
+pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
+pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))
+
+"""
+    pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
+Transform collection `c` by applying `f` to each element using available
+workers and tasks.
+For multiple collection arguments, apply f elementwise.
+Note that `f` must be made available to all worker processes; see
+[Code Availability and Loading Packages](:ref:`Code Availability
+and Loading Packages <man-parallel-computing-code-availability>`)
+for details.
+If a worker pool is not specified, all available workers, i.e., the default worker pool
+is used.
+By default, `pmap` distributes the computation over all specified workers. To use only the
+local process and distribute over tasks, specify `distributed=false`. This is equivalent to `asyncmap`.
+`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
+greater than 1, the collection is split into multiple batches, which are distributed across
+workers. Each such batch is processed in parallel via tasks in each worker. The specified
+`batch_size` is an upper limit, the actual size of batches may be smaller and is calculated
+depending on the number of workers available and length of the collection.
+Any error stops pmap from processing the remainder of the collection. To override this behavior
+you can specify an error handling function via argument `on_error` which takes in a single argument, i.e.,
+the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
+which is then returned inline with the results to the caller.
+Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through
+to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails,
+all items in the batch are retried.
+The following are equivalent:
+* `pmap(f, c; distributed=false)` and `asyncmap(f,c)`
+* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)`
+* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
+"""
+function pmap(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
+                                   retry_n=0,
+                                   retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
+                                   retry_on=DEFAULT_RETRY_ON)
+    f_orig = f
     # Don't do remote calls if there are no workers.
     if (length(p) == 0) || (length(p) == 1 && fetch(p.channel) == myid())
         distributed = false
@@ -45,7 +97,7 @@ function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error
         if on_error != nothing
             f = wrap_on_error(f, on_error)
         end
-        return (AsyncGenerator(f, c), nothing)
+        return collect(AsyncGenerator(f, c))
     else
         batches = batchsplit(c, min_batch_count = length(p) * 3,
                                 max_batch_size = batch_size)
@@ -56,17 +108,21 @@ function pgenerate(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error
         # to ensure that we do not call mapped function on the same element more than retry_n.
         # This guarantee is not possible in case of worker death / network errors, wherein
         # we will retry the entire batch on a new worker.
-        f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true)
+        if (on_error != nothing) || (retry_n > 0)
+            f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true)
+        end
         f = wrap_batch(f, p, on_error)
-        return (flatten(AsyncGenerator(f, batches)),
-                (p, f, results)->process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay))
+        results = collect(flatten(AsyncGenerator(f, batches)))
+        if (on_error != nothing) || (retry_n > 0)
+            process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay)
+        end
+        return results
     end
 end

-pgenerate(p::WorkerPool, f, c1, c...; kwargs...) = pgenerate(p, a->f(a...), zip(c1, c...); kwargs...)
-
-pgenerate(f, c; kwargs...) = pgenerate(default_worker_pool(), f, c...; kwargs...)
-pgenerate(f, c1, c...; kwargs...) = pgenerate(a->f(a...), zip(c1, c...); kwargs...)
+pmap(p::WorkerPool, f, c1, c...; kwargs...) = pmap(p, a->f(a...), zip(c1, c...); kwargs...)
+pmap(f, c; kwargs...) = pmap(default_worker_pool(), f, c; kwargs...)
+pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...)

 function wrap_on_error(f, on_error; capture_data=false)
     return x -> begin
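
The multi-collection `pmap` methods in the hunk above reduce several collections to one by zipping them and splatting each tuple into `f`. The same dispatch shape can be sketched with plain `map`, with no workers involved. The name `map_many` is hypothetical and used only for illustration.

```julia
# Hypothetical name; mirrors the method layout of the new pmap definitions.
# The single-collection method does the actual work.
map_many(f, c) = map(f, c)

# Two or more collections: zip them, then splat each tuple into f.
map_many(f, c1, c...) = map_many(a -> f(a...), zip(c1, c...))

sums = map_many(+, [1, 2, 3], [10, 20, 30])   # [11, 22, 33]
```

The fixed-arity method is more specific than the varargs one, so a call with a single collection never re-enters the zipping method.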
@@ -101,55 +157,6 @@ end

 asyncmap_batch(f) = batch -> asyncmap(f, batch)

-"""
-    pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
-Transform collection `c` by applying `f` to each element using available
-workers and tasks.
-For multiple collection arguments, apply f elementwise.
-Note that `f` must be made available to all worker processes; see
-[Code Availability and Loading Packages](:ref:`Code Availability
-and Loading Packages <man-parallel-computing-code-availability>`)
-for details.
-If a worker pool is not specified, all available workers, i.e., the default worker pool
-is used.
-By default, `pmap` distributes the computation over all specified workers. To use only the
-local process and distribute over tasks, specify `distributed=false`. This is equivalent to `asyncmap`.
-`pmap` can also use a mix of processes and tasks via the `batch_size` argument. For batch sizes
-greater than 1, the collection is split into multiple batches, which are distributed across
-workers. Each such batch is processed in parallel via tasks in each worker. The specified
-`batch_size` is an upper limit, the actual size of batches may be smaller and is calculated
-depending on the number of workers available and length of the collection.
-Any error stops pmap from processing the remainder of the collection. To override this behavior
-you can specify an error handling function via argument `on_error` which takes in a single argument, i.e.,
-the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
-which is then returned inline with the results to the caller.
-Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through
-to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails,
-all items in the batch are retried.
-The following are equivalent:
-* `pmap(f, c; distributed=false)` and `asyncmap(f,c)`
-* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)`
-* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
-"""
-function pmap(p::WorkerPool, f, c...; kwargs...)
-    results_iter, process_errors! = pgenerate(p, f, c...; kwargs...)
-    results = collect(results_iter)
-    if isa(process_errors!, Function)
-        process_errors!(p, f, results)
-    end
-    results
-end
-
function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay)
# Handle all the ones in error in another pmap, with batch size set to 1
if (on_error != nothing) || (retry_n > 0)
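
The `pmap` docstring in this file describes `on_error` as a one-argument handler that either rethrows to stop processing or returns a value that is placed inline with the results. A minimal sketch of that contract, using plain `map` in place of workers, might look as follows. The wrapper name `with_on_error` is hypothetical; the body of the real `wrap_on_error` is not shown in this diff and also supports a `capture_data` option that is omitted here.

```julia
# Hypothetical stand-in for the wrapping idea: call f, and on failure hand
# the exception to on_error, whose return value replaces the result.
with_on_error(f, on_error) = x -> begin
    try
        f(x)
    catch e
        on_error(e)   # may rethrow to abort, or return a placeholder
    end
end

# Assumed example inputs: sqrt of a negative Float64 throws a DomainError.
safe_sqrt = with_on_error(sqrt, e -> e)
results = map(safe_sqrt, [4.0, -1.0, 9.0])
```

With `e -> e` as the handler, the failing element is replaced by its exception object and the remaining elements are still processed, which matches the `on_error=e->e` case listed in the docstring's equivalences.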
2 changes: 1 addition & 1 deletion doc/stdlib/base.rst
@@ -1210,7 +1210,7 @@ Errors
An error occurred when running a module's ``__init__`` function. The actual error thrown is available in the ``.error`` field.

-.. function:: retry(f, [retry_on]; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY) -> Function
+.. function:: retry(f, [retry_on]; n=1, max_delay=10.0) -> Function

.. Docstring generated from Julia source