Skip to content

Commit

Permalink
some distributed GC tweaks, further optimization of call_fetch
Browse files Browse the repository at this point in the history
using nothing instead of () as default value of WeakRef
  • Loading branch information
JeffBezanson committed Jul 2, 2011
1 parent 4a56c02 commit 180b9b7
Show file tree
Hide file tree
Showing 3 changed files with 55 additions and 33 deletions.
2 changes: 1 addition & 1 deletion doc/todo
Original file line number Diff line number Diff line change
Expand Up @@ -711,7 +711,7 @@ issues 1/24/11
- some kind of no-specialize hint for slots like x in
assign(A::Array{Any}, x, i::Index) = arrayset(A,i,x)
where we know the function is identical for all types of x
- maybe make x::Any different from no type decl
* add x::ANY
- reuse specialized methods for multiple types in an Any slot
- implement SubArray
- isassigned predicates
Expand Down
84 changes: 53 additions & 31 deletions j/multi.j
Original file line number Diff line number Diff line change
Expand Up @@ -206,11 +206,6 @@ type RemoteRef
r
end

global WeakRemoteRef
function WeakRemoteRef(w, wh, id)
return new(w, wh, id)
end

REQ_ID::Int32 = 0
function RemoteRef(pid::Int)
rr = RemoteRef(pid, myid(), REQ_ID)
Expand All @@ -221,36 +216,51 @@ type RemoteRef
RemoteRef(w::LocalProcess) = RemoteRef(myid())
RemoteRef(w::Worker) = RemoteRef(w.id)
RemoteRef() = RemoteRef(myid())

global WeakRemoteRef
function WeakRemoteRef(w, wh, id)
return new(w, wh, id)
end

function WeakRemoteRef(pid::Int)
rr = WeakRemoteRef(pid, myid(), REQ_ID)
REQ_ID += 1
rr
end

WeakRemoteRef(w::LocalProcess) = WeakRemoteRef(myid())
WeakRemoteRef(w::Worker) = WeakRemoteRef(w.id)
WeakRemoteRef() = WeakRemoteRef(myid())
end

hash(r::RemoteRef) = hash(r.whence)+3*hash(r.id)
isequal(r::RemoteRef, s::RemoteRef) = (r.whence==s.whence && r.id==s.id)

rr2id(r::RemoteRef) = (r.whence, r.id)

let bottom_func() = assert(false)
global lookup_ref
function lookup_ref(id)
global PGRP
wi = get((PGRP::ProcessGroup).refs, id, ())
if is(wi, ())
# first we've heard of this ref
wi = WorkItem(bottom_func)
# this WorkItem is just for storing the result value
PGRP.refs[id] = wi
add(wi.clientset, id[1])
end
wi
end
# is a ref uninitialized? (for locally-owned refs only)
function ref_uninitialized(id)
wi = lookup_ref(id)
!wi.done && is(wi.thunk,bottom_func)
bottom_func() = assert(false)

function lookup_ref(id)
global PGRP
wi = get((PGRP::ProcessGroup).refs, id, ())
if is(wi, ())
# first we've heard of this ref
wi = WorkItem(bottom_func)
# this WorkItem is just for storing the result value
PGRP.refs[id] = wi
add(wi.clientset, id[1])
end
ref_uninitialized(r::RemoteRef) = (assert(r.where==myid());
ref_uninitialized(rr2id(r)))
wi
end

# is a ref uninitialized? (for locally-owned refs only)
function ref_uninitialized(id)
wi = lookup_ref(id)
!wi.done && is(wi.thunk,bottom_func)
end
ref_uninitialized(r::RemoteRef) = (assert(r.where==myid());
ref_uninitialized(rr2id(r)))

function isready(rr::RemoteRef)
rid = rr2id(rr)
if rr.where == myid()
Expand Down Expand Up @@ -283,7 +293,7 @@ function send_del_client(rr::RemoteRef)
else
W = worker_from_id(rr.where)
push(W.del_msgs, (rr2id(rr), myid()))
if length(W.del_msgs) >= 16
if length(W.del_msgs) >= 10
#print("sending delete of $(W.del_msgs)\n")
remote_do(rr.where, del_clients, W.del_msgs...)
del_all(W.del_msgs)
Expand Down Expand Up @@ -377,10 +387,12 @@ remote_call(id::Int, f, args...) = remote_call(worker_from_id(id), f, args...)
remote_call_fetch(w::LocalProcess, f, args...) = f(args...)

function remote_call_fetch(w::Worker, f, args...)
rr = RemoteRef(w)
# can be weak, because the program will have no way to refer to the Ref
# itself, it only gets the result.
rr = WeakRemoteRef(w)
oid = rr2id(rr)
send_msg(w, :call_fetch, oid, f, args)
force(yieldto(Scheduler, WaitFor(:fetch, oid)))
force(yieldto(Scheduler, WaitFor(:call_fetch, oid)))
end

remote_call_fetch(id::Int, f, args...) =
Expand Down Expand Up @@ -420,7 +432,11 @@ function sync_msg(verb::Symbol, r::RemoteRef)
if r.where==myid() || isa(pg.workers[r.where], LocalProcess)
wi = lookup_ref(oid)
if wi.done
return is(verb,:fetch) ? work_result(wi) : r
if is(verb,:fetch)
return work_result(wi)
else
return r
end
else
# add to WorkItem's notify list
wi.notify = ((), verb, oid, wi.notify)
Expand Down Expand Up @@ -561,6 +577,7 @@ function perform_work(job::WorkItem)
job.task = ()
# do notifications
notify_done(job)
job.thunk = bottom_func # avoid reference retention
else
# job interrupted
if is(job.task,runner)
Expand All @@ -581,7 +598,7 @@ end

function deliver_result(sock::IOStream, msg, oid, value)
#print("$(myid()) sending result\n")
if is(msg,:fetch)
if is(msg,:fetch) || is(msg,:call_fetch)
val = value
else
@assert is(msg, :wait)
Expand Down Expand Up @@ -629,6 +646,11 @@ function notify_done(job::WorkItem)
else
deliver_result(sock, msg, oid, wr)
end
if is(msg,:call_fetch)
# can delete the ref right away since we know it is
# unreferenced by the client
del(PGRP.refs, oid)
end
end
end
end
Expand Down Expand Up @@ -681,7 +703,7 @@ function message_handler(fd, sockets)
#print("$(myid()) got call\n")
wi = schedule_call(id, f, args)
if is(msg, :call_fetch)
wi.notify = (sock, :fetch, id, wi.notify)
wi.notify = (sock, :call_fetch, id, wi.notify)
elseif is(msg, :call_wait)
wi.notify = (sock, :wait, id, wi.notify)
end
Expand Down
2 changes: 1 addition & 1 deletion src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ static void sweep_weak_refs()
if (gc_marked_obj(wr)) {
// weakref itself is alive
if (!gc_marked_obj(wr->value))
wr->value = (jl_value_t*)jl_null;
wr->value = (jl_value_t*)jl_nothing;
n++;
}
else {
Expand Down

0 comments on commit 180b9b7

Please sign in to comment.