Skip to content

Commit

Permalink
Serialize zeroed-out remote refs to non-ClusterSerializer objects (Ju…
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Jul 19, 2017
1 parent eb133ab commit 7c045f0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 10 deletions.
32 changes: 25 additions & 7 deletions base/distributed/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ mutable struct Future <: AbstractRemoteRef

Future(w::Int, rrid::RRID) = Future(w, rrid, Nullable{Any}())
Future(w::Int, rrid::RRID, v) = (r = new(w,rrid.whence,rrid.id,v); return test_existing_ref(r))

Future(t::Tuple) = new(t[1],t[2],t[3],t[4]) # Useful for creating dummy, zeroed-out instances
end

mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
Expand All @@ -31,6 +33,10 @@ mutable struct RemoteChannel{T<:AbstractChannel} <: AbstractRemoteRef
r = new(w, rrid.whence, rrid.id)
return test_existing_ref(r)
end

function RemoteChannel{T}(t::Tuple) where T<:AbstractChannel
return new(t[1],t[2],t[3])
end
end

function test_existing_ref(r::AbstractRemoteRef)
Expand Down Expand Up @@ -273,29 +279,29 @@ end

# Extract the channel type parameter `T` from a `RemoteChannel{T}`.
function channel_type(rr::RemoteChannel{T}) where {T}
    return T
end

serialize(s::AbstractSerializer, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::AbstractSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::AbstractSerializer, rr::AbstractRemoteRef, addclient)
serialize(s::ClusterSerializer, f::Future) = serialize(s, f, isnull(f.v))
serialize(s::ClusterSerializer, rr::RemoteChannel) = serialize(s, rr, true)
function serialize(s::ClusterSerializer, rr::AbstractRemoteRef, addclient)
if addclient
p = worker_id_from_socket(s.io)
(p !== rr.where) && send_add_client(rr, p)
end
invoke(serialize, Tuple{AbstractSerializer, Any}, s, rr)
invoke(serialize, Tuple{ClusterSerializer, Any}, s, rr)
end

function deserialize(s::AbstractSerializer, t::Type{<:Future})
function deserialize(s::ClusterSerializer, t::Type{<:Future})
f = deserialize_rr(s,t)
Future(f.where, RRID(f.whence, f.id), f.v) # ctor adds to client_refs table
end

function deserialize(s::AbstractSerializer, t::Type{<:RemoteChannel})
function deserialize(s::ClusterSerializer, t::Type{<:RemoteChannel})
rr = deserialize_rr(s,t)
# call ctor to make sure this rr gets added to the client_refs table
RemoteChannel{channel_type(rr)}(rr.where, RRID(rr.whence, rr.id))
end

function deserialize_rr(s, t)
rr = invoke(deserialize, Tuple{AbstractSerializer, DataType}, s, t)
rr = invoke(deserialize, Tuple{ClusterSerializer, DataType}, s, t)
if rr.where == myid()
# send_add_client() is not executed when the ref is being
# serialized to where it exists
Expand All @@ -304,6 +310,18 @@ function deserialize_rr(s, t)
rr
end

# Future and RemoteChannel are serializable only in a running cluster.
# Serialize zeroed-out values to non-ClusterSerializer objects.
function serialize(s::AbstractSerializer, ::Future)
    # The incoming reference is deliberately ignored: a dummy instance with
    # every field zeroed is written to the serializer in its place.
    zeroed_fields = (0, 0, 0, Nullable{Any}())
    placeholder = Future(zeroed_fields)
    # Dispatch straight to the generic `Any` method so this method is not re-entered.
    return invoke(serialize, Tuple{AbstractSerializer, Any}, s, placeholder)
end

function serialize(s::AbstractSerializer, ::RemoteChannel)
    # The incoming reference is deliberately ignored: a dummy
    # `RemoteChannel{Channel{Any}}` with every field zeroed is written instead.
    zeroed_fields = (0, 0, 0)
    placeholder = RemoteChannel{Channel{Any}}(zeroed_fields)
    # Dispatch straight to the generic `Any` method so this method is not re-entered.
    return invoke(serialize, Tuple{AbstractSerializer, Any}, s, placeholder)
end


# make a thunk to call f on args in a way that simulates what would happen if
# the function were sent elsewhere
Expand Down
8 changes: 5 additions & 3 deletions doc/src/manual/parallel-computing.md
Original file line number Diff line number Diff line change
Expand Up @@ -781,14 +781,16 @@ julia> @elapsed while n > 0 # print out results

## Remote References and Distributed Garbage Collection

Objects referred to by remote references can be freed only when *all* held references in the cluster
are deleted.
Objects referred to by remote references can be freed only when *all* held references
in the cluster are deleted.

The node where the value is stored keeps track of which of the workers have a reference to it.
Every time a [`RemoteChannel`](@ref) or an (unfetched) [`Future`](@ref) is serialized to a worker,
the node pointed to by the reference is notified. And every time a [`RemoteChannel`](@ref) or
an (unfetched) [`Future`](@ref) is garbage collected locally, the node owning the value is again
notified.
notified. This is implemented in an internal cluster-aware serializer. Remote references are only
valid in the context of a running cluster. Serializing and deserializing references to and from
regular `IO` objects is not supported.

The notifications are done via sending of "tracking" messages--an "add reference" message when
a reference is serialized to a different process and a "delete reference" message when a reference
Expand Down
21 changes: 21 additions & 0 deletions test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,27 @@ test_indexing(Future(id_other))
test_indexing(RemoteChannel())
test_indexing(RemoteChannel(id_other))

# Test ser/deser to non-ClusterSerializer objects.
function test_regular_io_ser(ref::Base.Distributed.AbstractRemoteRef)
    # Round-trip `ref` through a plain in-memory IO buffer.
    buf = IOBuffer()
    serialize(buf, ref)
    seekstart(buf)
    restored = deserialize(buf)

    # Every field of the deserialized object must hold a zero / empty value.
    for name in fieldnames(typeof(ref))
        val = getfield(restored, name)
        if isa(val, Nullable)
            @test val === Nullable{Any}()
        elseif isa(val, Number)
            @test val === zero(typeof(val))
        else
            # Unhandled field kind: fail loudly so this test gets extended.
            error(string("Add test for field ", name))
        end
    end
end

# Run the plain-IO round-trip check once for each kind of remote reference.
test_regular_io_ser(Future())
test_regular_io_ser(RemoteChannel())

dims = (20,20,20)

if Sys.islinux()
Expand Down

0 comments on commit 7c045f0

Please sign in to comment.