Skip to content

Commit

Permalink
avoid triggering distributed gc in a few places
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Dec 4, 2015
1 parent 3ba57b0 commit 6c2c580
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 23 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
*.jl.cov
*.jl.mem
.DS_Store
54 changes: 31 additions & 23 deletions src/DistributedArrays.jl
Original file line number Diff line number Diff line change
Expand Up @@ -594,7 +594,7 @@ Base.copy!(dest::SubOrDArray, src::SubOrDArray) = begin
throw(DimensionMismatch("destination array doesn't fit to source array"))
end
@sync for p in procs(dest)
@async remotecall_wait((dest,src)->copy!(localpart(dest), localpart(src)), p, dest, src)
@async remotecall_fetch((dest,src)->(copy!(localpart(dest), localpart(src)); nothing), p, dest, src)
end
return dest
end
Expand Down Expand Up @@ -645,7 +645,7 @@ Base.setindex!(a::Array{Any}, d::SubOrDArray, i::Int) = Base.arrayset(a, d, i)

Base.fill!(A::DArray, x) = begin
@sync for p in procs(A)
@async remotecall_wait((A,x)->fill!(localpart(A), x), p, A, x)
@async remotecall_fetch((A,x)->(fill!(localpart(A), x); nothing), p, A, x)
end
return A
end
Expand Down Expand Up @@ -681,7 +681,7 @@ Base.mapreduce(f, opt, d::DArray) = _mapreduce(f, opt, d)

Base.map!(f, d::DArray) = begin
@sync for p in procs(d)
@async remotecall_wait((f,d)->map!(f, localpart(d)), p, f, d)
@async remotecall_fetch((f,d)->(map!(f, localpart(d)); nothing), p, f, d)
end
return d
end
Expand Down Expand Up @@ -714,11 +714,12 @@ end

function mapreducedim_between!(f, op, R::DArray, A::DArray, region)
@sync for p in procs(R)
@async remotecall_wait(p, f, op, R, A, region) do f, op, R, A, region
@async remotecall_fetch(p, f, op, R, A, region) do f, op, R, A, region
localind = [r for r = localindexes(A)]
localind[[region...]] = [1:n for n = size(A)[[region...]]]
B = convert(Array, A[localind...])
Base.mapreducedim!(f, op, localpart(R), B)
nothing
end
end
return R
Expand Down Expand Up @@ -754,7 +755,7 @@ end
# LinAlg
Base.scale!(A::DArray, x::Number) = begin
@sync for p in procs(A)
@async remotecall_wait((A,x)->scale!(localpart(A), x), p, A, x)
@async remotecall_fetch((A,x)->(scale!(localpart(A), x); nothing), p, A, x)
end
return A
end
Expand Down Expand Up @@ -805,7 +806,7 @@ map_localparts(f::Callable, d1::DArray, d2::DArray) = DArray(d1) do I
end
function map_localparts!(f::Callable, d::DArray)
@sync for p in procs(d)
@async remotecall_wait((f,d)->f(localpart(d)), p, f, d)
@async remotecall_fetch((f,d)->(f(localpart(d)); nothing), p, f, d)
end
return d
end
Expand Down Expand Up @@ -1017,7 +1018,7 @@ function axpy!(α, x::DVector, y::DVector)
throw(DimensionMismatch("vectors must have same length"))
end
@sync for p in procs(y)
@async remotecall_wait(() -> Base.axpy!(α, localpart(x), localpart(y)), p)
@async remotecall_fetch(() -> (Base.axpy!(α, localpart(x), localpart(y)); nothing), p)
end
return y
end
Expand All @@ -1029,17 +1030,24 @@ function dot(x::DVector, y::DVector)
if (procs(x) != procs(y)) || (x.cuts != y.cuts)
throw(ArgumentError("vectors don't have the same distribution. Not handled for efficiency reasons."))
end
r = Future[]
for i = eachindex(x.pids)
px, py = x.pids[i], y.pids[i]
push!(r, remotecall((x, y, i) -> dot(localpart(x), fetch(y, i)), px, x, y, i))

results=Any[]
@sync begin
for i = eachindex(x.pids)
@async push!(results, remotecall_fetch((x, y, i) -> dot(localpart(x), fetch(y, i)), x.pids[i], x, y, i))
end
end
return mapreduce(fetch, Base.AddFun(), r)
return reduce(Base.AddFun(), results)
end

function norm(x::DVector, p::Number = 2)
r = [remotecall(() -> norm(localpart(x), p), pp) for pp in procs(x)]
return norm([fetch(rr) for rr in r], p)
results = []
@sync begin
for pp in procs(x)
@async push!(results, remotecall_fetch(() -> norm(localpart(x), p), pp))
end
end
return norm(results, p)
end

# Level 2
Expand Down Expand Up @@ -1087,9 +1095,9 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec
if β != one(β)
@sync for p in y.pids
if β != zero(β)
@async remotecall_wait(y -> scale!(localpart(y), β), p, y)
@async remotecall_fetch(y -> (scale!(localpart(y), β); nothing), p, y)
else
@async remotecall_wait(y -> fill!(localpart(y), 0), p, y)
@async remotecall_fetch(y -> (fill!(localpart(y), 0); nothing), p, y)
end
end
end
Expand All @@ -1099,7 +1107,7 @@ function A_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVec
p = y.pids[i]
for j = 1:size(R, 2)
rij = R[i,j]
@async remotecall_wait(() -> add!(localpart(y), fetch(rij), α), p)
@async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p)
end
end

Expand Down Expand Up @@ -1129,9 +1137,9 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe
if β != one(β)
@sync for p in y.pids
if β != zero(β)
@async remotecall_wait(() -> scale!(localpart(y), β), p)
@async remotecall_fetch(() -> (scale!(localpart(y), β); nothing), p)
else
@async remotecall_wait(() -> fill!(localpart(y), 0), p)
@async remotecall_fetch(() -> (fill!(localpart(y), 0); nothing), p)
end
end
end
Expand All @@ -1141,7 +1149,7 @@ function Ac_mul_B!(α::Number, A::DMatrix, x::AbstractVector, β::Number, y::DVe
p = y.pids[i]
for j = 1:size(R, 2)
rij = R[i,j]
@async remotecall_wait(() -> add!(localpart(y), fetch(rij), α), p)
@async remotecall_fetch(() -> (add!(localpart(y), fetch(rij), α); nothing), p)
end
end
return y
Expand Down Expand Up @@ -1194,9 +1202,9 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D
if β != one(β)
@sync for p in C.pids
if β != zero(β)
@async remotecall_wait(() -> scale!(localpart(C), β), p)
@async remotecall_fetch(() -> (scale!(localpart(C), β); nothing), p)
else
@async remotecall_wait(() -> fill!(localpart(C), 0), p)
@async remotecall_fetch(() -> (fill!(localpart(C), 0); nothing), p)
end
end
end
Expand All @@ -1207,7 +1215,7 @@ function _matmatmul!(α::Number, A::DMatrix, B::AbstractMatrix, β::Number, C::D
p = C.pids[i,k]
for j = 1:size(R, 2)
rijk = R[i,j,k]
@async remotecall_wait(d -> add!(localpart(d), fetch(rijk), α), p, C)
@async remotecall_fetch(d -> (add!(localpart(d), fetch(rijk), α); nothing), p, C)
end
end
end
Expand Down

0 comments on commit 6c2c580

Please sign in to comment.