Skip to content

Commit

Permalink
Introduce CachingPool
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jun 26, 2016
1 parent d19e6f7 commit 96633b7
Show file tree
Hide file tree
Showing 8 changed files with 174 additions and 32 deletions.
2 changes: 2 additions & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1184,6 +1184,8 @@ export
# multiprocessing
addprocs,
asyncmap,
CachingPool,
clear!,
ClusterManager,
default_worker_pool,
fetch,
Expand Down
2 changes: 1 addition & 1 deletion base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1157,7 +1157,7 @@ function handle_msg(msg::JoinCompleteMsg, r_stream, w_stream, version)
ntfy_channel = lookup_ref(msg.notify_oid)
put!(ntfy_channel, w.id)

put!(default_worker_pool(), w)
push!(default_worker_pool(), w)
end

function disable_threaded_libs()
Expand Down
8 changes: 5 additions & 3 deletions base/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))

"""
pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
Transform collection `c` by applying `f` to each element using available
workers and tasks.
Expand Down Expand Up @@ -70,7 +70,7 @@ The following are equivalent:
* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)`
* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
"""
function pmap(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
retry_n=0,
retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
retry_on=DEFAULT_RETRY_ON)
Expand All @@ -97,6 +97,7 @@ function pmap(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=not
if on_error !== nothing
f = wrap_on_error(f, on_error)
end

return collect(AsyncGenerator(f, c))
else
batches = batchsplit(c, min_batch_count = length(p) * 3,
Expand All @@ -116,11 +117,12 @@ function pmap(p::WorkerPool, f, c; distributed=true, batch_size=1, on_error=not
if (on_error !== nothing) || (retry_n > 0)
process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay)
end

return results
end
end

pmap(p::WorkerPool, f, c1, c...; kwargs...) = pmap(p, a->f(a...), zip(c1, c...); kwargs...)
pmap(p::AbstractWorkerPool, f, c1, c...; kwargs...) = pmap(p, a->f(a...), zip(c1, c...); kwargs...)
pmap(f, c; kwargs...) = pmap(default_worker_pool(), f, c; kwargs...)
pmap(f, c1, c...; kwargs...) = pmap(a->f(a...), zip(c1, c...); kwargs...)

Expand Down
150 changes: 126 additions & 24 deletions base/workerpool.jl
Original file line number Diff line number Diff line change
@@ -1,11 +1,26 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

type WorkerPool
abstract AbstractWorkerPool

# An AbstractWorkerPool should implement
#
# `push!` - add a new worker to the overall pool (available + busy)
# `put!` - put back a worker to the available pool
# `take!` - take a worker from the available pool (to be used for remote function execution)
# `length` - number of workers available in the overall pool
# `isready` - return false if a `take!` on the pool would block, else true
#
# The default implementations of the above (on an AbstractWorkerPool) require fields
# channel::RemoteChannel{Channel{Int}}
# workers::Set{Int}
#

type WorkerPool <: AbstractWorkerPool
channel::RemoteChannel{Channel{Int}}
count::Int
workers::Set{Int}

# Create a shared queue of available workers
WorkerPool() = new(RemoteChannel(()->Channel{Int}(typemax(Int))), 0)
WorkerPool() = new(RemoteChannel(()->Channel{Int}(typemax(Int))), Set{Int}())
end


Expand All @@ -19,25 +34,26 @@ function WorkerPool(workers::Vector{Int})

# Add workers to the pool
for w in workers
put!(pool, w)
push!(pool, w)
end

return pool
end

push!(pool::AbstractWorkerPool, w::Int) = (push!(pool.workers, w); put!(pool.channel, w); pool)
push!(pool::AbstractWorkerPool, w::Worker) = push!(pool, w.id)
length(pool::AbstractWorkerPool) = length(pool.workers)
isready(pool::AbstractWorkerPool) = isready(pool.channel)

put!(pool::WorkerPool, w::Int) = (pool.count += 1; put!(pool.channel, w))
put!(pool::WorkerPool, w::Worker) = put!(pool, w.id)
put!(pool::AbstractWorkerPool, w::Int) = (put!(pool.channel, w); pool)

length(pool::WorkerPool) = pool.count
workers(pool::AbstractWorkerPool) = collect(pool.workers)

isready(pool::WorkerPool) = isready(pool.channel)

function remotecall_pool(rc_f, f, pool::WorkerPool, args...; kwargs...)
function take!(pool::AbstractWorkerPool)
# Find an active worker
worker = 0
while true
if pool.count == 0
if length(pool) == 0
if pool === default_worker_pool()
# No workers, the master process is used as a worker
worker = 1
Expand All @@ -51,42 +67,47 @@ function remotecall_pool(rc_f, f, pool::WorkerPool, args...; kwargs...)
if worker in procs()
break
else
pool.count = pool.count - 1
delete!(pool.workers, worker) # Remove invalid worker from pool
end
end
return worker
end

function remotecall_pool(rc_f, f, pool::AbstractWorkerPool, args...; kwargs...)
worker = take!(pool)
try
rc_f(f, worker, args...; kwargs...)
finally
if worker != 1
put!(pool.channel, worker)
end
# In case of default_worker_pool, the master is implicitly considered a worker
# till the time new workers are added, and it is not added back to the available pool.
# However, it is perfectly valid for other pools to `push!` any worker (including 1)
# to the pool. Confirm the same before making a worker available.
worker in pool.workers && put!(pool, worker)
end
end


"""
remotecall(f, pool::WorkerPool, args...; kwargs...)
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...)
Call `f(args...; kwargs...)` on one of the workers in `pool`. Returns a `Future`.
"""
remotecall(f, pool::WorkerPool, args...; kwargs...) = remotecall_pool(remotecall, f, pool, args...; kwargs...)
remotecall(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall, f, pool, args...; kwargs...)


"""
remotecall_wait(f, pool::WorkerPool, args...; kwargs...)
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...)
Call `f(args...; kwargs...)` on one of the workers in `pool`. Waits for completion, returns a `Future`.
"""
remotecall_wait(f, pool::WorkerPool, args...; kwargs...) = remotecall_pool(remotecall_wait, f, pool, args...; kwargs...)
remotecall_wait(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall_wait, f, pool, args...; kwargs...)


"""
remotecall_fetch(f, pool::WorkerPool, args...; kwargs...)
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...)
Call `f(args...; kwargs...)` on one of the workers in `pool`. Waits for completion and returns the result.
"""
remotecall_fetch(f, pool::WorkerPool, args...; kwargs...) = remotecall_pool(remotecall_fetch, f, pool, args...; kwargs...)
remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...) = remotecall_pool(remotecall_fetch, f, pool, args...; kwargs...)

"""
default_worker_pool()
Expand All @@ -107,10 +128,91 @@ end


"""
remote([::WorkerPool], f) -> Function
remote([::AbstractWorkerPool], f) -> Function
Returns a lambda that executes function `f` on an available worker
using `remotecall_fetch`.
"""
remote(f) = (args...; kwargs...)->remotecall_fetch(f, default_worker_pool(), args...; kwargs...)
remote(p::WorkerPool, f) = (args...; kwargs...)->remotecall_fetch(f, p, args...; kwargs...)
remote(p::AbstractWorkerPool, f) = (args...; kwargs...)->remotecall_fetch(f, p, args...; kwargs...)

# An `AbstractWorkerPool` that additionally tracks, per (worker, function) pair,
# the remote reference holding the copy of the function cached on that worker.
type CachingPool <: AbstractWorkerPool
    # Fields required by the default `AbstractWorkerPool` method implementations
    channel::RemoteChannel{Channel{Int}}
    workers::Set{Int}

    # Mapping between a tuple (worker_id, f) and a remote_ref
    map_obj2ref::Dict{Tuple{Int, Function}, RemoteChannel}

    function CachingPool()
        # Construct the tracker with exactly the declared field type instead of
        # relying on an implicit conversion from an unrelated Dict type.
        wp = new(RemoteChannel(()->Channel{Int}(typemax(Int))), Set{Int}(),
                 Dict{Tuple{Int, Function}, RemoteChannel}())
        # Release the tracked remote references when the pool itself is collected
        finalizer(wp, clear!)
        wp
    end
end

"""
    CachingPool(workers::Vector{Int})

An implementation of an `AbstractWorkerPool`. `remote`, `remotecall_fetch`, `pmap` and other
remote calls which execute functions remotely benefit from caching the serialized/deserialized
functions on the worker nodes, especially for closures which capture large amounts of data.

The remote cache is maintained for the lifetime of the returned `CachingPool` object. To clear
the cache earlier, use `clear!(pool)`.

For global variables, only the bindings are captured in a closure, not the data.
`let` blocks can be used to capture global data.

For example:

```
const foo=rand(10^8);
wp=CachingPool(workers())
let foo=foo
    pmap(wp, i->sum(foo)+i, 1:100);
end
```

The above would transfer `foo` only once to each worker.
"""
function CachingPool(workers::Vector{Int})
    pool = CachingPool()
    # Register every requested worker id with the freshly created pool
    foreach(wid -> push!(pool, wid), workers)
    return pool
end

# Build a CachingPool over the worker ids registered in an existing WorkerPool.
function CachingPool(wp::WorkerPool)
    return CachingPool(workers(wp))
end

"""
    clear!(pool::CachingPool) -> pool

Removes all cached functions from all participating workers.
"""
function clear!(pool::CachingPool)
    # Finalize every tracked remote reference, then forget them all
    foreach(finalize, values(pool.map_obj2ref))
    empty!(pool.map_obj2ref)
    return pool
end

# Cached case: only the reference was passed; fetch the stored function from it
# and call it with the given arguments.
function exec_from_cache(rr::RemoteChannel, args...; kwargs...)
    cached_f = fetch(rr)
    return cached_f(args...; kwargs...)
end

# First-use case: the function arrives together with its reference; store it in
# the reference before calling it.
function exec_from_cache(f_ref::Tuple{Function, RemoteChannel}, args...; kwargs...)
    func, rr = f_ref
    put!(rr, func) # Cache locally
    return func(args...; kwargs...)
end

# Execute `f` on a worker taken from `pool` by invoking `exec_from_cache` through
# the remote-call function `rc_f`.
#
# The first time a (worker, f) pair is seen, `f` is sent along with a new remote
# reference, which is recorded in `pool.map_obj2ref`. Later calls for the same
# pair send only the recorded reference.
function remotecall_pool(rc_f, f, pool::CachingPool, args...; kwargs...)
    worker = take!(pool)
    key = (worker, f)

    if haskey(pool.map_obj2ref, key)
        # Cache hit: reuse the tracked reference. No new RemoteChannel is
        # constructed on this path.
        f_ref = pool.map_obj2ref[key]
    else
        # Cache miss: create the reference only now and register it.
        rr = RemoteChannel(worker)
        pool.map_obj2ref[key] = rr # Add to tracker
        f_ref = (f, rr)
    end
    # NOTE(review): the reference is tracked before the remote call succeeds; if
    # that first call fails, later calls would `fetch` from a reference that may
    # never have been filled. Confirm whether this needs handling.

    try
        rc_f(exec_from_cache, worker, f_ref, args...; kwargs...)
    finally
        # ensure that we do not add pid 1 back if it is not registered.
        worker in pool.workers && put!(pool, worker)
    end
end
34 changes: 31 additions & 3 deletions doc/stdlib/parallel.rst
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,28 @@ General Parallel Computing Support
Create a WorkerPool from a vector of worker ids.

.. function:: CachingPool(workers::Vector{Int})

.. Docstring generated from Julia source
An implementation of an ``AbstractWorkerPool``\ . ``remote``\ , ``remotecall_fetch``\ , ``pmap`` and other remote calls which execute functions remotely, benefit from caching the serialized/deserialized functions on the worker nodes, especially for closures which capture large amounts of data.

The remote cache is maintained for the lifetime of the returned ``CachingPool`` object. To clear the cache earlier, use ``clear!(pool)``\ .

For global variables, only the bindings are captured in a closure, not the data. ``let`` blocks can be used to capture global data.

For example:

.. code-block:: julia
const foo=rand(10^8);
wp=CachingPool(workers())
let foo=foo
pmap(wp, i->sum(foo)+i, 1:100);
end
The above would transfer ``foo`` only once to each worker.

.. function:: rmprocs(pids...)

.. Docstring generated from Julia source
Expand All @@ -273,7 +295,7 @@ General Parallel Computing Support

For multiple collection arguments, apply f elementwise.

.. function:: pmap([::WorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
.. function:: pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection

.. Docstring generated from Julia source
Expand Down Expand Up @@ -377,13 +399,13 @@ General Parallel Computing Support
Perform ``fetch(remotecall(...))`` in one message. Keyword arguments, if any, are passed through to ``func``\ . Any remote exceptions are captured in a ``RemoteException`` and thrown.

.. function:: remotecall_fetch(f, pool::WorkerPool, args...; kwargs...)
.. function:: remotecall_fetch(f, pool::AbstractWorkerPool, args...; kwargs...)

.. Docstring generated from Julia source
Call ``f(args...; kwargs...)`` on one of the workers in ``pool``\ . Waits for completion and returns the result.

.. function:: remote([::WorkerPool], f) -> Function
.. function:: remote([::AbstractWorkerPool], f) -> Function

.. Docstring generated from Julia source
Expand Down Expand Up @@ -531,6 +553,12 @@ General Parallel Computing Support
foo = 1
@eval @everywhere bar=$foo
.. function:: clear!(pool::CachingPool) -> pool

.. Docstring generated from Julia source
Removes all cached functions from all participating workers.

.. function:: Base.remoteref_id(r::AbstractRemoteRef) -> (whence, id)

.. Docstring generated from Julia source
Expand Down
1 change: 1 addition & 0 deletions doc/stdlib/strings.rst
Original file line number Diff line number Diff line change
Expand Up @@ -500,3 +500,4 @@
.. Docstring generated from Julia source
Create a string from the address of a NUL-terminated UTF-32 string. A copy is made; the pointer can be safely freed. If ``length`` is specified, the string does not have to be NUL-terminated.

2 changes: 1 addition & 1 deletion doc/stdlib/test.rst
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ gives a `Broken` `Result`.

.. Docstring generated from Julia source
For use to indicate a test that should pass but currently intermittently fails. Does not evaluate the expression.
For use to indicate a test that should pass but currently intermittently fails. Does not evaluate the expression, which makes it useful for tests of not-yet-implemented functionality.

Creating Custom ``AbstractTestSet`` Types
-----------------------------------------
Expand Down
7 changes: 7 additions & 0 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -803,6 +803,13 @@ end
# Test asyncmap
@test allunique(asyncmap(x->object_id(current_task()), 1:100))

# CachingPool tests: pmap through a caching pool must return the mapped values in order
wp = CachingPool(workers())
@test collect(1:100) == pmap(wp, x->x, 1:100)

# After clearing, the pool must no longer track any cached function references
clear!(wp)
@test isempty(wp.map_obj2ref)


# The below block of tests are usually run only on local development systems, since:
# - tests which print errors
Expand Down

0 comments on commit 96633b7

Please sign in to comment.