Distributed Arrays for Julia
NOTE: Distributed Arrays will only work on Julia v0.4.0 or later. DArrays have been removed from the Julia Base library in v0.4, so it is now necessary to import the DistributedArrays package on all spawned processes:
@everywhere using DistributedArrays
Large computations are often organized around large arrays of data. In these cases, a particularly natural way to obtain parallelism is to distribute arrays among several processes. This combines the memory resources of multiple machines, allowing use of arrays too large to fit on one machine. Each process operates on the part of the array it owns, providing a ready answer to the question of how a program should be divided among machines.
Julia distributed arrays are implemented by the DArray type. A DArray has an element type and dimensions just like an Array. A DArray can also use arbitrary array-like types to represent the local chunks that store actual data. The data in a DArray is distributed by dividing the index space into some number of blocks in each dimension.

Common kinds of arrays can be constructed with functions beginning with d:
dzeros(100,100,10)
dones(100,100,10)
drand(100,100,10)
drandn(100,100,10)
dfill(x,100,100,10)
In the last case, each element will be initialized to the specified value x. These functions automatically pick a distribution for you.
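For example, a minimal sketch (assuming worker processes have already been added with addprocs and DistributedArrays has been loaded on all of them):

julia> d = drand(100,100,10);   # the distribution is chosen automatically

julia> size(d)
(100,100,10)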
For more control, you can specify which processes to use, and how the
data should be distributed:
dzeros((100,100), workers()[1:4], [1,4])
The second argument specifies that the array should be created on the first four workers. When dividing data among a large number of processes, one often sees diminishing returns in performance. Placing DArrays on a subset of processes allows multiple DArray computations to happen at once, with a higher ratio of work to communication on each process.
The third argument specifies a distribution; the nth element of this array specifies how many pieces dimension n should be divided into. In this example the first dimension will not be divided, and the second dimension will be divided into 4 pieces. Therefore each local chunk will be of size (100,25). Note that the product of the distribution array must equal the number of processes.
- distribute(a::Array) converts a local array to a distributed array.
- localpart(a::DArray) obtains the locally-stored portion of a DArray.
- localindexes(a::DArray) gives a tuple of the index ranges owned by the local process.
- convert(Array, a::DArray) brings all the data to the local process.
Indexing a DArray (square brackets) with ranges of indexes always creates a SubArray, not copying any data.
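Putting these together, here is a brief sketch (assuming at least four worker processes are available and DistributedArrays is loaded on all of them) of how the chunks from the earlier dzeros call are laid out:

d = dzeros((100,100), workers()[1:4], [1,4])

# Each of the four workers owns a (100,25) chunk
@fetchfrom workers()[1] size(localpart(d))    # -> (100,25)
@fetchfrom workers()[1] localindexes(d)       # -> (1:100, 1:25)

# Bring all of the data back to the local process as an ordinary Array
a = convert(Array, d)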
The primitive DArray constructor has the following somewhat elaborate signature:
DArray(init, dims[, procs, dist])
init is a function that accepts a tuple of index ranges. This function should allocate a local chunk of the distributed array and initialize it for the specified indices. dims is the overall size of the distributed array. procs optionally specifies a vector of process IDs to use. dist is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. The last two arguments are optional, and defaults will be used if they are omitted.
As an example, here is how to turn the local array constructor fill into a distributed array constructor:
dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...)
In this case the init function only needs to call fill with the dimensions of the local piece it is creating.
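As a usage sketch, the custom constructor can then be called just like the built-in ones, with the optional procs and dist arguments (assuming at least two workers are available):

d = dfill(1.0, (100,100), workers()[1:2], [1,2])   # two (100,50) chunks filled with 1.0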
DArrays can also be constructed from multidimensional Array comprehensions with the @DArray macro syntax. This syntax is just sugar for the primitive DArray constructor:
julia> [i+j for i = 1:5, j = 1:5]
5x5 Array{Int64,2}:
2 3 4 5 6
3 4 5 6 7
4 5 6 7 8
5 6 7 8 9
6 7 8 9 10
julia> @DArray [i+j for i = 1:5, j = 1:5]
5x5 DistributedArrays.DArray{Int64,2,Array{Int64,2}}:
2 3 4 5 6
3 4 5 6 7
4 5 6 7 8
5 6 7 8 9
6 7 8 9 10
At this time, distributed arrays do not have much functionality. Their major utility is allowing communication to be done via array indexing, which is convenient for many problems. As an example, consider implementing the "life" cellular automaton, where each cell in a grid is updated according to its neighboring cells. To compute a chunk of the result of one iteration, each process needs the immediate neighbor cells of its local chunk. The following code accomplishes this:
function life_step(d::DArray)
    DArray(size(d),procs(d)) do I
        # Boundary indexes wrap around, giving periodic boundary conditions
        top   = mod(first(I[1])-2,size(d,1))+1
        bot   = mod( last(I[1])  ,size(d,1))+1
        left  = mod(first(I[2])-2,size(d,2))+1
        right = mod( last(I[2])  ,size(d,2))+1
        # Local chunk plus a one-cell border of neighbor data
        old = Array(Bool, length(I[1])+2, length(I[2])+2)
        old[1      , 1      ] = d[top , left]   # left side
        old[2:end-1, 1      ] = d[I[1], left]
        old[end    , 1      ] = d[bot , left]
        old[1      , 2:end-1] = d[top , I[2]]
        old[2:end-1, 2:end-1] = d[I[1], I[2]]   # middle
        old[end    , 2:end-1] = d[bot , I[2]]
        old[1      , end    ] = d[top , right]  # right side
        old[2:end-1, end    ] = d[I[1], right]
        old[end    , end    ] = d[bot , right]
        life_rule(old)
    end
end
As you can see, we use a series of indexing expressions to fetch data into a local array old. Note that the do block syntax is convenient for passing init functions to the DArray constructor.

Next, the serial function life_rule is called to apply the update rules to the data, yielding the needed DArray chunk. Nothing about life_rule is DArray-specific, but we list it here for completeness:
function life_rule(old)
    m, n = size(old)
    new = similar(old, m-2, n-2)
    for j = 2:n-1
        for i = 2:m-1
            nc = +(old[i-1,j-1], old[i-1,j], old[i-1,j+1],
                   old[i  ,j-1],             old[i  ,j+1],
                   old[i+1,j-1], old[i+1,j], old[i+1,j+1])
            new[i-1,j-1] = (nc == 3 || nc == 2 && old[i,j])
        end
    end
    new
end
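A brief usage sketch (the grid size and random initialization are illustrative choices, not part of the original example): build a random Boolean DArray with the primitive constructor and advance it one step:

d  = DArray(I->rand(Bool, map(length,I)...), (20,20))
d2 = life_step(d)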
Floating point arithmetic is not associative and this comes up when performing distributed computations over DArrays. All DArray operations are performed over the localpart chunks and then aggregated. The change in ordering of the operations will change the numeric result as seen in this simple example:
julia> addprocs(8);
julia> @everywhere using DistributedArrays
julia> A = fill(1.1, (100,100));
julia> sum(A)
11000.000000000013
julia> DA = distribute(A);
julia> sum(DA)
11000.000000000127
julia> sum(A) == sum(DA)
false
The ultimate ordering of operations will be dependent on how the Array is distributed.
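If a tolerance-based comparison is acceptable, isapprox can be used instead of exact equality (a suggestion for working around this, not part of the original example):

julia> isapprox(sum(A), sum(DA))
true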
When a DArray is constructed (typically on the master process), the returned DArray object stores information on how the array is distributed, which processor holds which indexes, and so on. When the DArray object on the master process is garbage collected, all participating workers are notified and the localparts of the DArray are freed on each worker.

Since the size of the DArray object itself is small, a problem arises as gc on the master faces no memory pressure to collect the DArray immediately. This results in a delay of the memory being released on the participating workers. Therefore it is highly recommended to explicitly call close(d::DArray) as soon as user code has finished working with the distributed array.

It is also important to note that the localparts of the DArray are collected from all participating workers when the DArray object on the process creating the DArray is collected. It is therefore important to maintain a reference to a DArray object on the creating process for as long as it is being computed upon.
darray_closeall() is another useful function to manage distributed memory. It releases all darrays created from the calling process, including any temporaries created during computation.
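A minimal sketch of the recommended cleanup pattern (the variable names are illustrative):

d = drand(1000, 1000)
total = sum(d)       # do some work with d

close(d)             # release the localparts on the workers promptly

# Or, release every DArray created from this process, temporaries included:
darray_closeall()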