diff --git a/src/DistributedArrays.jl b/src/DistributedArrays.jl index 941e4f6..755855c 100644 --- a/src/DistributedArrays.jl +++ b/src/DistributedArrays.jl @@ -8,7 +8,7 @@ using LinearAlgebra import Base: +, -, *, div, mod, rem, &, |, xor import Base.Callable -import LinearAlgebra: axpy!, dot, norm, +import LinearAlgebra: axpy!, dot, norm import Primes import Primes: factor diff --git a/src/core.jl b/src/core.jl index 034e901..abb8415 100644 --- a/src/core.jl +++ b/src/core.jl @@ -55,7 +55,8 @@ end Get the vector of processes storing pieces of DArray `d`. """ -Distributed.procs(d::DArray) = d.pids +Distributed.procs(d::DArray) = d.pids +Distributed.procs(d::SubDArray) = procs(parent(d)) """ localpart(A) diff --git a/src/darray.jl b/src/darray.jl index ab28026..e2108b8 100644 --- a/src/darray.jl +++ b/src/darray.jl @@ -364,9 +364,41 @@ function localindices(d::DArray) return d.indices[lpidx] end -# find which piece holds index (I...) -locate(d::DArray, I::Int...) = - ntuple(i -> searchsortedlast(d.cuts[i], I[i]), ndims(d)) +# Equality +function Base.:(==)(d::DArray{<:Any,<:Any,A}, a::AbstractArray) where A + if size(d) != size(a) + return false + else + b = asyncmap(procs(d)) do p + remotecall_fetch(p) do + localpart(d) == A(a[localindices(d)...]) + end + end + return all(b) + end +end +Base.:(==)(d::SubDArray, a::AbstractArray) = copy(d) == a +Base.:(==)(a::AbstractArray, d::DArray) = d == a +Base.:(==)(a::AbstractArray, d::SubDArray) = d == a +Base.:(==)(d1::DArray, d2::DArray) = invoke(==, Tuple{DArray, AbstractArray}, d1, d2) +Base.:(==)(d1::SubDArray, d2::DArray) = copy(d1) == d2 +Base.:(==)(d1::DArray, d2::SubDArray) = d1 == copy(d2) +Base.:(==)(d1::SubDArray, d2::SubDArray) = copy(d1) == copy(d2) + +""" + locate(d::DArray, I::Int...) + +Determine the index of `procs(d)` that hold element `I`. +""" +function locate(d::DArray, I::Int...) + ntuple(ndims(d)) do i + fi = searchsortedlast(d.cuts[i], I[i]) + if fi >= length(d.cuts[i]) + throw(ArgumentError("element not contained in array")) + end + return fi + end +end chunk(d::DArray{T,N,A}, i...) where {T,N,A} = remotecall_fetch(localpart, d.pids[i...], d)::A @@ -479,7 +511,7 @@ end function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} I = s.indices d = s.parent - if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S + if isa(I,Tuple{Vararg{UnitRange{Int}}}) && S<:T && T<:S && !isempty(s) l = locate(d, map(first, I)...) if isequal(d.indices[l...], I) # SubDArray corresponds to a chunk @@ -487,7 +519,7 @@ function (::Type{Array{S,N}})(s::SubDArray{T,N}) where {S,T,N} end end a = Array{S}(undef, size(s)) - a[[1:size(a,i) for i=1:N]...] .= s + a[[1:size(a,i) for i=1:N]...] = s return a end @@ -540,7 +572,7 @@ end function Base.getindex(d::DArray, i::Int) _scalarindexingallowed() - return getindex_tuple(d, CartesianIndices(d)[i]) + return getindex_tuple(d, Tuple(CartesianIndices(d)[i])) end function Base.getindex(d::DArray, i::Int...) _scalarindexingallowed() @@ -548,7 +580,7 @@ function Base.getindex(d::DArray, i::Int...) end Base.getindex(d::DArray) = d[1] -Base.getindex(d::DArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) +Base.getindex(d::SubOrDArray, I::Union{Int,UnitRange{Int},Colon,Vector{Int},StepRange{Int,Int}}...) = view(d, I...) function Base.isassigned(D::DArray, i::Integer...) try @@ -564,15 +596,15 @@ function Base.isassigned(D::DArray, i::Integer...) end -Base.copyto!(dest::SubOrDArray, src::SubOrDArray) = begin +function Base.copyto!(dest::SubOrDArray, src::AbstractArray) asyncmap(procs(dest)) do p remotecall_fetch(p) do - localpart(dest)[:] = src[localindices(dest)...] + ldest = localpart(dest) + ldest[:] = Array(view(src, localindices(dest)...)) end end return dest end -Base.copy!(dest::SubOrDArray, src::SubOrDArray) = copyto!(dest, src) function Base.deepcopy(src::DArray) dest = similar(src) diff --git a/src/linalg.jl b/src/linalg.jl index ca05c19..caf95cc 100644 --- a/src/linalg.jl +++ b/src/linalg.jl @@ -1,4 +1,5 @@ -function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T +function Base.copy(Dadj::Adjoint{T,<:DArray{T,2}}) where T + D = parent(Dadj) DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -6,7 +7,8 @@ function Base.copy(D::Adjoint{T,<:DArray{T,2}}) where T end end -function Base.copy(D::Transpose{T,<:DArray{T,2}}) where T +function Base.copy(Dtr::Transpose{T,<:DArray{T,2}}) where T + D = parent(Dtr) DArray(reverse(size(D)), procs(D)) do I lp = Array{T}(undef, map(length, I)) rp = convert(Array, D[reverse(I)...]) @@ -49,7 +51,7 @@ function dot(x::DVector, y::DVector) return reduce(+, results) end -function norm(x::DVector, p::Real = 2) +function norm(x::DArray, p::Real = 2) results = [] @sync begin for pp in procs(x) @@ -83,7 +85,7 @@ function add!(dest, src, scale = one(dest[1])) return dest end -function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) +function mul!(y::DVector, A::DMatrix, x::AbstractVector, α::Number = 1, β::Number = 0) # error checks if size(A, 2) != length(x) @@ -106,11 +108,14 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec # Scale y if necessary if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(y -> (rmul!(localpart(y), β); nothing), p, y) - else - @async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y) + asyncmap(procs(y)) do p + remotecall_fetch(p) do + if !iszero(β) + rmul!(localpart(y), β) + else + fill!(localpart(y), 0) + end + return nothing end end end @@ -127,7 +132,9 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec return y end -function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVector) +function mul!(y::DVector, adjA::Adjoint{<:Number,<:DMatrix}, x::AbstractVector, α::Number = 1, β::Number = 0) + + A = parent(adjA) # error checks if size(A, 1) != length(x) @@ -148,11 +155,14 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe # Scale y if necessary if β != one(β) - @sync for p in y.pids - if β != zero(β) - @async remotecall_fetch(() -> (rmul!(localpart(y), β); nothing), p) - else - @async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p) + asyncmap(procs(y)) do p + remotecall_fetch(p) do + if !iszero(β) + rmul!(localpart(y), β) + else + fill!(localpart(y), 0) + end + return nothing end end end @@ -189,7 +199,7 @@ function LinearAlgebra.rmul!(DA::DMatrix, D::Diagonal) end # Level 3 -function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix, tA) +function _matmatmul!(C::DMatrix, A::DMatrix, B::AbstractMatrix, α::Number, β::Number, tA) # error checks Ad1, Ad2 = (tA == 'N') ? (1,2) : (2,1) mA, nA = (size(A, Ad1), size(A, Ad2)) @@ -254,17 +264,16 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D return C end -A_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'N') -Ac_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'C') -At_mul_B!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::DMatrix) = _matmatmul!(α, A, B, β, C, 'T') -At_mul_B!(C::DMatrix, A::DMatrix, B::AbstractMatrix) = At_mul_B!(one(eltype(C)), A, B, zero(eltype(C)), C) +mul!(C::DMatrix, A::DMatrix, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, A, B, α, β, 'N') +mul!(C::DMatrix, A::Adjoint{<:Number,<:DMatrix}, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, parent(A), B, α, β, 'C') +mul!(C::DMatrix, A::Transpose{<:Number,<:DMatrix}, B::AbstractMatrix, α::Number = 1, β::Number = 0) = _matmatmul!(C, parent(A), B, α, β, 'T') _matmul_op = (t,s) -> t*s + t*s function Base.:*(A::DMatrix, x::AbstractVector) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 1),), procs(A)[:,1], (size(procs(A), 1),)) - return A_mul_B!(one(T), A, x, zero(T), y) + return mul!(y, A, x) end function Base.:*(A::DMatrix, B::AbstractMatrix) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) @@ -272,22 +281,43 @@ function Base.:*(A::DMatrix, B::AbstractMatrix) (size(A, 1), size(B, 2)), procs(A)[:,1:min(size(procs(A), 2), size(procs(B), 2))], (size(procs(A), 1), min(size(procs(A), 2), size(procs(B), 2)))) - return A_mul_B!(one(T), A, B, zero(T), C) + return mul!(C, A, B) +end + +function Base.:*(adjA::Adjoint{<:Any,<:DMatrix}, x::AbstractVector) + A = parent(adjA) + T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) + y = DArray(I -> Array{T}(undef, map(length, I)), + (size(A, 2),), + procs(A)[1,:], + (size(procs(A), 2),)) + return mul!(y, adjA, x) +end +function Base.:*(adjA::Adjoint{<:Any,<:DMatrix}, B::AbstractMatrix) + A = parent(adjA) + T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) + C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2), + size(B, 2)), + procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], + (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) + return mul!(C, adjA, B) end -function Ac_mul_B(A::DMatrix, x::AbstractVector) +function Base.:*(trA::Transpose{<:Any,<:DMatrix}, x::AbstractVector) + A = parent(trA) T = Base.promote_op(_matmul_op, eltype(A), eltype(x)) y = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2),), procs(A)[1,:], (size(procs(A), 2),)) - return Ac_mul_B!(one(T), A, x, zero(T), y) + return mul!(y, trA, x) end -function Ac_mul_B(A::DMatrix, B::AbstractMatrix) +function Base.:*(trA::Transpose{<:Any,<:DMatrix}, B::AbstractMatrix) + A = parent(trA) T = Base.promote_op(_matmul_op, eltype(A), eltype(B)) C = DArray(I -> Array{T}(undef, map(length, I)), (size(A, 2), size(B, 2)), procs(A)[1:min(size(procs(A), 1), size(procs(B), 2)),:], (size(procs(A), 2), min(size(procs(A), 1), size(procs(B), 2)))) - return Ac_mul_B!(one(T), A, B, zero(T), C) + return mul!(C, trA, B) end diff --git a/src/mapreduce.jl b/src/mapreduce.jl index 3b28c77..d8da385 100644 --- a/src/mapreduce.jl +++ b/src/mapreduce.jl @@ -5,10 +5,10 @@ import SparseArrays: nnz Base.map(f, d0::DArray, ds::AbstractArray...) = broadcast(f, d0, ds...) -function Base.map!(f::F, dest::DArray, src::DArray) where {F} +function Base.map!(f::F, dest::DArray, src::DArray{<:Any,<:Any,A}) where {F,A} asyncmap(procs(dest)) do p remotecall_fetch(p) do - map!(f, localpart(dest), src[localindices(dest)...]) + map!(f, localpart(dest), A(view(src, localindices(dest)...))) return nothing end end @@ -53,7 +53,7 @@ rewrite_local(x) = x function Base.reduce(f, d::DArray) results = asyncmap(procs(d)) do p - remotecall_fetch(p, f, d) do (f, d) + remotecall_fetch(p) do return reduce(f, localpart(d)) end end @@ -122,12 +122,39 @@ function Base.mapreducedim!(f, op, R::DArray, A::DArray) end region = tuple(collect(1:ndims(A))[[size(R)...] .!= [size(A)...]]...) if isempty(region) - return copy!(R, A) + return copyto!(R, A) end B = mapreducedim_within(f, op, A, region) return mapreducedim_between!(identity, op, R, B, region) end +function Base._all(f, A::DArray, ::Colon) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + all(f, localpart(A)) + end + end + return all(B) +end + +function Base._any(f, A::DArray, ::Colon) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + any(f, localpart(A)) + end + end + return any(B) +end + +function Base.count(f, A::DArray) + B = asyncmap(procs(A)) do p + remotecall_fetch(p) do + count(f, localpart(A)) + end + end + return sum(B) +end + function nnz(A::DArray) B = asyncmap(A.pids) do p remotecall_fetch(nnz∘localpart, p, A) diff --git a/src/serialize.jl b/src/serialize.jl index a36d6b8..385a89f 100644 --- a/src/serialize.jl +++ b/src/serialize.jl @@ -2,7 +2,7 @@ function Serialization.serialize(S::AbstractSerializer, d::DArray{T,N,A}) where # Only send the ident for participating workers - we expect the DArray to exist in the # remote registry. DO NOT send the localpart. destpid = worker_id_from_socket(S.io) - serialize_type(S, typeof(d)) + Serialization.serialize_type(S, typeof(d)) if (destpid in d.pids) || (destpid == d.id[1]) serialize(S, (true, d.id)) # (id_only, id) else @@ -64,7 +64,7 @@ function Serialization.serialize(S::AbstractSerializer, s::DestinationSerializer pid = worker_id_from_socket(S.io) pididx = findfirst(isequal(pid), s.pids) @assert pididx !== nothing - serialize_type(S, typeof(s)) + Serialization.serialize_type(S, typeof(s)) serialize(S, s.generate(pididx)) end diff --git a/test/darray.jl b/test/darray.jl index 6ee06ea..4875f72 100644 --- a/test/darray.jl +++ b/test/darray.jl @@ -161,32 +161,40 @@ check_leaks() @testset "test DArray / Array conversion" begin D = drand((200,200), [MYID, OTHERIDS]) - @testset "test convert(::Array, ::(Sub)DArray)" begin - S = convert(Matrix{Float64}, D[1:150, 1:150]) - A = convert(Matrix{Float64}, D) + @testset "test construct Array from (Sub)DArray" begin + S = Matrix{Float64}(D[1:150, 1:150]) + A = Matrix{Float64}(D) @test A[1:150,1:150] == S - D2 = convert(DArray{Float64,2,Matrix{Float64}}, A) + D2 = DArray{Float64,2,Matrix{Float64}}(A) @test D2 == D + DistributedArrays.allowscalar(true) @test fetch(@spawnat MYID localpart(D)[1,1]) == D[1,1] @test fetch(@spawnat OTHERIDS localpart(D)[1,1]) == D[1,101] + DistributedArrays.allowscalar(false) close(D2) - S2 = convert(Vector{Float64}, D[4, 23:176]) + S2 = Vector{Float64}(D[4, 23:176]) @test A[4, 23:176] == S2 - S3 = convert(Vector{Float64}, D[23:176, 197]) + S3 = Vector{Float64}(D[23:176, 197]) @test A[23:176, 197] == S3 S4 = zeros(4) setindex!(S4, D[3:4, 99:100], :) + # FixMe! Hitting the AbstractArray fallback here is extremely unfortunate but vec() becomes a ReshapedArray which makes it diffuclt to hit DArray methods. Unless this can be fixed in Base, we might have to add special methods for ReshapedArray{DArray} + DistributedArrays.allowscalar(true) @test S4 == vec(D[3:4, 99:100]) @test S4 == vec(A[3:4, 99:100]) + DistributedArrays.allowscalar(false) S5 = zeros(2,2) setindex!(S5, D[1,1:4], :, 1:2) + # FixMe! Hitting the AbstractArray fallback here is extremely unfortunate but vec() becomes a ReshapedArray which makes it diffuclt to hit DArray methods. Unless this can be fixed in Base, we might have to add special methods for ReshapedArray{DArray} + DistributedArrays.allowscalar(true) @test vec(S5) == D[1, 1:4] @test vec(S5) == A[1, 1:4] + DistributedArrays.allowscalar(false) end close(D) end @@ -198,7 +206,7 @@ check_leaks() r1 = remotecall_wait(() -> randn(3,10), workers()[1]) r2 = remotecall_wait(() -> randn(7,10), workers()[2]) D2 = DArray(reshape([r1; r2], 2, 1)) - copy!(D2, D1) + copyto!(D2, D1) @test D1 == D2 close(D1) close(D2) @@ -672,22 +680,22 @@ check_leaks() @testset "test transpose/adjoint" begin @testset "test transpose real" begin A = drand(Float64, 100, 200) - @test transpose(A) == transpose(Array(A)) + @test copy(transpose(A)) == transpose(Array(A)) close(A) end @testset "test transpose complex" begin A = drand(ComplexF64, 200, 100) - @test transpose(A) == transpose(Array(A)) + @test copy(transpose(A)) == transpose(Array(A)) close(A) end @testset "test adjoint real" begin A = drand(Float64, 200, 100) - @test adjoint(A) == adjoint(Array(A)) + @test copy(adjoint(A)) == adjoint(Array(A)) close(A) end @testset "test adjoint complex" begin A = drand(ComplexF64, 100, 200) - @test adjoint(A) == adjoint(Array(A)) + @test copy(adjoint(A)) == adjoint(Array(A)) close(A) end @@ -701,11 +709,11 @@ check_leaks() s = view(a, 1:5, 5:8) @test isa(s, SubDArray) - @test s == convert(DArray, s) + @test s == DArray(s) s = view(a, 6:5, 5:8) @test isa(s, SubDArray) - @test s == convert(DArray, s) + @test s == DArray(s) close(a) d_closeall() # close the temporaries created above end diff --git a/test/runtests.jl b/test/runtests.jl index b98613b..9b66444 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,6 +2,10 @@ using Test using Distributed using DistributedArrays +# Disable scalar indexing to avoid falling back on generic methods +# for AbstractArray +DistributedArrays.allowscalar(false) + # add at least 3 worker processes if nworkers() < 3 n = max(3, min(8, Sys.CPU_THREADS)) @@ -10,6 +14,7 @@ end @assert nprocs() > 3 @assert nworkers() >= 3 +@everywhere using Distributed @everywhere using DistributedArrays @everywhere using DistributedArrays.SPMD @everywhere using Random @@ -23,7 +28,7 @@ const OTHERIDS = filter(id-> id != MYID, procs())[rand(1:(nprocs()-1))] function check_leaks() if length(DistributedArrays.refs) > 0 sleep(0.1) # allow time for any cleanup to complete and test again - length(DistributedArrays.refs) > 0 && warn("Probable leak of ", length(DistributedArrays.refs), " darrays") + length(DistributedArrays.refs) > 0 && @warn("Probable leak of ", length(DistributedArrays.refs), " darrays") end end