From a71470ad72a3bf4e73410453be7c968532437ec7 Mon Sep 17 00:00:00 2001 From: Andreas Noack Date: Wed, 3 Aug 2016 22:28:21 -0400 Subject: [PATCH] Split into files --- src/DistributedArrays.jl | 1607 +------------------------------------- src/core.jl | 712 +++++++++++++++++ src/linalg.jl | 260 ++++++ src/mapreduce.jl | 370 +++++++++ src/serialize.jl | 86 ++ src/sort.jl | 170 ++++ 6 files changed, 1603 insertions(+), 1602 deletions(-) create mode 100644 src/core.jl create mode 100644 src/linalg.jl create mode 100644 src/mapreduce.jl create mode 100644 src/serialize.jl create mode 100644 src/sort.jl diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index b6695d2..887c578 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -17,1607 +17,10 @@ export DArray, SubDArray, SubOrDArray, @DArray export dzeros, dones, dfill, drand, drandn, distribute, localpart, localindexes, ppeval, samedist export close, darray_closeall -const registry=Dict{Tuple, Any}() -const refs=Set() # Collection of darray identities created on this node - -let DID::Int = 1 - global next_did - next_did() = (id = DID; DID += 1; (myid(), id)) -end - -""" - next_did() - -Produces an incrementing ID that will be used for DArrays. -""" -next_did - -""" - DArray(init, dims, [procs, dist]) - -Construct a distributed array. - -The parameter `init` is a function that accepts a tuple of index ranges. -This function should allocate a local chunk of the distributed array and initialize it for the specified indices. - -`dims` is the overall size of the distributed array. - -`procs` optionally specifies a vector of process IDs to use. -If unspecified, the array is distributed over all worker processes only. Typically, when running in distributed mode, -i.e., nprocs() > 1, this would mean that no chunk of the distributed array exists on the process hosting the -interactive julia prompt. - -`dist` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. - -For example, the `dfill` function that creates a distributed array and fills it with a value `v` is implemented as: - -### Example -```jl -dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) 
-``` -""" -type DArray{T,N,A} <: AbstractArray{T,N} - identity::Tuple - dims::NTuple{N,Int} - pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i - indexes::Array{NTuple{N,UnitRange{Int}},N} # indexes held by piece i - cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d - - release::Bool - - function DArray(identity, dims, pids, indexes, cuts) - # check invariants - if dims != map(last, last(indexes)) - throw(ArgumentError("dimension of DArray (dim) and indexes do not match")) - end - release = (myid() == identity[1]) - - global registry - haskey(registry, (identity, :DARRAY)) && return registry[(identity, :DARRAY)] - - d = new(identity, dims, pids, indexes, cuts, release) - if release - push!(refs, identity) - registry[(identity, :DARRAY)] = d - -# println("Installing finalizer for : ", d.identity, ", : ", object_id(d), ", isbits: ", isbits(d)) - finalizer(d, close) - end - d - end - - DArray() = new() -end - -typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D} -typealias SubOrDArray{T,N} Union{DArray{T,N}, SubDArray{T,N}} - -## core constructors ## - -function DArray(identity, init, dims, pids, idxs, cuts) - r=Channel(1) - @sync begin - for i = 1:length(pids) - @async begin - local typA - if isa(init, Function) - typA=remotecall_fetch(construct_localparts, pids[i], init, identity, dims, pids, idxs, cuts) - else - # constructing from an array of remote refs. - typA=remotecall_fetch(construct_localparts, pids[i], init[i], identity, dims, pids, idxs, cuts) - end - !isready(r) && put!(r, typA) - end - end - end - - typA = take!(r) - if myid() in pids - d = registry[(identity, :DARRAY)] - else - d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts) - end - d -end - -function construct_localparts(init, identity, dims, pids, idxs, cuts) - A = isa(init, Function) ? init(idxs[localpartindex(pids)]) : fetch(init) - global registry - registry[(identity, :LOCALPART)] = A - typA = typeof(A) - d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts) - registry[(identity, :DARRAY)] = d - typA -end - -function DArray(init, dims, procs, dist) - np = prod(dist) - procs = reshape(procs[1:np], ntuple(i->dist[i], length(dist))) - idxs, cuts = chunk_idxs([dims...], dist) - identity = next_did() - - return DArray(identity, init, dims, procs, idxs, cuts) -end - -function DArray(init, dims, procs) - if isempty(procs) - throw(ArgumentError("no processors given")) - end - return DArray(init, dims, procs, defaultdist(dims, procs)) -end -DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(), maximum(dims))]) - -# Create a DArray from a collection of references -# The refs must have the same layout as the parts distributed. -# i.e. -# size(refs) must specify the distribution of dimensions across processors -# prod(size(refs)) must equal number of parts -# FIXME : Empty parts are currently not supported. -function DArray(refs) - dimdist = size(refs) - identity = next_did() - - npids = [r.where for r in refs] - nsizes = Array(Tuple, dimdist) - @sync for i in 1:length(refs) - let i=i - @async nsizes[i] = remotecall_fetch(rr_localpart, npids[i], refs[i], identity) - end - end - - nindexes = Array(NTuple{length(dimdist),UnitRange{Int}}, dimdist...) - - for i in 1:length(nindexes) - subidx = ind2sub(dimdist, i) - nindexes[i] = ntuple(length(subidx)) do x - idx_in_dim = subidx[x] - startidx = 1 - for j in 1:(idx_in_dim-1) - prevsubidx = ntuple(y -> y == x ? j : subidx[y], length(subidx)) - prevsize = nsizes[prevsubidx...] 
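# Illustrative sketch, not part of this patch: using the `DArray(init, dims, procs, dist)`
# constructor documented above. Assumes at least four worker processes (e.g. `addprocs(4)`);
# `init` receives a tuple of `UnitRange`s giving the global indices of one chunk.
dexample = DArray((8, 8), workers()[1:4], [2, 2]) do I
    fill(myid(), map(length, I))   # each 4×4 chunk holds the id of the worker that owns it
end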
- startidx += prevsize[x] - end - startidx:startidx+(nsizes[i][x])-1 - end - end - - lastidxs = hcat([Int[last(idx_in_d)+1 for idx_in_d in idx] for idx in nindexes]...) - ncuts = Array{Int,1}[unshift!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)] - ndims = tuple([sort(unique(lastidxs[x,:]))[end]-1 for x in 1:length(dimdist)]...) - - DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts) -end - -macro DArray(ex0::Expr) - if ex0.head !== :comprehension - throw(ArgumentError("invalid @DArray syntax")) - end - ex = ex0.args[1] - if ex.head !== :generator - throw(ArgumentError("invalid @DArray syntax")) - end - ex.args[1] = esc(ex.args[1]) - ndim = length(ex.args) - 1 - ranges = map(r->esc(r.args[2]), ex.args[2:end]) - for d = 1:ndim - var = ex.args[d+1].args[1] - ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] ) - end - return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0), - tuple($(map(r->:(length($r)), ranges)...))) ) -end - -# new DArray similar to an existing one -DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indexes, d.cuts) - -function release_localpart(identity) - global registry - delete!(registry, (identity, :DARRAY)) - delete!(registry, (identity, :LOCALPART)) - nothing -end -release_localpart(d::DArray) = release_localpart(d.identity) - -function close_by_identity(identity, pids) -# @schedule println("Finalizer for : ", identity) - global refs - @sync begin - for p in pids - @async remotecall_fetch(release_localpart, p, identity) - end - if !(myid() in pids) - release_localpart(identity) - end - end - delete!(refs, identity) - nothing -end - -function close(d::DArray) -# @schedule println("close : ", d.identity, ", object_id : ", object_id(d), ", myid : ", myid() ) - if (myid() == d.identity[1]) && d.release - @schedule close_by_identity(d.identity, d.pids) - d.release = false - end - nothing -end - -function darray_closeall() - global registry - global refs - crefs = copy(refs) - for identity in crefs - if identity[1] == myid() # sanity check - haskey(registry, (identity, :DARRAY)) && close(registry[(identity, :DARRAY)]) - yield() - end - end -end - -function rr_localpart(ref, identity) - global registry - lp = fetch(ref) - registry[(identity, :LOCALPART)] = lp - return size(lp) -end - -function Base.serialize(S::AbstractSerializer, d::DArray) - # Only send the ident for participating workers - we expect the DArray to exist in the - # remote registry - destpid = Base.worker_id_from_socket(S.io) - Serializer.serialize_type(S, typeof(d)) - if (destpid in d.pids) || (destpid == d.identity[1]) - serialize(S, (true, d.identity)) # (identity_only, identity) - else - serialize(S, (false, d.identity)) - for n in [:dims, :pids, :indexes, :cuts] - serialize(S, getfield(d, n)) - end - end -end - -function Base.deserialize{T<:DArray}(S::AbstractSerializer, t::Type{T}) - what = deserialize(S) - identity_only = what[1] - identity = what[2] - - if identity_only - global registry - if haskey(registry, (identity, :DARRAY)) - return registry[(identity, :DARRAY)] - else - # access to fields will throw an error, at least the deserialization process will not - # result in worker death - d = T() - d.identity = identity - return d - end - else - # We are not a participating worker, deser fields and instantiate locally. 
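# Illustrative sketch, not part of this patch: the `@DArray` comprehension macro defined
# above builds the distributed array chunk-by-chunk on the workers.
dcomp = @DArray [i + j for i = 1:8, j = 1:8]   # an 8×8 DArray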
- dims = deserialize(S) - pids = deserialize(S) - indexes = deserialize(S) - cuts = deserialize(S) - return T(identity, dims, pids, indexes, cuts) - end -end - - -Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d)) -Base.similar(d::DArray, T::Type) = similar(d, T, size(d)) -Base.similar{T}(d::DArray{T}, dims::Dims) = similar(d, T, dims) -Base.similar{T}(d::DArray{T}) = similar(d, T, size(d)) - -Base.size(d::DArray) = d.dims - - -""" - procs(d::DArray) - -Get the vector of processes storing pieces of DArray `d`. -""" -Base.procs(d::DArray) = d.pids - -chunktype{T,N,A}(d::DArray{T,N,A}) = A - -## chunk index utilities ## - -# decide how to divide each dimension -# returns size of chunks array -function defaultdist(dims, pids) - dims = [dims...] - chunks = ones(Int, length(dims)) - np = length(pids) - f = sort!(collect(keys(factor(np))), rev=true) - k = 1 - while np > 1 - # repeatedly allocate largest factor to largest dim - if np % f[k] != 0 - k += 1 - if k > length(f) - break - end - end - fac = f[k] - (d, dno) = findmax(dims) - # resolve ties to highest dim - dno = last(find(dims .== d)) - if dims[dno] >= fac - dims[dno] = div(dims[dno], fac) - chunks[dno] *= fac - end - np = div(np, fac) - end - return chunks -end - -# get array of start indexes for dividing sz into nc chunks -function defaultdist(sz::Int, nc::Int) - if sz >= nc - return round(Int, linspace(1, sz+1, nc+1)) - else - return [[1:(sz+1);], zeros(Int, nc-sz);] - end -end - -# compute indexes array for dividing dims into chunks -function chunk_idxs(dims, chunks) - cuts = map(defaultdist, dims, chunks) - n = length(dims) - idxs = Array(NTuple{n,UnitRange{Int}},chunks...) - for cidx in CartesianRange(tuple(chunks...)) - idxs[cidx.I...] = ntuple(i -> (cuts[i][cidx[i]]:cuts[i][cidx[i] + 1] - 1), n) - end - return (idxs, cuts) -end - -function localpartindex(pids::Array{Int}) - mi = myid() - for i = 1:length(pids) - if pids[i] == mi - return i - end - end - return 0 -end -localpartindex(d::DArray) = localpartindex(procs(d)) - -""" - localpart(d::DArray) - -Get the local piece of a distributed array. -Returns an empty array if no local part exists on the calling process. -""" -function localpart{T,N,A}(d::DArray{T,N,A}) - lpidx = localpartindex(d) - if lpidx == 0 - return convert(A, Array(T, ntuple(zero, N)))::A - end - - global registry - return registry[(d.identity, :LOCALPART)]::A -end - -localpart(d::DArray, localidx...) = localpart(d)[localidx...] - -# fetch localpart of d at pids[i] -fetch{T,N,A}(d::DArray{T,N,A}, i) = remotecall_fetch(localpart, d.pids[i], d) - -""" - localpart(A) - -The identity when input is not distributed -""" -localpart(A) = A - -""" - localindexes(d) - -A tuple describing the indexes owned by the local process. -Returns a tuple with empty ranges if no local part exists on the calling process. -""" -function localindexes(d::DArray) - lpidx = localpartindex(d) - if lpidx == 0 - return ntuple(i -> 1:0, ndims(d)) - end - return d.indexes[lpidx] -end - -# find which piece holds index (I...) -locate(d::DArray, I::Int...) = - ntuple(i -> searchsortedlast(d.cuts[i], I[i]), ndims(d)) - -chunk{T,N,A}(d::DArray{T,N,A}, i...) = remotecall_fetch(localpart, d.pids[i...], d)::A - -## convenience constructors ## - -""" - dzeros(dims, ...) - -Construct a distributed array of zeros. -Trailing arguments are the same as those accepted by `DArray`. -""" -dzeros(dims::Dims, args...) = DArray(I->zeros(map(length,I)), dims, args...) -dzeros{T}(::Type{T}, dims::Dims, args...) 
= DArray(I->zeros(T,map(length,I)), dims, args...) -dzeros{T}(::Type{T}, d1::Integer, drest::Integer...) = dzeros(T, convert(Dims, tuple(d1, drest...))) -dzeros(d1::Integer, drest::Integer...) = dzeros(Float64, convert(Dims, tuple(d1, drest...))) -dzeros(d::Dims) = dzeros(Float64, d) - - -""" - dones(dims, ...) - -Construct a distributed array of ones. -Trailing arguments are the same as those accepted by `DArray`. -""" -dones(dims::Dims, args...) = DArray(I->ones(map(length,I)), dims, args...) -dones{T}(::Type{T}, dims::Dims, args...) = DArray(I->ones(T,map(length,I)), dims, args...) -dones{T}(::Type{T}, d1::Integer, drest::Integer...) = dones(T, convert(Dims, tuple(d1, drest...))) -dones(d1::Integer, drest::Integer...) = dones(Float64, convert(Dims, tuple(d1, drest...))) -dones(d::Dims) = dones(Float64, d) - -""" - dfill(x, dims, ...) - -Construct a distributed array filled with value `x`. -Trailing arguments are the same as those accepted by `DArray`. -""" -dfill(v, dims::Dims, args...) = DArray(I->fill(v, map(length,I)), dims, args...) -dfill(v, d1::Integer, drest::Integer...) = dfill(v, convert(Dims, tuple(d1, drest...))) - -""" - drand(dims, ...) - -Construct a distributed uniform random array. -Trailing arguments are the same as those accepted by `DArray`. -""" -drand{T}(::Type{T}, dims::Dims, args...) = DArray(I->rand(T,map(length,I)), dims, args...) -drand{T}(::Type{T}, d1::Integer, drest::Integer...) = drand(T, convert(Dims, tuple(d1, drest...))) -drand(d1::Integer, drest::Integer...) = drand(Float64, convert(Dims, tuple(d1, drest...))) -drand(d::Dims, args...) = drand(Float64, d, args...) - -""" - drandn(dims, ...) - -Construct a distributed normal random array. -Trailing arguments are the same as those accepted by `DArray`. -""" -drandn(dims::Dims, args...) = DArray(I->randn(map(length,I)), dims, args...) -drandn(d1::Integer, drest::Integer...) = drandn(convert(Dims, tuple(d1, drest...))) - -## conversions ## - -""" - distribute(A[; procs, dist]) - -Convert a local array to distributed. - -`procs` optionally specifies an array of process IDs to use. (defaults to all workers) -`dist` optionally specifies a vector or tuple of the number of partitions in each dimension -""" -function distribute(A::AbstractArray; - procs = workers()[1:min(nworkers(), maximum(size(A)))], - dist = defaultdist(size(A), procs)) - idxs, _ = chunk_idxs([size(A)...], dist) - - pas = PartitionedSerializer(A, procs, idxs) - return DArray(I->verify_and_get(pas, I), size(A), procs, dist) -end - -""" - distribute(A, DA) - -Distribute a local array `A` like the distributed array `DA`. - -""" -function distribute(A::AbstractArray, DA::DArray) - size(DA) == size(A) || throw(DimensionMismatch("Distributed array has size $(size(DA)) but array has $(size(A))")) - - pas = PartitionedSerializer(A, procs(DA), DA.indexes) - return DArray(I->verify_and_get(pas, I), DA) -end - -Base.convert{T,N,S<:AbstractArray}(::Type{DArray{T,N,S}}, A::S) = distribute(convert(AbstractArray{T,N}, A)) - -Base.convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) = begin - a = Array(S, size(d)) - @sync begin - for i = 1:length(d.pids) - @async a[d.indexes[i]...] = chunk(d, i) - end - end - return a -end - -Base.convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) = begin - I = s.indexes - d = s.parent - if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S - l = locate(d, map(first, I)...) - if isequal(d.indexes[l...], I) - # SubDArray corresponds to a chunk - return chunk(d, l...) 
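# Illustrative sketch, not part of this patch: round-tripping a local array through
# `distribute` and `convert` as defined above. Assumes at least four workers for the
# explicit 2×2 layout.
A_local = rand(100, 100)
dA = distribute(A_local)                   # default layout from `defaultdist`
dA2 = distribute(A_local; dist = [2, 2])   # request a 2×2 block layout
@assert convert(Array, dA) == A_local      # gather the chunks back into a local Array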
- end - end - a = Array(S, size(s)) - a[[1:size(a,i) for i=1:N]...] = s - return a -end - -function Base.convert{T,N}(::Type{DArray}, SD::SubArray{T,N}) - D = SD.parent - DArray(size(SD), procs(D)) do I - TR = typeof(SD.indexes[1]) - lindices = Array(TR, 0) - for (i,r) in zip(I, SD.indexes) - st = step(r) - lrstart = first(r) + st*(first(i)-1) - lrend = first(r) + st*(last(i)-1) - if TR <: UnitRange - push!(lindices, lrstart:lrend) - else - push!(lindices, lrstart:st:lrend) - end - end - convert(Array, D[lindices...]) - end -end - -Base.reshape{T,S<:Array}(A::DArray{T,1,S}, d::Dims) = begin - if prod(d) != length(A) - throw(DimensionMismatch("dimensions must be consistent with array size")) - end - return DArray(d) do I - sz = map(length,I) - d1offs = first(I[1]) - nd = length(I) - - B = Array(T,sz) - nr = size(B,1) - sztail = size(B)[2:end] - - for i=1:div(length(B),nr) - i2 = ind2sub(sztail, i) - globalidx = [ I[j][i2[j-1]] for j=2:nd ] - - a = sub2ind(d, d1offs, globalidx...) - - B[:,i] = A[a:(a+nr-1)] - end - B - end -end - -## indexing ## - -getlocalindex(d::DArray, idx...) = localpart(d)[idx...] -function getindex_tuple{T}(d::DArray{T}, I::Tuple{Vararg{Int}}) - chidx = locate(d, I...) - idxs = d.indexes[chidx...] - localidx = ntuple(i -> (I[i] - first(idxs[i]) + 1), ndims(d)) - pid = d.pids[chidx...] - return remotecall_fetch(getlocalindex, pid, d, localidx...)::T -end - -Base.getindex(d::DArray, i::Int) = getindex_tuple(d, ind2sub(size(d), i)) -Base.getindex(d::DArray, i::Int...) = getindex_tuple(d, i) - -Base.getindex(d::DArray) = d[1] -Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) - -Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin - if !(size(dest) == size(src) && - procs(dest) == procs(src) && - dest.indexes == src.indexes && - dest.cuts == src.cuts) - throw(DimensionMismatch("destination array doesn't fit to source array")) - end - @sync for p in procs(dest) - @async remotecall_fetch((dest,src)->(copy!(localpart(dest), localpart(src)); nothing), p, dest, src) - end - return dest -end - -# local copies are obtained by convert(Array, ) or assigning from -# a SubDArray to a local Array. - -function Base.setindex!(a::Array, d::DArray, - I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) - n = length(I) - @sync for i = 1:length(d.pids) - K = d.indexes[i] - @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) - end - return a -end - -# We also want to optimize setindex! with a SubDArray source, but this is hard -# and only works on 0.5. - -# Similar to Base.indexin, but just create a logical mask. Note that this -# must return a logical mask in order to support merging multiple masks -# together into one linear index since we need to know how many elements to -# skip at the end. In many cases range intersection would be much faster -# than generating a logical mask, but that loses the endpoint information. -indexin_mask(a, b::Number) = a .== b -indexin_mask(a, r::Range{Int}) = [i in r for i in a] -indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b)) -indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b)) -indexin_mask(a, b) = [i in b for i in a] - -import Base: tail -# Given a tuple of indices and a tuple of masks, restrict the indices to the -# valid regions. This is, effectively, reversing Base.setindex_shape_check. -# We can't just use indexing into MergedIndices here because getindex is much -# pickier about singleton dimensions than setindex! is. 
-restrict_indices(::Tuple{}, ::Tuple{}) = () -function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}}) - if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1) - (vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...) - elseif length(a[1]) == 1 - (a[1], restrict_indices(tail(a), b)) - elseif length(b[1]) == 1 && b[1][1] - restrict_indices(a, tail(b)) - else - throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue")) - end -end -# The final indices are funky - they're allowed to accumulate together. -# An easy (albeit very inefficient) fix for too many masks is to use the -# outer product to merge them. But we can do that lazily with a custom type: -function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}}) - (vec(a[1])[vec(ProductIndices(b, map(length, b)))],) -end -# But too many indices is much harder; this requires merging the indices -# in `a` before applying the final mask in `b`. -function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any}) - if length(a[1]) == 1 - (a[1], restrict_indices(tail(a), b)) - else - # When one mask spans multiple indices, we need to merge the indices - # together. At this point, we can just use indexing to merge them since - # there's no longer special handling of singleton dimensions - (view(MergedIndices(a, map(length, a)), b[1]),) - end -end - -immutable ProductIndices{I,N} <: AbstractArray{Bool, N} - indices::I - sz::NTuple{N,Int} -end -Base.size(P::ProductIndices) = P.sz -# This gets passed to map to avoid breaking propagation of inbounds -Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...] -Base.@propagate_inbounds Base.getindex{_,N}(P::ProductIndices{_,N}, I::Vararg{Int, N}) = - Bool((&)(map(propagate_getindex, P.indices, I)...)) - -immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N} - indices::I - sz::NTuple{N,Int} -end -Base.size(M::MergedIndices) = M.sz -Base.@propagate_inbounds Base.getindex{_,N}(M::MergedIndices{_,N}, I::Vararg{Int, N}) = - CartesianIndex(map(propagate_getindex, M.indices, I)) -# Additionally, we optimize bounds checking when using MergedIndices as an -# array index since checking, e.g., A[1:500, 1:500] is *way* faster than -# checking an array of 500^2 elements of CartesianIndex{2}. This optimization -# also applies to reshapes of MergedIndices since the outer shape of the -# container doesn't affect the index elements themselves. We can go even -# farther and say that even restricted views of MergedIndices must be valid -# over the entire array. This is overly strict in general, but in this -# use-case all the merged indices must be valid at some point, so it's ok. 
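# Illustrative sketch, not part of this patch: the two documented ways of taking a local
# copy of a region -- `convert(Array, ...)` on the view returned by indexing, or assigning
# a DArray into a local Array via the `setindex!` methods in this section.
dr = drandn(8, 8)
sub = dr[2:5, :]                 # getindex returns a SubDArray (a lazy view)
loc = convert(Array, sub)        # fetches only the chunks overlapping rows 2:5
buf = zeros(8, 8)
buf[1:8, 1:8] = dr               # setindex!(::Array, ::DArray, ...) copies chunk by chunk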
-typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M} -typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M} -typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices} -import Base: checkbounds_indices -@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = - checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) -@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = - checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) -@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = - checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) - -# The tricky thing here is that we want to optimize the accesses into the -# distributed array, but in doing so, we lose track of which indices in I we -# should be using. -# -# I’ve come to the conclusion that the function is utterly insane. -# There are *6* flavors of indices with four different reference points: -# 1. Find the indices of each portion of the DArray. -# 2. Find the valid subset of indices for the SubArray into that portion. -# 3. Find the portion of the `I` indices that should be used when you access the -# `K` indices in the subarray. This guy is nasty. It’s totally backwards -# from all other arrays, wherein we simply iterate over the source array’s -# elements. You need to *both* know which elements in `J` were skipped -# (`indexin_mask`) and which dimensions should match up (`restrict_indices`) -# 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of -# the local portion of the source array -function Base.setindex!(a::Array, s::SubDArray, - I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) - Base.setindex_shape_check(s, Base.index_lengths(a, I...)...) - n = length(I) - d = s.parent - J = Base.decolon(d, s.indexes...) - @sync for i = 1:length(d.pids) - K_c = d.indexes[i] - K = map(intersect, J, K_c) - if !any(isempty, K) - K_mask = map(indexin_mask, J, K_c) - idxs = restrict_indices(Base.decolon(a, I...), K_mask) - if isequal(K, K_c) - # whole chunk - @async a[idxs...] = chunk(d, i) - else - # partial chunk - @async a[idxs...] = - remotecall_fetch(d.pids[i]) do - view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...) - end - end - end - end - return a -end - -Base.fill!(A::DArray, x) = begin - @sync for p in procs(A) - @async remotecall_fetch((A,x)->(fill!(localpart(A), x); nothing), p, A, x) - end - return A -end - -## higher-order functions ## - -Base.map(f, d::DArray) = DArray(I->map(f, localpart(d)), d) - -function Base.reduce(f, d::DArray) - results=[] - @sync begin - for p in procs(d) - @async push!(results, remotecall_fetch((f,d)->reduce(f, localpart(d)), p, f, d)) - end - end - reduce(f, results) -end - -function _mapreduce(f, opt, d::DArray) -# TODO Change to an @async remotecall_fetch - will reduce one extra network hop - -# once bug in master is fixed. 
- results=[] - @sync begin - for p in procs(d) - @async push!(results, remotecall_fetch((f,opt,d)->mapreduce(f, opt, localpart(d)), p, f, opt, d)) - end - end - reduce(opt, results) -end -Base.mapreduce(f, opt::Union{typeof(@functorize(|)), typeof(@functorize(&))}, d::DArray) = _mapreduce(f, opt, d) -Base.mapreduce(f, opt::Function, d::DArray) = _mapreduce(f, opt, d) -Base.mapreduce(f, opt, d::DArray) = _mapreduce(f, opt, d) - -Base.map!{F}(f::F, d::DArray) = begin - @sync for p in procs(d) - @async remotecall_fetch((f,d)->(map!(f, localpart(d)); nothing), p, f, d) - end - return d -end - -# mapreducedim -Base.reducedim_initarray{R}(A::DArray, region, v0, ::Type{R}) = begin - # Store reduction on lowest pids - pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] - chunks = similar(pids, Future) - @sync for i in eachindex(pids) - @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray(localpart(A), region, v0, R), pids[i...]) - end - return DArray(chunks) -end -Base.reducedim_initarray{T}(A::DArray, region, v0::T) = Base.reducedim_initarray(A, region, v0, T) - -Base.reducedim_initarray0{R}(A::DArray, region, v0, ::Type{R}) = begin - # Store reduction on lowest pids - pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] - chunks = similar(pids, Future) - @sync for i in eachindex(pids) - @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray0(localpart(A), region, v0, R), pids[i...]) - end - return DArray(chunks) -end -Base.reducedim_initarray0{T}(A::DArray, region, v0::T) = Base.reducedim_initarray0(A, region, v0, T) - -# Compute mapreducedim of each localpart and store the result in a new DArray -mapreducedim_within(f, op, A::DArray, region) = begin - arraysize = [size(A)...] - gridsize = [size(A.indexes)...] - arraysize[[region...]] = gridsize[[region...]] - indx = similar(A.indexes) - for i in CartesianRange(size(indx)) - indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indexes[i][j], ndims(A)) - end - cuts = [i in region ? collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)] - return DArray(next_did(), I -> mapreducedim(f, op, localpart(A), region), - tuple(arraysize...), procs(A), indx, cuts) -end - -# Compute mapreducedim accros the processes. This should be done after mapreducedim -# has been run on each localpart with mapreducedim_within. Eventually, we might -# want to write mapreducedim_between! as a binary reduction. -function mapreducedim_between!(f, op, R::DArray, A::DArray, region) - @sync for p in procs(R) - @async remotecall_fetch(p, f, op, R, A, region) do f, op, R, A, region - localind = [r for r = localindexes(A)] - localind[[region...]] = [1:n for n = size(A)[[region...]]] - B = convert(Array, A[localind...]) - Base.mapreducedim!(f, op, localpart(R), B) - nothing - end - end - return R -end - -Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin - lsize = Base.check_reducedims(R,A) - if isempty(A) - return copy(R) - end - region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...) - if isempty(region) - return copy!(R, A) - end - B = mapreducedim_within(f, op, A, region) - return mapreducedim_between!(identity, op, R, B, region) -end - -Base.mapreducedim(f, op, R::DArray, A::DArray) = begin - Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A) -end - -function nnz(A::DArray) - B = Array(Any, size(A.pids)) - @sync begin - for i in eachindex(A.pids) - @async B[i...] 
= remotecall_fetch(x -> nnz(localpart(x)), A.pids[i...], A) - end - end - return reduce(+, B) -end - -# LinAlg -Base.scale!(A::DArray, x::Number) = begin - @sync for p in procs(A) - @async remotecall_fetch((A,x)->(scale!(localpart(A), x); nothing), p, A, x) - end - return A -end - -# TODO -# - avoid temporary darrays being created by sum, mean, std, etc when called along specific dimensions - -# reduce like -for (fn, fr) in ((:sum, :+), - (:prod, :*), - (:maximum, :max), - (:minimum, :min), - (:any, :|), - (:all, :&)) - @eval (Base.$fn)(d::DArray) = reduce(@functorize($fr), d) -end - -# mapreduce like -for (fn, fr1, fr2) in ((:maxabs, :abs, :max), - (:minabs, :abs, :min), - (:sumabs, :abs, :+), - (:sumabs2, :abs2, :+)) - @eval (Base.$fn)(d::DArray) = mapreduce(@functorize($fr1), @functorize($fr2), d) -end - -# semi mapreduce -for (fn, fr) in ((:any, :|), - (:all, :&), - (:count, :+)) - @eval begin - (Base.$fn)(f::typeof(@functorize(identity)), d::DArray) = mapreduce(f, @functorize($fr), d) - (Base.$fn)(f::Base.Predicate, d::DArray) = mapreduce(f, @functorize($fr), d) - # (Base.$fn)(f::Base.Func{1}, d::DArray) = mapreduce(f, @functorize $fr, d) - (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, @functorize($fr), d) - end -end - -# scalar ops -(+)(A::DArray{Bool}, x::Bool) = A .+ x -(+)(x::Bool, A::DArray{Bool}) = x .+ A -(-)(A::DArray{Bool}, x::Bool) = A .- x -(-)(x::Bool, A::DArray{Bool}) = x .- A -(+)(A::DArray, x::Number) = A .+ x -(+)(x::Number, A::DArray) = x .+ A -(-)(A::DArray, x::Number) = A .- x -(-)(x::Number, A::DArray) = x .- A - -map_localparts(f::Callable, d::DArray) = DArray(i->f(localpart(d)), d) -map_localparts(f::Callable, d1::DArray, d2::DArray) = DArray(d1) do I - f(localpart(d1), localpart(d2)) -end - -function map_localparts(f::Callable, DA::DArray, A::Array) - pas = PartitionedSerializer(A, procs(DA), DA.indexes) - DArray(DA) do I - f(localpart(DA), verify_and_get(pas, I)) - end -end - -function map_localparts(f::Callable, A::Array, DA::DArray) - pas = PartitionedSerializer(A, procs(DA), DA.indexes) - DArray(DA) do I - f(verify_and_get(pas, I), localpart(DA)) - end -end - -function map_localparts!(f::Callable, d::DArray) - @sync for p in procs(d) - @async remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d) - end - return d -end - -# Here we assume all the DArrays have -# the same size and distribution -map_localparts(f::Callable, As::DArray...) 
= DArray(I->f(map(localpart, As)...), As[1]) - -for f in (:.+, :.-, :.*, :./, :.%, :.<<, :.>>, :div, :mod, :rem, :&, :|, :$) - @eval begin - ($f){T}(A::DArray{T}, B::Number) = map_localparts(r->($f)(r, B), A) - ($f){T}(A::Number, B::DArray{T}) = map_localparts(r->($f)(A, r), B) - end -end - -function samedist(A::DArray, B::DArray) - (size(A) == size(B)) || throw(DimensionMismatch()) - if (procs(A) != procs(B)) || (A.cuts != B.cuts) - B = DArray(x->B[x...], A) - end - B -end - -for f in (:+, :-, :div, :mod, :rem, :&, :|, :$) - @eval begin - function ($f){T}(A::DArray{T}, B::DArray{T}) - B = samedist(A, B) - map_localparts($f, A, B) - end - ($f){T}(A::DArray{T}, B::Array{T}) = ($f)(A, distribute(B, A)) - ($f){T}(A::Array{T}, B::DArray{T}) = ($f)(distribute(A, B), B) - end -end -for f in (:.+, :.-, :.*, :./, :.%, :.<<, :.>>) - @eval begin - function ($f){T}(A::DArray{T}, B::DArray{T}) - map_localparts($f, A, B) - end - ($f){T}(A::DArray{T}, B::Array{T}) = ($f)(A, distribute(B, A)) - ($f){T}(A::Array{T}, B::DArray{T}) = ($f)(distribute(A, B), B) - end -end - -function Base.ctranspose{T}(D::DArray{T,2}) - DArray(reverse(size(D)), procs(D)) do I - lp = Array(T, map(length, I)) - rp = convert(Array, D[reverse(I)...]) - ctranspose!(lp, rp) - end -end - -function Base.transpose{T}(D::DArray{T,2}) - DArray(reverse(size(D)), procs(D)) do I - lp = Array(T, map(length, I)) - rp = convert(Array, D[reverse(I)...]) - transpose!(lp, rp) - end -end - -for f in (:abs, :abs2, :acos, :acosd, :acosh, :acot, :acotd, :acoth, - :acsc, :acscd, :acsch, :angle, :asec, :asecd, :asech, :asin, - :asind, :asinh, :atan, :atand, :atanh, :big, :cbrt, :ceil, :cis, - :complex, :cos, :cosc, :cosd, :cosh, :cospi, :cot, :cotd, :coth, - :csc, :cscd, :csch, :dawson, :deg2rad, :digamma, :erf, :erfc, - :erfcinv, :erfcx, :erfi, :erfinv, :exp, :exp10, :exp2, :expm1, - :exponent, :float, :floor, :gamma, :imag, :invdigamma, :isfinite, - :isinf, :isnan, :lfact, :lgamma, :log, :log10, :log1p, :log2, :rad2deg, - :real, :sec, :secd, :sech, :sign, :sin, :sinc, :sind, :sinh, :sinpi, - :sqrt, :tan, :tand, :tanh, :trigamma) - @eval begin - ($f)(A::DArray) = map($f, A) - end -end - -function mapslices{T,N}(f::Function, D::DArray{T,N}, dims::AbstractVector) - #Ensure that the complete DArray is available on the specified dims on all processors - for d in dims - for idxs in D.indexes - if length(idxs[d]) != size(D, d) - throw(DimensionMismatch(string("dimension $d is distributed. ", - "mapslices requires dimension $d to be completely available on all processors."))) - end - end - end - - refs = Future[remotecall((x,y,z)->mapslices(x,localpart(y),z), p, f, D, dims) for p in procs(D)] - - DArray(reshape(refs, size(procs(D)))) -end - -function _ppeval(f, A...; dim = map(ndims, A)) - if length(dim) != length(A) - throw(ArgumentError("dim argument has wrong length. length(dim) = $(length(dim)) but should be $(length(A))")) - end - narg = length(A) - dimlength = size(A[1], dim[1]) - for i = 2:narg - if dim[i] > 0 && dimlength != size(A[i], dim[i]) - throw(ArgumentError("lengths of broadcast dimensions must be the same. size(A[1], $(dim[1])) = $dimlength but size(A[$i], $(dim[i])) = $(size(A[i], dim[i]))")) - end - end - dims = [] - idx = [] - args = [] - for i = 1:narg - push!(dims, ndims(A[i])) - push!(idx, Any[1:size(A[i], d) for d in 1:dims[i]]) - if dim[i] > 0 - idx[i][dim[i]] = 1 - push!(args, view(A[i], idx[i]...)) - else - push!(args, A[i]) - end - end - R1 = f(args...) 
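# Illustrative sketch, not part of this patch: the map/reduce and elementwise methods
# defined above work localpart-by-localpart and combine the per-worker results.
dv = drandn(10^6)                # assumes worker processes are available
total = sum(dv)                  # one partial sum per worker, reduced on the caller
biggest = mapreduce(abs, max, dv)
dexp = map(exp, dv)              # elementwise; returns a DArray with the same distribution
daff = 2 .* dv .+ 1              # scalar broadcasting ops are likewise applied chunkwise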
- ridx = Any[1:size(R1, d) for d in 1:ndims(R1)] - push!(ridx, 1) - Rsize = map(last, ridx) - Rsize[end] = dimlength - R = Array(eltype(R1), Rsize...) - - for i = 1:dimlength - for j = 1:narg - if dim[j] > 0 - idx[j][dim[j]] = i - args[j] = view(A[j], idx[j]...) - else - args[j] = A[j] - end - end - ridx[end] = i - R[ridx...] = f(args...) - end - - return R -end - -""" - ppeval(f, D...; dim::NTuple) - -Evaluates the callable argument `f` on slices of the elements of the `D` tuple. - -#### Arguments -`f` can be any callable object that accepts sliced or broadcasted elements of `D`. -The result returned from `f` must be either an array or a scalar. - -`D` has any number of elements and the alements can have any type. If an element -of `D` is a distributed array along the dimension specified by `dim`. If an -element of `D` is not distributed, the element is by default broadcasted and -applied on all evaluations of `f`. - -`dim` is a tuple of integers specifying the dimension over which the elements -of `D` is slices. The length of the tuple must therefore be the same as the -number of arguments `D`. By default distributed arrays are slides along the -last dimension. If the value is less than or equal to zero the element are -broadcasted to all evaluations of `f`. - -#### Result -`ppeval` returns a distributed array of dimension `p+1` where the first `p` -sizes correspond to the sizes of return values of `f`. The last dimention of -the return array from `ppeval` has the same length as the dimension over which -the input arrays are sliced. - -#### Examples -```jl -addprocs(JULIA_CPU_CORES) - -using DistributedArrays - -A = drandn((10, 10, JULIA_CPU_CORES), workers(), [1, 1, JULIA_CPU_CORES]) - -ppeval(eigvals, A) - -ppeval(eigvals, A, randn(10,10)) # broadcasting second argument - -B = drandn((10, JULIA_CPU_CORES), workers(), [1, JULIA_CPU_CORES]) - -ppeval(*, A, B) -``` -""" -function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D)) - #Ensure that the complete DArray is available on the specified dims on all processors - for i = 1:length(D) - if isa(D[i], DArray) - for idxs in D[i].indexes - for d in setdiff(1:ndims(D[i]), dim[i]) - if length(idxs[d]) != size(D[i], d) - throw(DimensionMismatch(string("dimension $d is distributed. ", - "ppeval requires dimension $d to be completely available on all processors."))) - end - end - end - end - end - - refs = Future[remotecall((x, y, z) -> _ppeval(x, map(localpart, y)...; dim = z), p, f, D, dim) for p in procs(D[1])] - - # The array of Futures has to be reshaped for the DArray constructor to work correctly. - # This requires a fetch and the DArray is also fetching so it might be better to modify - # the DArray constructor. - sd = [size(D[1].pids)...] - nd = remotecall_fetch((r)->ndims(fetch(r)), refs[1].where, refs[1]) - DArray(reshape(refs, tuple([sd[1:nd - 1], sd[end];]...))) -end - -typealias DVector{T,A} DArray{T,1,A} -typealias DMatrix{T,A} DArray{T,2,A} - -# Level 1 - -function axpy!(α, x::DVector, y::DVector) - if length(x) != length(y) - throw(DimensionMismatch("vectors must have same length")) - end - @sync for p in procs(y) - @async remotecall_fetch(() -> (Base.axpy!(α, localpart(x), localpart(y)); nothing), p) - end - return y -end - -function dot(x::DVector, y::DVector) - if length(x) != length(y) - throw(DimensionMismatch("")) - end - if (procs(x) != procs(y)) || (x.cuts != y.cuts) - throw(ArgumentError("vectors don't have the same distribution. 
Not handled for efficiency reasons.")) - end - - results=Any[] - @sync begin - for i = eachindex(x.pids) - @async push!(results, remotecall_fetch((x, y, i) -> dot(localpart(x), fetch(y, i)), x.pids[i], x, y, i)) - end - end - return reduce(@functorize(+), results) -end - -function norm(x::DVector, p::Real = 2) - results = [] - @sync begin - for pp in procs(x) - @async push!(results, remotecall_fetch(() -> norm(localpart(x), p), pp)) - end - end - return norm(results, p) -end - -# Level 2 -function add!(dest, src, scale = one(dest[1])) - if length(dest) != length(src) - throw(DimensionMismatch("source and destination arrays must have same number of elements")) - end - if scale == one(scale) - @simd for i = eachindex(dest) - @inbounds dest[i] += src[i] - end - else - @simd for i = eachindex(dest) - @inbounds dest[i] += scale*src[i] - end - end - return dest -end - -localtype{T,N,S}(A::DArray{T,N,S}) = S -localtype(A::AbstractArray) = typeof(A) - -function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) - - # error checks - if size(A, 2) != length(x) - throw(DimensionMismatch("")) - end - if y.cuts[1] != A.cuts[1] - throw(ArgumentError("cuts of output vector must match cuts of first dimension of matrix")) - end - - # Multiply on each tile of A - R = Array(Future, size(A.pids)...) - for j = 1:size(A.pids, 2) - xj = x[A.cuts[2][j]:A.cuts[2][j + 1] - 1] - for i = 1:size(A.pids, 1) - R[i,j] = remotecall(procs(A)[i,j]) do - localpart(A)*convert(localtype(x), xj) - end - end - end - - # Scale y if necessary - if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(y -> (scale!(localpart(y), β); nothing), p, y) - else - @async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y) - end - end - end - - # Update y - @sync for i = 1:size(R, 1) - p = y.pids[i] - for j = 1:size(R, 2) - rij = R[i,j] - @async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p) - end - end - - return y -end - -function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) - - # error checks - if size(A, 1) != length(x) - throw(DimensionMismatch("")) - end - if y.cuts[1] != A.cuts[2] - throw(ArgumentError("cuts of output vector must match cuts of second dimension of matrix")) - end - - # Multiply on each tile of A - R = Array(Future, reverse(size(A.pids))...) - for j = 1:size(A.pids, 1) - xj = x[A.cuts[1][j]:A.cuts[1][j + 1] - 1] - for i = 1:size(A.pids, 2) - R[i,j] = remotecall(() -> localpart(A)'*convert(localtype(x), xj), procs(A)[j,i]) - end - end - - # Scale y if necessary - if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(() -> (scale!(localpart(y), β); nothing), p) - else - @async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p) - end - end - end - - # Update y - @sync for i = 1:size(R, 1) - p = y.pids[i] - for j = 1:size(R, 2) - rij = R[i,j] - @async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p) - end - end - return y -end - - -# Level 3 -function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA) - # error checks - Ad1, Ad2 = (tA == 'N') ? 
(1,2) : (2,1) - mA, nA = size(A, Ad1, Ad2) - mB, nB = size(B) - if mB != nA - throw(DimensionMismatch("matrix A has dimensions ($mA, $nA), matrix B has dimensions ($mB, $nB)")) - end - if size(C,1) != mA || size(C,2) != nB - throw(DimensionMismatch("result C has dimensions $(size(C)), needs ($mA, $nB)")) - end - if C.cuts[1] != A.cuts[Ad1] - throw(ArgumentError("cuts of the first dimension of the output matrix must match cuts of dimension $Ad1 of the first input matrix")) - end - - # Multiply on each tile of A - if tA == 'N' - R = Array(Future, size(procs(A))..., size(procs(C), 2)) - else - R = Array(Future, reverse(size(procs(A)))..., size(procs(C), 2)) - end - for j = 1:size(A.pids, Ad2) - for k = 1:size(C.pids, 2) - Acuts = A.cuts[Ad2] - Ccuts = C.cuts[2] - Bjk = B[Acuts[j]:Acuts[j + 1] - 1, Ccuts[k]:Ccuts[k + 1] - 1] - for i = 1:size(A.pids, Ad1) - p = (tA == 'N') ? procs(A)[i,j] : procs(A)[j,i] - R[i,j,k] = remotecall(p) do - if tA == 'T' - return localpart(A).'*convert(localtype(B), Bjk) - elseif tA == 'C' - return localpart(A)'*convert(localtype(B), Bjk) - else - return localpart(A)*convert(localtype(B), Bjk) - end - end - end - end - end - - # Scale C if necessary - if β != one(β) - @sync for p in C.pids - if β != zero(β) - @async remotecall_fetch(() -> (scale!(localpart(C), β); nothing), p) - else - @async remotecall_fetch(() -> (fill!(localpart(C), 0); nothing), p) - end - end - end - - # Update C - @sync for i = 1:size(R, 1) - for k = 1:size(C.pids, 2) - p = C.pids[i,k] - for j = 1:size(R, 2) - rijk = R[i,j,k] - @async remotecall_fetch(d -> (add!(localpart(d), fetch(rijk), α); nothing), p, C) - end - end - end - return C -end - -A_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'N') -Ac_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'C') -At_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'T') -At_mul_B!(C::DMatrix, A::DMatrix, B::AbstractMatrix) = At_mul_B!(one(eltype(C)), A, B, zero(eltype(C)), C) - -function (*)(A::DMatrix, x::AbstractVector) - T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(x))) - y = DArray(I -> Array(T, map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) - return A_mul_B!(one(T), A, x, zero(T), y) -end -function (*)(A::DMatrix, B::AbstractMatrix) - T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(B))) - C = DArray(I -> Array(T, map(length, I)), (size(A, 1), size(B, 2)), procs(A)[:,1:min(size(procs(A), 2), size(procs(B), 2))], (size(procs(A), 1), min(size(procs(A), 2), size(procs(B), 2)))) - return A_mul_B!(one(T), A, B, zero(T), C) -end - -function Ac_mul_B(A::DMatrix, x::AbstractVector) - T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(x))) - y = DArray(I -> Array(T, map(length, I)), (size(A, 2),), procs(A)[1,:], (size(procs(A), 2),)) - return Ac_mul_B!(one(T), A, x, zero(T), y) -end -function Ac_mul_B(A::DMatrix, B::AbstractMatrix) - T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(B))) - C = DArray(I -> Array(T, map(length, I)), (size(A, 2), size(B, 2)), procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) - return Ac_mul_B!(one(T), A, B, zero(T), C) -end - - -# Sorting a DVector using samplesort - -function sample_n_setup_ref(d::DVector, sample_size; kwargs...) 
- lp = localpart(d) - llp = length(lp) - np = length(procs(d)) - sample_size = llp > sample_size ? sample_size : llp - sorted = sort(lp; kwargs...) - sample = sorted[collect(1:div(llp,sample_size):llp)] - ref = RemoteChannel(()->Channel(np+1)) # To collect parts to be sorted locally later. - # First element is the locally sorted vector - put!(ref, sorted) - return (sample, ref) -end - - -function scatter_n_sort_localparts{T}(d, myidx, refs::Array{RemoteChannel}, boundaries::Array{T}; by = identity, kwargs...) - if d==nothing - sorted = take!(refs[myidx]) # First entry in the remote channel is sorted localpart - else - sorted = sort(localpart(d); by = by, kwargs...) - end - - # send respective parts to correct workers, iterate over sorted array - p_sorted = 1 - for (i,r) in enumerate(refs) - p_till = length(sorted)+1 - - # calculate range to send to refs[i] - ctr=1 - for x in sorted[p_sorted:end] - if by(x) > by(boundaries[i+1]) - p_till = p_sorted+ctr-1 - break - else - ctr += 1 - end - end - - if p_till == p_sorted - @async put!(r, Array(T,0)) - else - v = sorted[p_sorted:p_till-1] - @async put!(r, v) - end - - p_sorted = p_till - end - - # wait to receive all of my parts from all other workers - lp_sorting=T[] - for _ in refs - v = take!(refs[myidx]) - append!(lp_sorting, v) - end - - sorted_ref=RemoteChannel() - put!(sorted_ref, sort!(lp_sorting; by = by, kwargs...)) - return (sorted_ref, length(lp_sorting)) -end - -function compute_boundaries{T}(d::DVector{T}; kwargs...) - pids = procs(d) - np = length(pids) - sample_sz_on_wrkr = 512 - - results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids) - - samples = Array(T,0) - for x in results - append!(samples, x[1]) - end - sort!(samples; kwargs...) - samples[1] = typemin(T) - - refs=RemoteChannel[x[2] for x in results] - - boundaries = samples[[1+(x-1)*div(length(samples), np) for x in 1:np]] - push!(boundaries, typemax(T)) - - return (boundaries, refs) -end - -""" - sort(d::DVector; sample=true, kwargs...) -> DVector - -Sorts and returns a new distributed vector. - -The sorted vector may not have the same distribution as the original. - -Keyword argument `sample` can take values: - -- `true`: A sample of max size 512 is first taken from all nodes. This is used to balance the distribution of the sorted array on participating workers. Default is `true`. - -- `false`: No sampling is done. Assumes a uniform distribution between min(d) and max(d) - -- 2-element tuple of the form `(min, max)`: No sampling is done. Assumes a uniform distribution between specified min and max values - -- Array{T}: The passed array is assumed to be a sample of the distribution and is used to balance the sorted distribution. - -Keyword argument `alg` takes the same options `Base.sort` -""" -function Base.sort{T}(d::DVector{T}; sample=true, kwargs...) - pids = procs(d) - np = length(pids) - - # Only `alg` and `sample` are supported as keyword arguments - if length(filter(x->!(x in (:alg, :by)), [x[1] for x in kwargs])) > 0 - throw(ArgumentError("Only `alg`, `by` and `sample` are supported as keyword arguments")) - end - - if sample==true - boundaries, refs = compute_boundaries(d; kwargs...) 
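# Illustrative sketch, not part of this patch: `sort` on a DVector as documented above.
# Assumes worker processes are available; `QuickSort` is just one choice of local algorithm.
dvec = drand(10^6)
s1 = sort(dvec)                        # default: sample-based balancing of the output
s2 = sort(dvec; sample = (0.0, 1.0))   # skip sampling; assume values uniform on [0, 1)
s3 = sort(dvec; alg = QuickSort)       # `alg` is forwarded to the local sorts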
- presorted=true - - elseif sample==false - # Assume an uniform distribution between min and max values - minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids) - min_d = minimum(T[x[1] for x in minmax]) - max_d = maximum(T[x[2] for x in minmax]) - - return sort(d; sample=(min_d,max_d), kwargs...) - - elseif isa(sample, Tuple) - # Assume an uniform distribution between min and max values in the tuple - lb=sample[1] - ub=sample[2] - - @assert lb<=ub - - s = Array(T, np) - part = abs(ub - lb)/np - (isnan(part) || isinf(part)) && throw(ArgumentError("lower and upper bounds must not be infinities")) - - for n in 1:np - v = lb + (n-1)*part - if T <: Integer - s[n] = round(v) - else - s[n] = v - end - end - return sort(d; sample=s, kwargs...) - - elseif isa(sample, Array) - # Provided array is used as a sample - samples = sort(copy(sample)) - samples[1] = typemin(T) - boundaries = samples[[1+(x-1)*div(length(samples), np) for x in 1:np]] - push!(boundaries, typemax(T)) - presorted=false - - refs=RemoteChannel[RemoteChannel(p) for p in procs(d)] - else - throw(ArgumentError("keyword arg `sample` must be Boolean, Tuple(Min,Max) or an actual sample of data : " * string(sample))) - end - - local_sort_results = Array(Tuple, np) - - Base.asyncmap!((i,p) -> remotecall_fetch( - scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...), - local_sort_results, 1:np, pids) - - # Construct a new DArray from the sorted refs. Remove parts with 0-length since - # the DArray constructor_from_refs does not yet support it. This implies that - # the participating workers for the sorted darray may be different from the original - # for highly non-uniform distributions. - local_sorted_refs = RemoteChannel[x[1] for x in filter(x->x[2]>0, local_sort_results)] - return DArray(local_sorted_refs) -end - -# Serialize only those parts of the object as required by the destination worker. -type PartitionedSerializer - indexable_obj # An indexable object, Array, SparseMatrix, etc. - # Complete object on the serializing side. - # Part object on the deserialized side. - pids::Nullable{Array} - idxs::Nullable{Array} - local_idxs::Nullable{Tuple} - - PartitionedSerializer(obj, local_idxs::Tuple) = new(obj, Nullable{Array}(), Nullable{Array}(), local_idxs) - function PartitionedSerializer(obj, pids::Array, idxs::Array) - pas = new(obj,pids,idxs,Nullable{Tuple}()) - - if myid() in pids - pas.local_idxs = idxs[findfirst(pids, myid())] - end - return pas - end -end - -function Base.serialize(S::AbstractSerializer, pas::PartitionedSerializer) - pid = Base.worker_id_from_socket(S.io) - I = get(pas.idxs)[findfirst(get(pas.pids), pid)] - Serializer.serialize_type(S, typeof(pas)) - serialize(S, pas.indexable_obj[I...]) - serialize(S, I) -end - -function Base.deserialize{T<:PartitionedSerializer}(S::AbstractSerializer, t::Type{T}) - obj_part = deserialize(S) - I = deserialize(S) - return PartitionedSerializer(obj_part, I) -end - -function verify_and_get(pas::PartitionedSerializer, I) - # Handle the special case where myid() is part of pas.pids. - # For this case serialize/deserialize is not called as the remotecall is executed locally - if myid() in get(pas.pids, []) - @assert I == get(pas.idxs)[findfirst(get(pas.pids),myid())] - return pas.indexable_obj[I...] 
- else - @assert I == get(pas.local_idxs, ()) - return pas.indexable_obj - end -end +include("core.jl") +include("serialize.jl") +include("mapreduce.jl") +include("linalg.jl") +include("sort.jl") end # module diff --git a/src/core.jl b/src/core.jl new file mode 100644 index 0000000..9201b4e --- /dev/null +++ b/src/core.jl @@ -0,0 +1,712 @@ +const registry=Dict{Tuple, Any}() +const refs=Set() # Collection of darray identities created on this node + +let DID::Int = 1 + global next_did + next_did() = (id = DID; DID += 1; (myid(), id)) +end + +""" + next_did() + +Produces an incrementing ID that will be used for DArrays. +""" +next_did + +""" + DArray(init, dims, [procs, dist]) + +Construct a distributed array. + +The parameter `init` is a function that accepts a tuple of index ranges. +This function should allocate a local chunk of the distributed array and initialize it for the specified indices. + +`dims` is the overall size of the distributed array. + +`procs` optionally specifies a vector of process IDs to use. +If unspecified, the array is distributed over all worker processes only. Typically, when running in distributed mode, +i.e., nprocs() > 1, this would mean that no chunk of the distributed array exists on the process hosting the +interactive julia prompt. + +`dist` is an integer vector specifying how many chunks the distributed array should be divided into in each dimension. + +For example, the `dfill` function that creates a distributed array and fills it with a value `v` is implemented as: + +### Example +```jl +dfill(v, args...) = DArray(I->fill(v, map(length,I)), args...) +``` +""" +type DArray{T,N,A} <: AbstractArray{T,N} + identity::Tuple + dims::NTuple{N,Int} + pids::Array{Int,N} # pids[i]==p ⇒ processor p has piece i + indexes::Array{NTuple{N,UnitRange{Int}},N} # indexes held by piece i + cuts::Vector{Vector{Int}} # cuts[d][i] = first index of chunk i in dimension d + + release::Bool + + function DArray(identity, dims, pids, indexes, cuts) + # check invariants + if dims != map(last, last(indexes)) + throw(ArgumentError("dimension of DArray (dim) and indexes do not match")) + end + release = (myid() == identity[1]) + + global registry + haskey(registry, (identity, :DARRAY)) && return registry[(identity, :DARRAY)] + + d = new(identity, dims, pids, indexes, cuts, release) + if release + push!(refs, identity) + registry[(identity, :DARRAY)] = d + +# println("Installing finalizer for : ", d.identity, ", : ", object_id(d), ", isbits: ", isbits(d)) + finalizer(d, close) + end + d + end + + DArray() = new() +end + +typealias SubDArray{T,N,D<:DArray} SubArray{T,N,D} +typealias SubOrDArray{T,N} Union{DArray{T,N}, SubDArray{T,N}} + +localtype{T,N,S}(A::DArray{T,N,S}) = S +localtype(A::AbstractArray) = typeof(A) + + +## core constructors ## + +function DArray(identity, init, dims, pids, idxs, cuts) + r=Channel(1) + @sync begin + for i = 1:length(pids) + @async begin + local typA + if isa(init, Function) + typA=remotecall_fetch(construct_localparts, pids[i], init, identity, dims, pids, idxs, cuts) + else + # constructing from an array of remote refs. + typA=remotecall_fetch(construct_localparts, pids[i], init[i], identity, dims, pids, idxs, cuts) + end + !isready(r) && put!(r, typA) + end + end + end + + typA = take!(r) + if myid() in pids + d = registry[(identity, :DARRAY)] + else + d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts) + end + d +end + +function construct_localparts(init, identity, dims, pids, idxs, cuts) + A = isa(init, Function) ? 
init(idxs[localpartindex(pids)]) : fetch(init) + global registry + registry[(identity, :LOCALPART)] = A + typA = typeof(A) + d = DArray{eltype(typA),length(dims),typA}(identity, dims, pids, idxs, cuts) + registry[(identity, :DARRAY)] = d + typA +end + +function DArray(init, dims, procs, dist) + np = prod(dist) + procs = reshape(procs[1:np], ntuple(i->dist[i], length(dist))) + idxs, cuts = chunk_idxs([dims...], dist) + identity = next_did() + + return DArray(identity, init, dims, procs, idxs, cuts) +end + +function DArray(init, dims, procs) + if isempty(procs) + throw(ArgumentError("no processors given")) + end + return DArray(init, dims, procs, defaultdist(dims, procs)) +end +DArray(init, dims) = DArray(init, dims, workers()[1:min(nworkers(), maximum(dims))]) + +# Create a DArray from a collection of references +# The refs must have the same layout as the parts distributed. +# i.e. +# size(refs) must specify the distribution of dimensions across processors +# prod(size(refs)) must equal number of parts +# FIXME : Empty parts are currently not supported. +function DArray(refs) + dimdist = size(refs) + identity = next_did() + + npids = [r.where for r in refs] + nsizes = Array(Tuple, dimdist) + @sync for i in 1:length(refs) + let i=i + @async nsizes[i] = remotecall_fetch(rr_localpart, npids[i], refs[i], identity) + end + end + + nindexes = Array(NTuple{length(dimdist),UnitRange{Int}}, dimdist...) + + for i in 1:length(nindexes) + subidx = ind2sub(dimdist, i) + nindexes[i] = ntuple(length(subidx)) do x + idx_in_dim = subidx[x] + startidx = 1 + for j in 1:(idx_in_dim-1) + prevsubidx = ntuple(y -> y == x ? j : subidx[y], length(subidx)) + prevsize = nsizes[prevsubidx...] + startidx += prevsize[x] + end + startidx:startidx+(nsizes[i][x])-1 + end + end + + lastidxs = hcat([Int[last(idx_in_d)+1 for idx_in_d in idx] for idx in nindexes]...) + ncuts = Array{Int,1}[unshift!(sort(unique(lastidxs[x,:])), 1) for x in 1:length(dimdist)] + ndims = tuple([sort(unique(lastidxs[x,:]))[end]-1 for x in 1:length(dimdist)]...) 
+ + DArray(identity, refs, ndims, reshape(npids, dimdist), nindexes, ncuts) +end + +macro DArray(ex0::Expr) + if ex0.head !== :comprehension + throw(ArgumentError("invalid @DArray syntax")) + end + ex = ex0.args[1] + if ex.head !== :generator + throw(ArgumentError("invalid @DArray syntax")) + end + ex.args[1] = esc(ex.args[1]) + ndim = length(ex.args) - 1 + ranges = map(r->esc(r.args[2]), ex.args[2:end]) + for d = 1:ndim + var = ex.args[d+1].args[1] + ex.args[d+1] = :( $(esc(var)) = ($(ranges[d]))[I[$d]] ) + end + return :( DArray((I::Tuple{Vararg{UnitRange{Int}}})->($ex0), + tuple($(map(r->:(length($r)), ranges)...))) ) +end + +# new DArray similar to an existing one +DArray(init, d::DArray) = DArray(next_did(), init, size(d), procs(d), d.indexes, d.cuts) + +function release_localpart(identity) + global registry + delete!(registry, (identity, :DARRAY)) + delete!(registry, (identity, :LOCALPART)) + nothing +end +release_localpart(d::DArray) = release_localpart(d.identity) + +function close_by_identity(identity, pids) +# @schedule println("Finalizer for : ", identity) + global refs + @sync begin + for p in pids + @async remotecall_fetch(release_localpart, p, identity) + end + if !(myid() in pids) + release_localpart(identity) + end + end + delete!(refs, identity) + nothing +end + +function close(d::DArray) +# @schedule println("close : ", d.identity, ", object_id : ", object_id(d), ", myid : ", myid() ) + if (myid() == d.identity[1]) && d.release + @schedule close_by_identity(d.identity, d.pids) + d.release = false + end + nothing +end + +function darray_closeall() + global registry + global refs + crefs = copy(refs) + for identity in crefs + if identity[1] == myid() # sanity check + haskey(registry, (identity, :DARRAY)) && close(registry[(identity, :DARRAY)]) + yield() + end + end +end + +function rr_localpart(ref, identity) + global registry + lp = fetch(ref) + registry[(identity, :LOCALPART)] = lp + return size(lp) +end + + +Base.similar(d::DArray, T::Type, dims::Dims) = DArray(I->Array(T, map(length,I)), dims, procs(d)) +Base.similar(d::DArray, T::Type) = similar(d, T, size(d)) +Base.similar{T}(d::DArray{T}, dims::Dims) = similar(d, T, dims) +Base.similar{T}(d::DArray{T}) = similar(d, T, size(d)) + +Base.size(d::DArray) = d.dims + + +""" + procs(d::DArray) + +Get the vector of processes storing pieces of DArray `d`. +""" +Base.procs(d::DArray) = d.pids + +chunktype{T,N,A}(d::DArray{T,N,A}) = A + +## chunk index utilities ## + +# decide how to divide each dimension +# returns size of chunks array +function defaultdist(dims, pids) + dims = [dims...] + chunks = ones(Int, length(dims)) + np = length(pids) + f = sort!(collect(keys(factor(np))), rev=true) + k = 1 + while np > 1 + # repeatedly allocate largest factor to largest dim + if np % f[k] != 0 + k += 1 + if k > length(f) + break + end + end + fac = f[k] + (d, dno) = findmax(dims) + # resolve ties to highest dim + dno = last(find(dims .== d)) + if dims[dno] >= fac + dims[dno] = div(dims[dno], fac) + chunks[dno] *= fac + end + np = div(np, fac) + end + return chunks +end + +# get array of start indexes for dividing sz into nc chunks +function defaultdist(sz::Int, nc::Int) + if sz >= nc + return round(Int, linspace(1, sz+1, nc+1)) + else + return [[1:(sz+1);], zeros(Int, nc-sz);] + end +end + +# compute indexes array for dividing dims into chunks +function chunk_idxs(dims, chunks) + cuts = map(defaultdist, dims, chunks) + n = length(dims) + idxs = Array(NTuple{n,UnitRange{Int}},chunks...) 
+ for cidx in CartesianRange(tuple(chunks...)) + idxs[cidx.I...] = ntuple(i -> (cuts[i][cidx[i]]:cuts[i][cidx[i] + 1] - 1), n) + end + return (idxs, cuts) +end + +function localpartindex(pids::Array{Int}) + mi = myid() + for i = 1:length(pids) + if pids[i] == mi + return i + end + end + return 0 +end +localpartindex(d::DArray) = localpartindex(procs(d)) + +""" + localpart(d::DArray) + +Get the local piece of a distributed array. +Returns an empty array if no local part exists on the calling process. +""" +function localpart{T,N,A}(d::DArray{T,N,A}) + lpidx = localpartindex(d) + if lpidx == 0 + return convert(A, Array(T, ntuple(zero, N)))::A + end + + global registry + return registry[(d.identity, :LOCALPART)]::A +end + +localpart(d::DArray, localidx...) = localpart(d)[localidx...] + +# fetch localpart of d at pids[i] +fetch{T,N,A}(d::DArray{T,N,A}, i) = remotecall_fetch(localpart, d.pids[i], d) + +""" + localpart(A) + +The identity when input is not distributed +""" +localpart(A) = A + +""" + localindexes(d) + +A tuple describing the indexes owned by the local process. +Returns a tuple with empty ranges if no local part exists on the calling process. +""" +function localindexes(d::DArray) + lpidx = localpartindex(d) + if lpidx == 0 + return ntuple(i -> 1:0, ndims(d)) + end + return d.indexes[lpidx] +end + +# find which piece holds index (I...) +locate(d::DArray, I::Int...) = + ntuple(i -> searchsortedlast(d.cuts[i], I[i]), ndims(d)) + +chunk{T,N,A}(d::DArray{T,N,A}, i...) = remotecall_fetch(localpart, d.pids[i...], d)::A + +## convenience constructors ## + +""" + dzeros(dims, ...) + +Construct a distributed array of zeros. +Trailing arguments are the same as those accepted by `DArray`. +""" +dzeros(dims::Dims, args...) = DArray(I->zeros(map(length,I)), dims, args...) +dzeros{T}(::Type{T}, dims::Dims, args...) = DArray(I->zeros(T,map(length,I)), dims, args...) +dzeros{T}(::Type{T}, d1::Integer, drest::Integer...) = dzeros(T, convert(Dims, tuple(d1, drest...))) +dzeros(d1::Integer, drest::Integer...) = dzeros(Float64, convert(Dims, tuple(d1, drest...))) +dzeros(d::Dims) = dzeros(Float64, d) + + +""" + dones(dims, ...) + +Construct a distributed array of ones. +Trailing arguments are the same as those accepted by `DArray`. +""" +dones(dims::Dims, args...) = DArray(I->ones(map(length,I)), dims, args...) +dones{T}(::Type{T}, dims::Dims, args...) = DArray(I->ones(T,map(length,I)), dims, args...) +dones{T}(::Type{T}, d1::Integer, drest::Integer...) = dones(T, convert(Dims, tuple(d1, drest...))) +dones(d1::Integer, drest::Integer...) = dones(Float64, convert(Dims, tuple(d1, drest...))) +dones(d::Dims) = dones(Float64, d) + +""" + dfill(x, dims, ...) + +Construct a distributed array filled with value `x`. +Trailing arguments are the same as those accepted by `DArray`. +""" +dfill(v, dims::Dims, args...) = DArray(I->fill(v, map(length,I)), dims, args...) +dfill(v, d1::Integer, drest::Integer...) = dfill(v, convert(Dims, tuple(d1, drest...))) + +""" + drand(dims, ...) + +Construct a distributed uniform random array. +Trailing arguments are the same as those accepted by `DArray`. +""" +drand{T}(::Type{T}, dims::Dims, args...) = DArray(I->rand(T,map(length,I)), dims, args...) +drand{T}(::Type{T}, d1::Integer, drest::Integer...) = drand(T, convert(Dims, tuple(d1, drest...))) +drand(d1::Integer, drest::Integer...) = drand(Float64, convert(Dims, tuple(d1, drest...))) +drand(d::Dims, args...) = drand(Float64, d, args...) + +""" + drandn(dims, ...) + +Construct a distributed normal random array. 
+Trailing arguments are the same as those accepted by `DArray`. +""" +drandn(dims::Dims, args...) = DArray(I->randn(map(length,I)), dims, args...) +drandn(d1::Integer, drest::Integer...) = drandn(convert(Dims, tuple(d1, drest...))) + +## conversions ## + +""" + distribute(A[; procs, dist]) + +Convert a local array to distributed. + +`procs` optionally specifies an array of process IDs to use. (defaults to all workers) +`dist` optionally specifies a vector or tuple of the number of partitions in each dimension +""" +function distribute(A::AbstractArray; + procs = workers()[1:min(nworkers(), maximum(size(A)))], + dist = defaultdist(size(A), procs)) + idxs, _ = chunk_idxs([size(A)...], dist) + + pas = PartitionedSerializer(A, procs, idxs) + return DArray(I->verify_and_get(pas, I), size(A), procs, dist) +end + +""" + distribute(A, DA) + +Distribute a local array `A` like the distributed array `DA`. + +""" +function distribute(A::AbstractArray, DA::DArray) + size(DA) == size(A) || throw(DimensionMismatch("Distributed array has size $(size(DA)) but array has $(size(A))")) + + pas = PartitionedSerializer(A, procs(DA), DA.indexes) + return DArray(I->verify_and_get(pas, I), DA) +end + +Base.convert{T,N,S<:AbstractArray}(::Type{DArray{T,N,S}}, A::S) = distribute(convert(AbstractArray{T,N}, A)) + +Base.convert{S,T,N}(::Type{Array{S,N}}, d::DArray{T,N}) = begin + a = Array(S, size(d)) + @sync begin + for i = 1:length(d.pids) + @async a[d.indexes[i]...] = chunk(d, i) + end + end + return a +end + +Base.convert{S,T,N}(::Type{Array{S,N}}, s::SubDArray{T,N}) = begin + I = s.indexes + d = s.parent + if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S + l = locate(d, map(first, I)...) + if isequal(d.indexes[l...], I) + # SubDArray corresponds to a chunk + return chunk(d, l...) + end + end + a = Array(S, size(s)) + a[[1:size(a,i) for i=1:N]...] = s + return a +end + +function Base.convert{T,N}(::Type{DArray}, SD::SubArray{T,N}) + D = SD.parent + DArray(size(SD), procs(D)) do I + TR = typeof(SD.indexes[1]) + lindices = Array(TR, 0) + for (i,r) in zip(I, SD.indexes) + st = step(r) + lrstart = first(r) + st*(first(i)-1) + lrend = first(r) + st*(last(i)-1) + if TR <: UnitRange + push!(lindices, lrstart:lrend) + else + push!(lindices, lrstart:st:lrend) + end + end + convert(Array, D[lindices...]) + end +end + +Base.reshape{T,S<:Array}(A::DArray{T,1,S}, d::Dims) = begin + if prod(d) != length(A) + throw(DimensionMismatch("dimensions must be consistent with array size")) + end + return DArray(d) do I + sz = map(length,I) + d1offs = first(I[1]) + nd = length(I) + + B = Array(T,sz) + nr = size(B,1) + sztail = size(B)[2:end] + + for i=1:div(length(B),nr) + i2 = ind2sub(sztail, i) + globalidx = [ I[j][i2[j-1]] for j=2:nd ] + + a = sub2ind(d, d1offs, globalidx...) + + B[:,i] = A[a:(a+nr-1)] + end + B + end +end + +## indexing ## + +getlocalindex(d::DArray, idx...) = localpart(d)[idx...] +function getindex_tuple{T}(d::DArray{T}, I::Tuple{Vararg{Int}}) + chidx = locate(d, I...) + idxs = d.indexes[chidx...] + localidx = ntuple(i -> (I[i] - first(idxs[i]) + 1), ndims(d)) + pid = d.pids[chidx...] + return remotecall_fetch(getlocalindex, pid, d, localidx...)::T +end + +Base.getindex(d::DArray, i::Int) = getindex_tuple(d, ind2sub(size(d), i)) +Base.getindex(d::DArray, i::Int...) = getindex_tuple(d, i) + +Base.getindex(d::DArray) = d[1] +Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) 
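
For orientation (not part of the patch itself), a small hypothetical usage sketch of the constructors, `distribute`, `localpart`/`localindexes`, and the indexing methods introduced above; the worker count and array sizes are arbitrary, and a session with workers already added is assumed:

```jl
addprocs(4)                       # arbitrary worker count
using DistributedArrays

d = dzeros(100, 100)              # distributed zeros over all workers
e = distribute(rand(100, 100))    # distribute an existing local array

localindexes(e)                   # index ranges owned by the calling process
localpart(e)                      # local chunk (empty on the master by default)
e[3, 7]                           # scalar indexing fetches from the owning worker
e[1:10, :]                        # range indexing returns a view (SubDArray)
```
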
+ +Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin + if !(size(dest) == size(src) && + procs(dest) == procs(src) && + dest.indexes == src.indexes && + dest.cuts == src.cuts) + throw(DimensionMismatch("destination array doesn't fit to source array")) + end + @sync for p in procs(dest) + @async remotecall_fetch((dest,src)->(copy!(localpart(dest), localpart(src)); nothing), p, dest, src) + end + return dest +end + +# local copies are obtained by convert(Array, ) or assigning from +# a SubDArray to a local Array. + +function Base.setindex!(a::Array, d::DArray, + I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) + n = length(I) + @sync for i = 1:length(d.pids) + K = d.indexes[i] + @async a[[I[j][K[j]] for j=1:n]...] = chunk(d, i) + end + return a +end + +# We also want to optimize setindex! with a SubDArray source, but this is hard +# and only works on 0.5. + +# Similar to Base.indexin, but just create a logical mask. Note that this +# must return a logical mask in order to support merging multiple masks +# together into one linear index since we need to know how many elements to +# skip at the end. In many cases range intersection would be much faster +# than generating a logical mask, but that loses the endpoint information. +indexin_mask(a, b::Number) = a .== b +indexin_mask(a, r::Range{Int}) = [i in r for i in a] +indexin_mask(a, b::AbstractArray{Int}) = indexin_mask(a, IntSet(b)) +indexin_mask(a, b::AbstractArray) = indexin_mask(a, Set(b)) +indexin_mask(a, b) = [i in b for i in a] + +import Base: tail +# Given a tuple of indices and a tuple of masks, restrict the indices to the +# valid regions. This is, effectively, reversing Base.setindex_shape_check. +# We can't just use indexing into MergedIndices here because getindex is much +# pickier about singleton dimensions than setindex! is. +restrict_indices(::Tuple{}, ::Tuple{}) = () +function restrict_indices(a::Tuple{Any, Vararg{Any}}, b::Tuple{Any, Vararg{Any}}) + if (length(a[1]) == length(b[1]) == 1) || (length(a[1]) > 1 && length(b[1]) > 1) + (vec(a[1])[vec(b[1])], restrict_indices(tail(a), tail(b))...) + elseif length(a[1]) == 1 + (a[1], restrict_indices(tail(a), b)) + elseif length(b[1]) == 1 && b[1][1] + restrict_indices(a, tail(b)) + else + throw(DimensionMismatch("this should be caught by setindex_shape_check; please submit an issue")) + end +end +# The final indices are funky - they're allowed to accumulate together. +# An easy (albeit very inefficient) fix for too many masks is to use the +# outer product to merge them. But we can do that lazily with a custom type: +function restrict_indices(a::Tuple{Any}, b::Tuple{Any, Any, Vararg{Any}}) + (vec(a[1])[vec(ProductIndices(b, map(length, b)))],) +end +# But too many indices is much harder; this requires merging the indices +# in `a` before applying the final mask in `b`. +function restrict_indices(a::Tuple{Any, Any, Vararg{Any}}, b::Tuple{Any}) + if length(a[1]) == 1 + (a[1], restrict_indices(tail(a), b)) + else + # When one mask spans multiple indices, we need to merge the indices + # together. At this point, we can just use indexing to merge them since + # there's no longer special handling of singleton dimensions + (view(MergedIndices(a, map(length, a)), b[1]),) + end +end + +immutable ProductIndices{I,N} <: AbstractArray{Bool, N} + indices::I + sz::NTuple{N,Int} +end +Base.size(P::ProductIndices) = P.sz +# This gets passed to map to avoid breaking propagation of inbounds +Base.@propagate_inbounds propagate_getindex(A, I...) = A[I...] 
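
As a rough sketch of the conversion and assignment paths defined above (illustrative only; sizes and element types are arbitrary), a `DArray` can be materialized locally either via `convert` or by assigning into a preallocated `Array`, and `copy!` works between distributed arrays with matching distributions:

```jl
d = drand(8, 8)

a = convert(Array, d)       # gather all remote chunks into a local Array

b = zeros(8, 8)
b[1:8, 1:8] = d             # setindex!(::Array, ::DArray, ...) copies chunk by chunk

c = similar(d)
copy!(c, d)                 # copy! requires matching size, pids, indexes and cuts
```
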
+Base.@propagate_inbounds Base.getindex{_,N}(P::ProductIndices{_,N}, I::Vararg{Int, N}) = + Bool((&)(map(propagate_getindex, P.indices, I)...)) + +immutable MergedIndices{I,N} <: AbstractArray{CartesianIndex{N}, N} + indices::I + sz::NTuple{N,Int} +end +Base.size(M::MergedIndices) = M.sz +Base.@propagate_inbounds Base.getindex{_,N}(M::MergedIndices{_,N}, I::Vararg{Int, N}) = + CartesianIndex(map(propagate_getindex, M.indices, I)) +# Additionally, we optimize bounds checking when using MergedIndices as an +# array index since checking, e.g., A[1:500, 1:500] is *way* faster than +# checking an array of 500^2 elements of CartesianIndex{2}. This optimization +# also applies to reshapes of MergedIndices since the outer shape of the +# container doesn't affect the index elements themselves. We can go even +# farther and say that even restricted views of MergedIndices must be valid +# over the entire array. This is overly strict in general, but in this +# use-case all the merged indices must be valid at some point, so it's ok. +typealias ReshapedMergedIndices{T,N,M<:MergedIndices} Base.ReshapedArray{T,N,M} +typealias SubMergedIndices{T,N,M<:Union{MergedIndices, ReshapedMergedIndices}} SubArray{T,N,M} +typealias MergedIndicesOrSub Union{MergedIndices, ReshapedMergedIndices, SubMergedIndices} +import Base: checkbounds_indices +@inline checkbounds_indices(::Type{Bool}, inds::Tuple{}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) +@inline checkbounds_indices(::Type{Bool}, inds::Tuple{Any}, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) +@inline checkbounds_indices(::Type{Bool}, inds::Tuple, I::Tuple{MergedIndicesOrSub,Vararg{Any}}) = + checkbounds_indices(Bool, inds, (parent(parent(I[1])).indices..., tail(I)...)) + +# The tricky thing here is that we want to optimize the accesses into the +# distributed array, but in doing so, we lose track of which indices in I we +# should be using. +# +# I’ve come to the conclusion that the function is utterly insane. +# There are *6* flavors of indices with four different reference points: +# 1. Find the indices of each portion of the DArray. +# 2. Find the valid subset of indices for the SubArray into that portion. +# 3. Find the portion of the `I` indices that should be used when you access the +# `K` indices in the subarray. This guy is nasty. It’s totally backwards +# from all other arrays, wherein we simply iterate over the source array’s +# elements. You need to *both* know which elements in `J` were skipped +# (`indexin_mask`) and which dimensions should match up (`restrict_indices`) +# 4. If `K` doesn’t correspond to an entire chunk, reinterpret `K` in terms of +# the local portion of the source array +function Base.setindex!(a::Array, s::SubDArray, + I::Union{UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) + Base.setindex_shape_check(s, Base.index_lengths(a, I...)...) + n = length(I) + d = s.parent + J = Base.decolon(d, s.indexes...) + @sync for i = 1:length(d.pids) + K_c = d.indexes[i] + K = map(intersect, J, K_c) + if !any(isempty, K) + K_mask = map(indexin_mask, J, K_c) + idxs = restrict_indices(Base.decolon(a, I...), K_mask) + if isequal(K, K_c) + # whole chunk + @async a[idxs...] = chunk(d, i) + else + # partial chunk + @async a[idxs...] = + remotecall_fetch(d.pids[i]) do + view(localpart(d), [K[j]-first(K_c[j])+1 for j=1:length(J)]...) 
+ end + end + end + end + return a +end + +Base.fill!(A::DArray, x) = begin + @sync for p in procs(A) + @async remotecall_fetch((A,x)->(fill!(localpart(A), x); nothing), p, A, x) + end + return A +end \ No newline at end of file diff --git a/src/linalg.jl b/src/linalg.jl new file mode 100644 index 0000000..213816d --- /dev/null +++ b/src/linalg.jl @@ -0,0 +1,260 @@ +function Base.ctranspose{T}(D::DArray{T,2}) + DArray(reverse(size(D)), procs(D)) do I + lp = Array(T, map(length, I)) + rp = convert(Array, D[reverse(I)...]) + ctranspose!(lp, rp) + end +end + +function Base.transpose{T}(D::DArray{T,2}) + DArray(reverse(size(D)), procs(D)) do I + lp = Array(T, map(length, I)) + rp = convert(Array, D[reverse(I)...]) + transpose!(lp, rp) + end +end + +typealias DVector{T,A} DArray{T,1,A} +typealias DMatrix{T,A} DArray{T,2,A} + +# Level 1 + +function axpy!(α, x::DVector, y::DVector) + if length(x) != length(y) + throw(DimensionMismatch("vectors must have same length")) + end + @sync for p in procs(y) + @async remotecall_fetch(() -> (Base.axpy!(α, localpart(x), localpart(y)); nothing), p) + end + return y +end + +function dot(x::DVector, y::DVector) + if length(x) != length(y) + throw(DimensionMismatch("")) + end + if (procs(x) != procs(y)) || (x.cuts != y.cuts) + throw(ArgumentError("vectors don't have the same distribution. Not handled for efficiency reasons.")) + end + + results=Any[] + @sync begin + for i = eachindex(x.pids) + @async push!(results, remotecall_fetch((x, y, i) -> dot(localpart(x), fetch(y, i)), x.pids[i], x, y, i)) + end + end + return reduce(@functorize(+), results) +end + +function norm(x::DVector, p::Real = 2) + results = [] + @sync begin + for pp in procs(x) + @async push!(results, remotecall_fetch(() -> norm(localpart(x), p), pp)) + end + end + return norm(results, p) +end + +Base.scale!(A::DArray, x::Number) = begin + @sync for p in procs(A) + @async remotecall_fetch((A,x)->(scale!(localpart(A), x); nothing), p, A, x) + end + return A +end + +# Level 2 +function add!(dest, src, scale = one(dest[1])) + if length(dest) != length(src) + throw(DimensionMismatch("source and destination arrays must have same number of elements")) + end + if scale == one(scale) + @simd for i = eachindex(dest) + @inbounds dest[i] += src[i] + end + else + @simd for i = eachindex(dest) + @inbounds dest[i] += scale*src[i] + end + end + return dest +end + +function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) + + # error checks + if size(A, 2) != length(x) + throw(DimensionMismatch("")) + end + if y.cuts[1] != A.cuts[1] + throw(ArgumentError("cuts of output vector must match cuts of first dimension of matrix")) + end + + # Multiply on each tile of A + R = Array(Future, size(A.pids)...) 
+ for j = 1:size(A.pids, 2) + xj = x[A.cuts[2][j]:A.cuts[2][j + 1] - 1] + for i = 1:size(A.pids, 1) + R[i,j] = remotecall(procs(A)[i,j]) do + localpart(A)*convert(localtype(x), xj) + end + end + end + + # Scale y if necessary + if β != one(β) + @sync for p in y.pids + if β != zero(β) + @async remotecall_fetch(y -> (scale!(localpart(y), β); nothing), p, y) + else + @async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y) + end + end + end + + # Update y + @sync for i = 1:size(R, 1) + p = y.pids[i] + for j = 1:size(R, 2) + rij = R[i,j] + @async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p) + end + end + + return y +end + +function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) + + # error checks + if size(A, 1) != length(x) + throw(DimensionMismatch("")) + end + if y.cuts[1] != A.cuts[2] + throw(ArgumentError("cuts of output vector must match cuts of second dimension of matrix")) + end + + # Multiply on each tile of A + R = Array(Future, reverse(size(A.pids))...) + for j = 1:size(A.pids, 1) + xj = x[A.cuts[1][j]:A.cuts[1][j + 1] - 1] + for i = 1:size(A.pids, 2) + R[i,j] = remotecall(() -> localpart(A)'*convert(localtype(x), xj), procs(A)[j,i]) + end + end + + # Scale y if necessary + if β != one(β) + @sync for p in y.pids + if β != zero(β) + @async remotecall_fetch(() -> (scale!(localpart(y), β); nothing), p) + else + @async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p) + end + end + end + + # Update y + @sync for i = 1:size(R, 1) + p = y.pids[i] + for j = 1:size(R, 2) + rij = R[i,j] + @async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p) + end + end + return y +end + + +# Level 3 +function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA) + # error checks + Ad1, Ad2 = (tA == 'N') ? (1,2) : (2,1) + mA, nA = size(A, Ad1, Ad2) + mB, nB = size(B) + if mB != nA + throw(DimensionMismatch("matrix A has dimensions ($mA, $nA), matrix B has dimensions ($mB, $nB)")) + end + if size(C,1) != mA || size(C,2) != nB + throw(DimensionMismatch("result C has dimensions $(size(C)), needs ($mA, $nB)")) + end + if C.cuts[1] != A.cuts[Ad1] + throw(ArgumentError("cuts of the first dimension of the output matrix must match cuts of dimension $Ad1 of the first input matrix")) + end + + # Multiply on each tile of A + if tA == 'N' + R = Array(Future, size(procs(A))..., size(procs(C), 2)) + else + R = Array(Future, reverse(size(procs(A)))..., size(procs(C), 2)) + end + for j = 1:size(A.pids, Ad2) + for k = 1:size(C.pids, 2) + Acuts = A.cuts[Ad2] + Ccuts = C.cuts[2] + Bjk = B[Acuts[j]:Acuts[j + 1] - 1, Ccuts[k]:Ccuts[k + 1] - 1] + for i = 1:size(A.pids, Ad1) + p = (tA == 'N') ? 
procs(A)[i,j] : procs(A)[j,i] + R[i,j,k] = remotecall(p) do + if tA == 'T' + return localpart(A).'*convert(localtype(B), Bjk) + elseif tA == 'C' + return localpart(A)'*convert(localtype(B), Bjk) + else + return localpart(A)*convert(localtype(B), Bjk) + end + end + end + end + end + + # Scale C if necessary + if β != one(β) + @sync for p in C.pids + if β != zero(β) + @async remotecall_fetch(() -> (scale!(localpart(C), β); nothing), p) + else + @async remotecall_fetch(() -> (fill!(localpart(C), 0); nothing), p) + end + end + end + + # Update C + @sync for i = 1:size(R, 1) + for k = 1:size(C.pids, 2) + p = C.pids[i,k] + for j = 1:size(R, 2) + rijk = R[i,j,k] + @async remotecall_fetch(d -> (add!(localpart(d), fetch(rijk), α); nothing), p, C) + end + end + end + return C +end + +A_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'N') +Ac_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'C') +At_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'T') +At_mul_B!(C::DMatrix, A::DMatrix, B::AbstractMatrix) = At_mul_B!(one(eltype(C)), A, B, zero(eltype(C)), C) + +function (*)(A::DMatrix, x::AbstractVector) + T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(x))) + y = DArray(I -> Array(T, map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) + return A_mul_B!(one(T), A, x, zero(T), y) +end +function (*)(A::DMatrix, B::AbstractMatrix) + T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(B))) + C = DArray(I -> Array(T, map(length, I)), (size(A, 1), size(B, 2)), procs(A)[:,1:min(size(procs(A), 2), size(procs(B), 2))], (size(procs(A), 1), min(size(procs(A), 2), size(procs(B), 2)))) + return A_mul_B!(one(T), A, B, zero(T), C) +end + +function Ac_mul_B(A::DMatrix, x::AbstractVector) + T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(x))) + y = DArray(I -> Array(T, map(length, I)), (size(A, 2),), procs(A)[1,:], (size(procs(A), 2),)) + return Ac_mul_B!(one(T), A, x, zero(T), y) +end +function Ac_mul_B(A::DMatrix, B::AbstractMatrix) + T = promote_type(Base.LinAlg.arithtype(eltype(A)), Base.LinAlg.arithtype(eltype(B))) + C = DArray(I -> Array(T, map(length, I)), (size(A, 2), size(B, 2)), procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) + return Ac_mul_B!(one(T), A, B, zero(T), C) +end diff --git a/src/mapreduce.jl b/src/mapreduce.jl new file mode 100644 index 0000000..6d2d364 --- /dev/null +++ b/src/mapreduce.jl @@ -0,0 +1,370 @@ +## higher-order functions ## + +### We need to define broadcast operations which will soon be very common +### in base through the dot-verctorization syntax + +Base.map(f, d::DArray) = DArray(I->map(f, localpart(d)), d) + +function Base.reduce(f, d::DArray) + results=[] + @sync begin + for p in procs(d) + @async push!(results, remotecall_fetch((f,d)->reduce(f, localpart(d)), p, f, d)) + end + end + reduce(f, results) +end + +function _mapreduce(f, opt, d::DArray) +# TODO Change to an @async remotecall_fetch - will reduce one extra network hop - +# once bug in master is fixed. 
+ results=[] + @sync begin + for p in procs(d) + @async push!(results, remotecall_fetch((f,opt,d)->mapreduce(f, opt, localpart(d)), p, f, opt, d)) + end + end + reduce(opt, results) +end +Base.mapreduce(f, opt::Union{typeof(@functorize(|)), typeof(@functorize(&))}, d::DArray) = _mapreduce(f, opt, d) +Base.mapreduce(f, opt::Function, d::DArray) = _mapreduce(f, opt, d) +Base.mapreduce(f, opt, d::DArray) = _mapreduce(f, opt, d) + +Base.map!{F}(f::F, d::DArray) = begin + @sync for p in procs(d) + @async remotecall_fetch((f,d)->(map!(f, localpart(d)); nothing), p, f, d) + end + return d +end + +# mapreducedim +Base.reducedim_initarray{R}(A::DArray, region, v0, ::Type{R}) = begin + # Store reduction on lowest pids + pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] + chunks = similar(pids, Future) + @sync for i in eachindex(pids) + @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray(localpart(A), region, v0, R), pids[i...]) + end + return DArray(chunks) +end +Base.reducedim_initarray{T}(A::DArray, region, v0::T) = Base.reducedim_initarray(A, region, v0, T) + +Base.reducedim_initarray0{R}(A::DArray, region, v0, ::Type{R}) = begin + # Store reduction on lowest pids + pids = A.pids[ntuple(i -> i in region ? (1:1) : (:), ndims(A))...] + chunks = similar(pids, Future) + @sync for i in eachindex(pids) + @async chunks[i...] = remotecall_wait(() -> Base.reducedim_initarray0(localpart(A), region, v0, R), pids[i...]) + end + return DArray(chunks) +end +Base.reducedim_initarray0{T}(A::DArray, region, v0::T) = Base.reducedim_initarray0(A, region, v0, T) + +# Compute mapreducedim of each localpart and store the result in a new DArray +mapreducedim_within(f, op, A::DArray, region) = begin + arraysize = [size(A)...] + gridsize = [size(A.indexes)...] + arraysize[[region...]] = gridsize[[region...]] + indx = similar(A.indexes) + for i in CartesianRange(size(indx)) + indx[i] = ntuple(j -> j in region ? (i.I[j]:i.I[j]) : A.indexes[i][j], ndims(A)) + end + cuts = [i in region ? collect(1:arraysize[i] + 1) : A.cuts[i] for i in 1:ndims(A)] + return DArray(next_did(), I -> mapreducedim(f, op, localpart(A), region), + tuple(arraysize...), procs(A), indx, cuts) +end + +# Compute mapreducedim accros the processes. This should be done after mapreducedim +# has been run on each localpart with mapreducedim_within. Eventually, we might +# want to write mapreducedim_between! as a binary reduction. +function mapreducedim_between!(f, op, R::DArray, A::DArray, region) + @sync for p in procs(R) + @async remotecall_fetch(p, f, op, R, A, region) do f, op, R, A, region + localind = [r for r = localindexes(A)] + localind[[region...]] = [1:n for n = size(A)[[region...]]] + B = convert(Array, A[localind...]) + Base.mapreducedim!(f, op, localpart(R), B) + nothing + end + end + return R +end + +Base.mapreducedim!(f, op, R::DArray, A::DArray) = begin + lsize = Base.check_reducedims(R,A) + if isempty(A) + return copy(R) + end + region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...) + if isempty(region) + return copy!(R, A) + end + B = mapreducedim_within(f, op, A, region) + return mapreducedim_between!(identity, op, R, B, region) +end + +Base.mapreducedim(f, op, R::DArray, A::DArray) = begin + Base.mapreducedim!(f, op, Base.reducedim_initarray(A, region, v0), A) +end + +function nnz(A::DArray) + B = Array(Any, size(A.pids)) + @sync begin + for i in eachindex(A.pids) + @async B[i...] 
= remotecall_fetch(x -> nnz(localpart(x)), A.pids[i...], A) + end + end + return reduce(+, B) +end + +# reduce like +for (fn, fr) in ((:sum, :+), + (:prod, :*), + (:maximum, :max), + (:minimum, :min), + (:any, :|), + (:all, :&)) + @eval (Base.$fn)(d::DArray) = reduce(@functorize($fr), d) +end + +# mapreduce like +for (fn, fr1, fr2) in ((:maxabs, :abs, :max), + (:minabs, :abs, :min), + (:sumabs, :abs, :+), + (:sumabs2, :abs2, :+)) + @eval (Base.$fn)(d::DArray) = mapreduce(@functorize($fr1), @functorize($fr2), d) +end + +# semi mapreduce +for (fn, fr) in ((:any, :|), + (:all, :&), + (:count, :+)) + @eval begin + (Base.$fn)(f::typeof(@functorize(identity)), d::DArray) = mapreduce(f, @functorize($fr), d) + (Base.$fn)(f::Base.Predicate, d::DArray) = mapreduce(f, @functorize($fr), d) + # (Base.$fn)(f::Base.Func{1}, d::DArray) = mapreduce(f, @functorize $fr, d) + (Base.$fn)(f::Callable, d::DArray) = mapreduce(f, @functorize($fr), d) + end +end + +# scalar ops +(+)(A::DArray{Bool}, x::Bool) = A .+ x +(+)(x::Bool, A::DArray{Bool}) = x .+ A +(-)(A::DArray{Bool}, x::Bool) = A .- x +(-)(x::Bool, A::DArray{Bool}) = x .- A +(+)(A::DArray, x::Number) = A .+ x +(+)(x::Number, A::DArray) = x .+ A +(-)(A::DArray, x::Number) = A .- x +(-)(x::Number, A::DArray) = x .- A + +map_localparts(f::Callable, d::DArray) = DArray(i->f(localpart(d)), d) +map_localparts(f::Callable, d1::DArray, d2::DArray) = DArray(d1) do I + f(localpart(d1), localpart(d2)) +end + +function map_localparts(f::Callable, DA::DArray, A::Array) + pas = PartitionedSerializer(A, procs(DA), DA.indexes) + DArray(DA) do I + f(localpart(DA), verify_and_get(pas, I)) + end +end + +function map_localparts(f::Callable, A::Array, DA::DArray) + pas = PartitionedSerializer(A, procs(DA), DA.indexes) + DArray(DA) do I + f(verify_and_get(pas, I), localpart(DA)) + end +end + +function map_localparts!(f::Callable, d::DArray) + @sync for p in procs(d) + @async remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d) + end + return d +end + +# Here we assume all the DArrays have +# the same size and distribution +map_localparts(f::Callable, As::DArray...) 
= DArray(I->f(map(localpart, As)...), As[1]) + +for f in (:.+, :.-, :.*, :./, :.%, :.<<, :.>>, :div, :mod, :rem, :&, :|, :$) + @eval begin + ($f){T}(A::DArray{T}, B::Number) = map_localparts(r->($f)(r, B), A) + ($f){T}(A::Number, B::DArray{T}) = map_localparts(r->($f)(A, r), B) + end +end + +function samedist(A::DArray, B::DArray) + (size(A) == size(B)) || throw(DimensionMismatch()) + if (procs(A) != procs(B)) || (A.cuts != B.cuts) + B = DArray(x->B[x...], A) + end + B +end + +for f in (:+, :-, :div, :mod, :rem, :&, :|, :$) + @eval begin + function ($f){T}(A::DArray{T}, B::DArray{T}) + B = samedist(A, B) + map_localparts($f, A, B) + end + ($f){T}(A::DArray{T}, B::Array{T}) = ($f)(A, distribute(B, A)) + ($f){T}(A::Array{T}, B::DArray{T}) = ($f)(distribute(A, B), B) + end +end +for f in (:.+, :.-, :.*, :./, :.%, :.<<, :.>>) + @eval begin + function ($f){T}(A::DArray{T}, B::DArray{T}) + map_localparts($f, A, B) + end + ($f){T}(A::DArray{T}, B::Array{T}) = ($f)(A, distribute(B, A)) + ($f){T}(A::Array{T}, B::DArray{T}) = ($f)(distribute(A, B), B) + end +end + +### Should be deleted when broadcast is defined +for f in (:abs, :abs2, :acos, :acosd, :acosh, :acot, :acotd, :acoth, + :acsc, :acscd, :acsch, :angle, :asec, :asecd, :asech, :asin, + :asind, :asinh, :atan, :atand, :atanh, :big, :cbrt, :ceil, :cis, + :complex, :cos, :cosc, :cosd, :cosh, :cospi, :cot, :cotd, :coth, + :csc, :cscd, :csch, :dawson, :deg2rad, :digamma, :erf, :erfc, + :erfcinv, :erfcx, :erfi, :erfinv, :exp, :exp10, :exp2, :expm1, + :exponent, :float, :floor, :gamma, :imag, :invdigamma, :isfinite, + :isinf, :isnan, :lfact, :lgamma, :log, :log10, :log1p, :log2, :rad2deg, + :real, :sec, :secd, :sech, :sign, :sin, :sinc, :sind, :sinh, :sinpi, + :sqrt, :tan, :tand, :tanh, :trigamma) + @eval begin + ($f)(A::DArray) = map($f, A) + end +end + +function mapslices{T,N}(f::Function, D::DArray{T,N}, dims::AbstractVector) + #Ensure that the complete DArray is available on the specified dims on all processors + for d in dims + for idxs in D.indexes + if length(idxs[d]) != size(D, d) + throw(DimensionMismatch(string("dimension $d is distributed. ", + "mapslices requires dimension $d to be completely available on all processors."))) + end + end + end + + refs = Future[remotecall((x,y,z)->mapslices(x,localpart(y),z), p, f, D, dims) for p in procs(D)] + + DArray(reshape(refs, size(procs(D)))) +end + +function _ppeval(f, A...; dim = map(ndims, A)) + if length(dim) != length(A) + throw(ArgumentError("dim argument has wrong length. length(dim) = $(length(dim)) but should be $(length(A))")) + end + narg = length(A) + dimlength = size(A[1], dim[1]) + for i = 2:narg + if dim[i] > 0 && dimlength != size(A[i], dim[i]) + throw(ArgumentError("lengths of broadcast dimensions must be the same. size(A[1], $(dim[1])) = $dimlength but size(A[$i], $(dim[i])) = $(size(A[i], dim[i]))")) + end + end + dims = [] + idx = [] + args = [] + for i = 1:narg + push!(dims, ndims(A[i])) + push!(idx, Any[1:size(A[i], d) for d in 1:dims[i]]) + if dim[i] > 0 + idx[i][dim[i]] = 1 + push!(args, view(A[i], idx[i]...)) + else + push!(args, A[i]) + end + end + R1 = f(args...) + ridx = Any[1:size(R1, d) for d in 1:ndims(R1)] + push!(ridx, 1) + Rsize = map(last, ridx) + Rsize[end] = dimlength + R = Array(eltype(R1), Rsize...) + + for i = 1:dimlength + for j = 1:narg + if dim[j] > 0 + idx[j][dim[j]] = i + args[j] = view(A[j], idx[j]...) + else + args[j] = A[j] + end + end + ridx[end] = i + R[ridx...] = f(args...) 
+    end
+
+    return R
+end
+
+"""
+    ppeval(f, D...; dim::NTuple)
+
+Evaluates the callable argument `f` on slices of the elements of the `D` tuple.
+
+#### Arguments
+`f` can be any callable object that accepts sliced or broadcast elements of `D`.
+The result returned from `f` must be either an array or a scalar.
+
+`D` can have any number of elements and the elements can have any type. If an element
+of `D` is a distributed array, it is sliced along the dimension specified by `dim`. If an
+element of `D` is not distributed, the element is by default broadcast and
+used in all evaluations of `f`.
+
+`dim` is a tuple of integers specifying the dimension over which the elements
+of `D` are sliced. The length of the tuple must therefore be the same as the
+number of arguments in `D`. By default, distributed arrays are sliced along the
+last dimension. If the value is less than or equal to zero, the element is
+broadcast to all evaluations of `f`.
+
+#### Result
+`ppeval` returns a distributed array of dimension `p+1` where the first `p`
+sizes correspond to the sizes of the return values of `f`. The last dimension of
+the return array from `ppeval` has the same length as the dimension over which
+the input arrays are sliced.
+
+#### Examples
+```jl
+addprocs(JULIA_CPU_CORES)
+
+using DistributedArrays
+
+A = drandn((10, 10, JULIA_CPU_CORES), workers(), [1, 1, JULIA_CPU_CORES])
+
+ppeval(eigvals, A)
+
+ppeval(eigvals, A, randn(10,10)) # broadcasting second argument
+
+B = drandn((10, JULIA_CPU_CORES), workers(), [1, JULIA_CPU_CORES])
+
+ppeval(*, A, B)
+```
+"""
+function ppeval(f, D...; dim::NTuple = map(t -> isa(t, DArray) ? ndims(t) : 0, D))
+    #Ensure that the complete DArray is available on the specified dims on all processors
+    for i = 1:length(D)
+        if isa(D[i], DArray)
+            for idxs in D[i].indexes
+                for d in setdiff(1:ndims(D[i]), dim[i])
+                    if length(idxs[d]) != size(D[i], d)
+                        throw(DimensionMismatch(string("dimension $d is distributed. ",
+                            "ppeval requires dimension $d to be completely available on all processors.")))
+                    end
+                end
+            end
+        end
+    end
+
+    refs = Future[remotecall((x, y, z) -> _ppeval(x, map(localpart, y)...; dim = z), p, f, D, dim) for p in procs(D[1])]
+
+    # The array of Futures has to be reshaped for the DArray constructor to work correctly.
+    # This requires a fetch and the DArray is also fetching so it might be better to modify
+    # the DArray constructor.
+    sd = [size(D[1].pids)...]
+ nd = remotecall_fetch((r)->ndims(fetch(r)), refs[1].where, refs[1]) + DArray(reshape(refs, tuple([sd[1:nd - 1], sd[end];]...))) +end diff --git a/src/serialize.jl b/src/serialize.jl new file mode 100644 index 0000000..fccfddb --- /dev/null +++ b/src/serialize.jl @@ -0,0 +1,86 @@ +function Base.serialize(S::AbstractSerializer, d::DArray) + # Only send the ident for participating workers - we expect the DArray to exist in the + # remote registry + destpid = Base.worker_id_from_socket(S.io) + Serializer.serialize_type(S, typeof(d)) + if (destpid in d.pids) || (destpid == d.identity[1]) + serialize(S, (true, d.identity)) # (identity_only, identity) + else + serialize(S, (false, d.identity)) + for n in [:dims, :pids, :indexes, :cuts] + serialize(S, getfield(d, n)) + end + end +end + +function Base.deserialize{T<:DArray}(S::AbstractSerializer, t::Type{T}) + what = deserialize(S) + identity_only = what[1] + identity = what[2] + + if identity_only + global registry + if haskey(registry, (identity, :DARRAY)) + return registry[(identity, :DARRAY)] + else + # access to fields will throw an error, at least the deserialization process will not + # result in worker death + d = T() + d.identity = identity + return d + end + else + # We are not a participating worker, deser fields and instantiate locally. + dims = deserialize(S) + pids = deserialize(S) + indexes = deserialize(S) + cuts = deserialize(S) + return T(identity, dims, pids, indexes, cuts) + end +end + +# Serialize only those parts of the object as required by the destination worker. +type PartitionedSerializer + indexable_obj # An indexable object, Array, SparseMatrix, etc. + # Complete object on the serializing side. + # Part object on the deserialized side. + pids::Nullable{Array} + idxs::Nullable{Array} + local_idxs::Nullable{Tuple} + + PartitionedSerializer(obj, local_idxs::Tuple) = new(obj, Nullable{Array}(), Nullable{Array}(), local_idxs) + function PartitionedSerializer(obj, pids::Array, idxs::Array) + pas = new(obj,pids,idxs,Nullable{Tuple}()) + + if myid() in pids + pas.local_idxs = idxs[findfirst(pids, myid())] + end + return pas + end +end + +function Base.serialize(S::AbstractSerializer, pas::PartitionedSerializer) + pid = Base.worker_id_from_socket(S.io) + I = get(pas.idxs)[findfirst(get(pas.pids), pid)] + Serializer.serialize_type(S, typeof(pas)) + serialize(S, pas.indexable_obj[I...]) + serialize(S, I) +end + +function Base.deserialize{T<:PartitionedSerializer}(S::AbstractSerializer, t::Type{T}) + obj_part = deserialize(S) + I = deserialize(S) + return PartitionedSerializer(obj_part, I) +end + +function verify_and_get(pas::PartitionedSerializer, I) + # Handle the special case where myid() is part of pas.pids. + # For this case serialize/deserialize is not called as the remotecall is executed locally + if myid() in get(pas.pids, []) + @assert I == get(pas.idxs)[findfirst(get(pas.pids),myid())] + return pas.indexable_obj[I...] + else + @assert I == get(pas.local_idxs, ()) + return pas.indexable_obj + end +end diff --git a/src/sort.jl b/src/sort.jl new file mode 100644 index 0000000..a23d0f2 --- /dev/null +++ b/src/sort.jl @@ -0,0 +1,170 @@ +# Sorting a DVector using samplesort + +function sample_n_setup_ref(d::DVector, sample_size; kwargs...) + lp = localpart(d) + llp = length(lp) + np = length(procs(d)) + sample_size = llp > sample_size ? sample_size : llp + sorted = sort(lp; kwargs...) + sample = sorted[collect(1:div(llp,sample_size):llp)] + ref = RemoteChannel(()->Channel(np+1)) # To collect parts to be sorted locally later. 
+    # First element is the locally sorted vector
+    put!(ref, sorted)
+    return (sample, ref)
+end
+
+
+function scatter_n_sort_localparts{T}(d, myidx, refs::Array{RemoteChannel}, boundaries::Array{T}; by = identity, kwargs...)
+    if d==nothing
+        sorted = take!(refs[myidx])   # First entry in the remote channel is the sorted localpart
+    else
+        sorted = sort(localpart(d); by = by, kwargs...)
+    end
+
+    # send respective parts to correct workers, iterate over sorted array
+    p_sorted = 1
+    for (i,r) in enumerate(refs)
+        p_till = length(sorted)+1
+
+        # calculate range to send to refs[i]
+        ctr=1
+        for x in sorted[p_sorted:end]
+            if by(x) > by(boundaries[i+1])
+                p_till = p_sorted+ctr-1
+                break
+            else
+                ctr += 1
+            end
+        end
+
+        if p_till == p_sorted
+            @async put!(r, Array(T,0))
+        else
+            v = sorted[p_sorted:p_till-1]
+            @async put!(r, v)
+        end
+
+        p_sorted = p_till
+    end
+
+    # wait to receive all of my parts from all other workers
+    lp_sorting=T[]
+    for _ in refs
+        v = take!(refs[myidx])
+        append!(lp_sorting, v)
+    end
+
+    sorted_ref=RemoteChannel()
+    put!(sorted_ref, sort!(lp_sorting; by = by, kwargs...))
+    return (sorted_ref, length(lp_sorting))
+end
+
+function compute_boundaries{T}(d::DVector{T}; kwargs...)
+    pids = procs(d)
+    np = length(pids)
+    sample_sz_on_wrkr = 512
+
+    results = asyncmap(p -> remotecall_fetch(sample_n_setup_ref, p, d, sample_sz_on_wrkr; kwargs...), pids)
+
+    samples = Array(T,0)
+    for x in results
+        append!(samples, x[1])
+    end
+    sort!(samples; kwargs...)
+    samples[1] = typemin(T)
+
+    refs=RemoteChannel[x[2] for x in results]
+
+    boundaries = samples[[1+(x-1)*div(length(samples), np) for x in 1:np]]
+    push!(boundaries, typemax(T))
+
+    return (boundaries, refs)
+end
+
+"""
+    sort(d::DVector; sample=true, kwargs...) -> DVector
+
+Sorts and returns a new distributed vector.
+
+The sorted vector may not have the same distribution as the original.
+
+Keyword argument `sample` can take values:
+
+- `true`: A sample of max size 512 is first taken from all nodes. This is used to balance the distribution of the sorted array on participating workers. Default is `true`.
+
+- `false`: No sampling is done. Assumes a uniform distribution between `min(d)` and `max(d)`.
+
+- 2-element tuple of the form `(min, max)`: No sampling is done. Assumes a uniform distribution between the specified min and max values.
+
+- Array{T}: The passed array is assumed to be a sample of the distribution and is used to balance the sorted distribution.
+
+Keyword argument `alg` takes the same options as `Base.sort`.
+"""
+function Base.sort{T}(d::DVector{T}; sample=true, kwargs...)
+    pids = procs(d)
+    np = length(pids)
+
+    # Only `alg`, `by` and `sample` are supported as keyword arguments
+    if length(filter(x->!(x in (:alg, :by)), [x[1] for x in kwargs])) > 0
+        throw(ArgumentError("Only `alg`, `by` and `sample` are supported as keyword arguments"))
+    end
+
+    if sample==true
+        boundaries, refs = compute_boundaries(d; kwargs...)
+        presorted=true
+
+    elseif sample==false
+        # Assume a uniform distribution between min and max values
+        minmax=asyncmap(p->remotecall_fetch(d->(minimum(localpart(d)), maximum(localpart(d))), p, d), pids)
+        min_d = minimum(T[x[1] for x in minmax])
+        max_d = maximum(T[x[2] for x in minmax])
+
+        return sort(d; sample=(min_d,max_d), kwargs...)
+
+    elseif isa(sample, Tuple)
+        # Assume a uniform distribution between min and max values in the tuple
+        lb=sample[1]
+        ub=sample[2]
+
+        @assert lb<=ub
+
+        s = Array(T, np)
+        part = abs(ub - lb)/np
+        (isnan(part) || isinf(part)) && throw(ArgumentError("lower and upper bounds must not be infinities"))
+
+        for n in 1:np
+            v = lb + (n-1)*part
+            if T <: Integer
+                s[n] = round(v)
+            else
+                s[n] = v
+            end
+        end
+        return sort(d; sample=s, kwargs...)
+
+    elseif isa(sample, Array)
+        # Provided array is used as a sample
+        samples = sort(copy(sample))
+        samples[1] = typemin(T)
+        boundaries = samples[[1+(x-1)*div(length(samples), np) for x in 1:np]]
+        push!(boundaries, typemax(T))
+        presorted=false
+
+        refs=RemoteChannel[RemoteChannel(p) for p in procs(d)]
+    else
+        throw(ArgumentError("keyword arg `sample` must be Boolean, Tuple(Min,Max) or an actual sample of data : " * string(sample)))
+    end
+
+    local_sort_results = Array(Tuple, np)
+
+    Base.asyncmap!((i,p) -> remotecall_fetch(
+                        scatter_n_sort_localparts, p, presorted ? nothing : d, i, refs, boundaries; kwargs...),
+                        local_sort_results, 1:np, pids)
+
+    # Construct a new DArray from the sorted refs. Remove parts with 0-length since
+    # the DArray constructor_from_refs does not yet support it. This implies that
+    # the participating workers for the sorted darray may be different from the original
+    # for highly non-uniform distributions.
+    local_sorted_refs = RemoteChannel[x[1] for x in filter(x->x[2]>0, local_sort_results)]
+    return DArray(local_sorted_refs)
+end
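
Finally, a hypothetical end-to-end example of the distributed sample sort above; the worker count, vector length, and keyword choices are illustrative only:

```jl
addprocs(4)
using DistributedArrays

d = drand(10^6)

s1 = sort(d)                         # default: sample-based load balancing
s2 = sort(d; sample=false)           # assume a uniform distribution between the extrema
s3 = sort(d; sample=(0.0, 1.0))      # explicit bounds, no sampling
s4 = sort(d; alg=QuickSort, by=abs)  # `alg` and `by` are forwarded to the local sorts
```

As noted in the docstring, the distribution of the returned vector may differ from that of `d`, since zero-length parts are dropped when the result is assembled.
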