Message passing, control and display utilities for distributed and parallel computing. The primary use case is convenience in:
- assigning variables on remote processes.
- computing on remote variables many times.
- retrieving some facts about variables on remote processes.
If you were hoping for parallel behaviour that looks something like:
reap(2, :(x = myid()))
#Dict{Int64,Any} with 1 entry:
# 2 => 2
reap(2, :(sqrt(x)))
#Dict{Int64,Any} with 1 entry:
# 2 => 1.41421
sow(2, :y, myid())
#Future(2,1,16,Nullable{Any}())
reap(2, :y)
#Dict{Int64,Any} with 1 entry:
# 2 => 1
Then this package may be of interest.
Current version of this repository requires Julia 0.5.
Traditional map-reduce can be effected by use of reap
.
Suppose we want to add together 3 vectors of 5 squared uniform random variables:
using ClusterUtils
pids = [2,3,4]
reduce(+, values(reap(pids, :(rand(5).^2))))
#5-element Array{Float64,1}:
# 1.61008
# 2.43127
# 2.10954
# 0.696425
# 3.06037
We can map the results of an evaluation to a symbol on remote processes using sow
. This makes the symbol's value available for future operations.
The default is that the symbol has global scope (keyword arg mod=Main
).
We can retrieve the value of a symbol on remote processes using reap
.
sow(3, :bob, pi^2)
#RemoteRef{Channel{Any}}(3,1,877)
@fetchfrom 3 bob
#9.869604401089358
sow([2,3,4], :morebob, :(100 + myid()))
#3-element Array{Any,1}:
# RemoteRef{Channel{Any}}(2,1,879)
# RemoteRef{Channel{Any}}(3,1,880)
# RemoteRef{Channel{Any}}(4,1,881)
reap([2,3,4], :morebob)
#Dict{Any,Any} with 3 entries:
# 4 => 104
# 2 => 102
# 3 => 103
sow(4, :specificbob, :(sqrt(myid())); mod=ClusterUtils)
#RemoteRef{Channel{Any}}(4,1,933)
@fetchfrom 4 ClusterUtils.specificbob
#2.0
We can also map to slices, keys, etc. like so:
pids = [2,3,4]
sow(pids, :bar, :(randn(4,4)))
sow(pids, :(bar[1:2,:]), 0)
reap(pids, :bar)[pids[1]]
#4x4 Array{Float64,2}:
# 0.0 0.0 0.0 0.0
# 0.0 0.0 0.0 0.0
# -1.15992 -1.45188 0.282003 -0.135126
# 0.652393 -1.14758 0.648452 1.17555
sow(pids, :somedict, :(Dict(:buzz=>randn(2,2), :badger=>"hello")));
sow(pids, :(somedict[:buzz][:,:]), pi);
reap(pids, :(somedict[:buzz]))[pids[1]]
#2x2 Array{Float64,2}:
# 3.14159 3.14159
# 3.14159 3.14159
The next example is of a utility to describe which processes are on which hosts as this is useful for SharedArray creation.
describepids
returns a dict where the keys are processes on unique hosts, the keyed value represents all processes on that host.
With no arguments only remote processes are described.
@everywhere using ClusterUtils
topo = describepids() # only procs on remote machines
#Dict{Any,Any} with 2 entries:
# 13 => [2,3,4,5,6,7,8,9,10,11,12,13]
# 25 => [14,15,16,17,18,19,20,21,22,23,24,25]
topo = describepids(remote=1) # only procs on local machine
#Dict{Any,Any} with 1 entry:
# 1 => [1]
topo = describepids(remote=2) # procs on all machines
#Dict{Any,Any} with 3 entries:
# 13 => [2,3,4,5,6,7,8,9,10,11,12,13]
# 25 => [14,15,16,17,18,19,20,21,22,23,24,25]
# 1 => [1]
topo = describepids(procs(); filterfn=(x)->ismatch(r"tesla", x)) # custom filter based on `hostname`
#Dict{Any,Any} with 2 entries:
# 13 => [2,3,4,5,6,7,8,9,10,11,12,13]
# 25 => [14,15,16,17,18,19,20,21,22,23,24,25]
Using the network topology information we can setup SharedArray
objects such that memory is shared between processes on the same machines
This is currently not recommended for heavy use (OK to setup at the beggining of an algorithm, but not multiple calls per second).
topo = describepids();
reps = collect(keys(topo)) #a representative process from each machine
#5-element Array{Int64,1}:
# 86
# 98
# 74
# 62
# 110
# initialise a SharedArray using the same random matrix on each machine:
broadcast_shared(topo, Float32, rand(4,4), :foo);
sow(reps, :(foo[1,:]), :(map(Float32, pi)))
someworkers = reps+1
#5-element Array{Int64,1}:
# 87
# 99
# 75
# 63
# 111
workerslices = reap(someworkers, :(foo[1:2,:]));
workerslices[someworkers[1]]
#2x4 Array{Float32,2}:
# 3.14159 3.14159 3.14159 3.14159
# 0.317024 0.381314 0.726252 0.709073
The stitch
function concatenates objects stored on remotes in the ascending order of their process numbers
pids
#5-element Array{Int64,1}:
# 86
# 98
# 74
# 62
# 110
sow(pids, :x, :(myid()^2*randn(3)))
stitch(pids, :x)
#15-element Array{Float64,1}:
# -724.409
# 4207.66
# 2020.86
# -8955.88
# -8417.0
# 5453.98
# -3782.38
# 2769.99
# 7397.81
# -12128.3
# -2169.37
# -772.394
# -10736.4
# -8078.21
# -31016.8
Here we use some remote MessageDict
s to swap messages between different processes.
@everywhere using ClusterUtils
remotes = workers()
#24-element Array{Int64,1}:
# 2
# 3
# 4
# [...]
msgmaster = Dict(k=>0 for k in remotes);
sow(remotes, :somemsg, msgmaster );
isdefined(:somemsg)
#false
@fetchfrom remotes[1] somemsg
#ClusterUtils.MessageDict
#Dict{Int64,Any} with 24 entries:
# 2 => 0
# 11 => 0
# 7 => 0
# [...]
[@spawnat k somemsg[k] = 2k for k in remotes];
@fetchfrom remotes[1] somemsg
#ClusterUtils.MessageDict
#Dict{Int64,Any} with 24 entries:
# 2 => 4
# 11 => 0
# 7 => 0
# [...]
swapmsgs(remotes, :somemsg)
@fetchfrom remotes[1] somemsg
#ClusterUtils.MessageDict
#Dict{Int64,Any} with 24 entries:
# 2 => 4
# 11 => 22
# 7 => 14
# [...]
ClusterUtils.@timeit 100 swapmsgs( workers(), :somemsg )
#0.016565912109999994
Carrying on from the previous example. Suppose we did not need to swapmsgs
between workers, but instead only to aggregate the results on a single process.
Here we use a Dict held on process 1 to aggregate work done on remote servers.
collectmsgs(:somemsg, remotes)
#Dict{Int64,Any} with 24 entries:
# 2 => 4
# 16 => 32
# 11 => 22
# [...]
Two functions save
and load
are provided for conveniently storing and recovering data, built on top of Base.serialize
. First usage is for objects of Any
type:
save("temp.jlf", randn(5))
load("temp.jlf")
#5-element Array{Float64,1}:
# -1.41679
# -1.2292
# 0.103825
# 0.0804196
# 1.4737
Second usage takes a list of symbols for variables defined in Main (default kwarg mod=Main
), turns them into a Dict
and save
s that.
R = [1,2,3];
X = "Fnordic";
save("temp.jlf", :X, :R)
load("temp.jlf")
#Dict{Symbol,Any} with 2 entries:
# :R => [1,2,3]
# :X => "Fnordic"
Display of SharedArray
s is not well behaved when all processes with access to the underlying shared memory are on a remote host.
The module also includes a workaround for issue #14445, which is related to garbage collection interrupts from remotecall operations.