From 68256c137ed0ebd52892a1b1e162d45576b45149 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Fri, 14 Jun 2013 11:32:27 +0530 Subject: [PATCH 1/3] process ids are independent of nprocs() --- base/client.jl | 9 +- base/darray.jl | 2 +- base/exports.jl | 4 + base/loading.jl | 4 +- base/multi.jl | 242 ++++++++++++++++++++++++++++++----------------- test/parallel.jl | 2 +- 6 files changed, 166 insertions(+), 97 deletions(-) diff --git a/base/client.jl b/base/client.jl index 8f59cffefa321..153a8710e21f9 100644 --- a/base/client.jl +++ b/base/client.jl @@ -281,11 +281,10 @@ function init_head_sched() # start in "head node" mode global const Scheduler = Task(()->event_loop(true), 1024*1024) global PGRP - PGRP.myid = 1 - assert(PGRP.np == 0) - push!(PGRP.workers,LocalProcess()) - push!(PGRP.locs,("",0)) - PGRP.np = 1 + global LPROC + LPROC.id = 1 + assert(length(PGRP.workers) == 0) + register_worker(LPROC) # make scheduler aware of current (root) task unshift!(Workqueue, roottask) yield() diff --git a/base/darray.jl b/base/darray.jl index 6d150f1d98a3d..9546eeb94c098 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -45,7 +45,7 @@ function DArray(init, dims, procs) end DArray(init, dims, procs, defaultdist(dims,procs)) end -DArray(init, dims) = DArray(init, dims, [1:min(nprocs(),max(dims))]) +DArray(init, dims) = DArray(init, dims, list_allprocs()[1:min(nprocs(),max(dims))]) size(d::DArray) = d.dims procs(d::DArray) = d.pmap diff --git a/base/exports.jl b/base/exports.jl index f8f0a94bf9f1d..69225dd781142 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1085,6 +1085,10 @@ export yield, myid, nprocs, + nworkers, + list_allprocs, + list_workers, + rmprocs, pmap, put, remotecall, diff --git a/base/loading.jl b/base/loading.jl index 8418b0f2c461c..c66b6f28b417a 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -37,7 +37,7 @@ require(f::String, fs::String...) = (require(f); for x in fs require(x); end) function require(name::ByteString) if myid() == 1 - @sync for p = 2:nprocs() + @sync for p in list_workers() @spawnat p require(name) end end @@ -52,7 +52,7 @@ end function reload(name::String) if myid() == 1 - @sync for p = 2:nprocs() + @sync for p in list_workers() @spawnat p reload(name) end end diff --git a/base/multi.jl b/base/multi.jl index ad2dbf49dd3b8..4410ac5dd0a3e 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -167,59 +167,99 @@ end ## process group creation ## type LocalProcess + id::Int +end + +const LPROC = LocalProcess(0) + +const map_pid_wrkr = Dict{Int, Union(Worker, LocalProcess)}() +const map_sock_wrkr = Dict{Socket, Union(Worker, LocalProcess)}() + +let next_pid = 2 # 1 is reserved for the client (always) + global get_next_pid + function get_next_pid() + retval = next_pid + next_pid += 1 + retval + end end + type ProcessGroup - myid::Int + name::String workers::Array{Any,1} - locs::Array{Any,1} - np::Int # global references refs::Dict - function ProcessGroup(myid::Integer, w::Array{Any,1}, locs::Array{Any,1}) - return new(myid, w, locs, length(w), Dict()) + function ProcessGroup(w::Array{Any,1}) + return new("pg-default", w, Dict()) end end -const PGRP = ProcessGroup(0, {}, {}) +const PGRP = ProcessGroup({}) -function add_workers(PGRP::ProcessGroup, w::Array{Any,1}) - n = length(w) - locs = map(x->(x.host,x.port), w) +function add_workers(pg::ProcessGroup, w::Array{Any,1}) # NOTE: currently only node 1 can add new nodes, since nobody else # has the full list of address:port - newlocs = [PGRP.locs, locs] - for i=1:n - push!(PGRP.workers, w[i]) - w[i].id = PGRP.np+i - send_msg_now(w[i], :join_pgrp, w[i].id, newlocs) + assert(LPROC.id == 1) + for i=1:length(w) + w[i].id = get_next_pid() + register_worker(w[i]) create_message_handler_loop(w[i].socket) end - PGRP.locs = newlocs - PGRP.np += n + all_locs = map(x -> isa(x, Worker) ? (x.host,x.port, x.id) : ("", 0, x.id), pg.workers) + for i=1:length(w) + send_msg_now(w[i], :join_pgrp, w[i].id, all_locs) + end :ok end -myid() = PGRP.myid -nprocs() = PGRP.np +myid() = LPROC.id + +nprocs() = length(PGRP.workers) +function nworkers() + n = nprocs() + n == 1 ? 1 : n-1 +end + +list_allprocs() = [x.id for x in PGRP.workers] + +function list_workers() + allp = list_allprocs() + if nprocs() == 1 + allp + else + filter(x -> x != 1, allp) + end +end + +function rmprocs(args...) + # Only pid 1 can add and remove processes + assert(myid() == 1) + for i in [args...] + if haskey(map_pid_wrkr, i) + remotecall(i, () -> exit()) + end + end +end + -function worker_from_id(i) - pg = PGRP::ProcessGroup - while i > length(pg.workers) || pg.workers[i]===nothing - sleep(0.1) +worker_from_id(i) = worker_from_id(PGRP, i) +function worker_from_id(pg::ProcessGroup, i) +# Processes with pids > ours, have to connect to us. May not have happened. Wait for some time. + start = time() + while (!haskey(map_pid_wrkr, i) && ((time() - start) < 60.0)) + sleep(0.1) yield() end - pg.workers[i] + map_pid_wrkr[i] end function worker_id_from_socket(s) - for i=1:nprocs() - w = worker_from_id(i) - if isa(w,Worker) - if is(s, w.socket) || is(s, w.sendbuf) - return i - end + w = get(map_sock_wrkr, s, nothing) + if isa(w,Worker) + if is(s, w.socket) || is(s, w.sendbuf) + return w.id end end if isa(s,IOStream) && fd(s)==-1 @@ -229,16 +269,19 @@ function worker_id_from_socket(s) return -1 end -function register_worker(i, wrkr) - d = i-length(PGRP.workers) - if d > 0 - resize!(PGRP.workers, i) - PGRP.workers[(end-d+1):end] = nothing - PGRP.np += d - end - PGRP.workers[i] = wrkr - #write(STDOUT, "$(PGRP.myid) heard from $i\n") - nothing + +register_worker(w) = register_worker(PGRP, w) +function register_worker(pg, w) + push!(pg.workers, w) + map_pid_wrkr[w.id] = w + if isa(w, Worker) map_sock_wrkr[w.socket] = w end +end + +deregister_worker(pid) = deregister_worker(PGRP, pid) +function deregister_worker(pg, pid) + pg.workers = filter(x -> !(x.id == pid), pg.workers) + w = delete!(map_pid_wrkr, pid, nothing) + if isa(w, Worker) delete!(map_sock_wrkr, w.socket) end end ## remote refs ## @@ -275,7 +318,7 @@ type RemoteRef rr end - RemoteRef(w::LocalProcess) = RemoteRef(myid()) + RemoteRef(w::LocalProcess) = RemoteRef(w.id) RemoteRef(w::Worker) = RemoteRef(w.id) RemoteRef() = RemoteRef(myid()) @@ -303,13 +346,13 @@ isequal(r::RemoteRef, s::RemoteRef) = (r.whence==s.whence && r.id==s.id) rr2id(r::RemoteRef) = (r.whence, r.id) -function lookup_ref(id) - GRP = PGRP::ProcessGroup - rv = get(GRP.refs, id, false) +lookup_ref(id) = lookup_ref(PGRP, id) +function lookup_ref(pg, id) + rv = get(pg.refs, id, false) if rv === false # first we've heard of this ref rv = RemoteValue() - GRP.refs[id] = rv + pg.refs[id] = rv add!(rv.clientset, id[1]) end rv @@ -332,11 +375,12 @@ function isready(rr::RemoteRef) end end -function del_client(id, client) +del_client(id, client) = del_client(PGRP, id, client) +function del_client(pg, id, client) rv = lookup_ref(id) delete!(rv.clientset, client) if isempty(rv.clientset) - delete!((PGRP::ProcessGroup).refs, id) + delete!(pg.refs, id) #print("$(myid()) collected $id\n") end nothing @@ -575,7 +619,7 @@ remote_do(id::Integer, f, args...) = remote_do(worker_from_id(id), f, args...) function sync_msg(verb::Symbol, r::RemoteRef) pg = (PGRP::ProcessGroup) oid = rr2id(r) - if r.where==myid() || isa(pg.workers[r.where], LocalProcess) + if r.where==myid() || isa(worker_from_id(r.where), LocalProcess) rv = lookup_ref(oid) wait_full(rv) return is(verb,:fetch) ? work_result(rv) : r @@ -701,9 +745,8 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately #println("message_handler_loop") start_reading(sock) wait_connected(sock) - #println("loop") - while true - try + try + while true msg = deserialize(sock) #println("got msg: ",msg) # handle message @@ -744,21 +787,30 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately end elseif is(msg, :identify_socket) otherid = deserialize(sock) - # establish a Worker connection for processes that connected to us - register_worker(otherid, Worker("", 0, sock, otherid)) + register_worker(Worker("", 0, sock, otherid)) elseif is(msg, :join_pgrp) - PGRP.myid = deserialize(sock) - PGRP.locs = locs = deserialize(sock) - - register_worker(PGRP.myid, LocalProcess()) - register_worker(1, Worker("", 0, sock, 1)) - - w = PGRP.workers - for i = 2:(PGRP.myid-1) - w[i] = Worker(locs[i][1], locs[i][2]) - w[i].id = i - create_message_handler_loop(w[i].socket) - send_msg_now(w[i], :identify_socket, PGRP.myid) + # first connection; get process group info from client + self_pid = LPROC.id = deserialize(sock) + locs = deserialize(sock) + #print("\nLocation: ",locs,"\nId:",myid(),"\n") + # joining existing process group + + register_worker(Worker("", 0, sock, 1)) + register_worker(LPROC) + + for (rhost, rport, rpid) in locs + if (rpid < self_pid) && (!(rpid == 1)) + # Connect to them + w = Worker(rhost, rport) + w.id = rpid + register_worker(w) + create_message_handler_loop(w.socket) + send_msg_now(w, :identify_socket, self_pid) + else + # Others will connect to us. Don't do anything just yet + continue + end + end else # the synchronization messages @@ -775,19 +827,29 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately end end end - catch e - if isa(e,EOFError) - #print("eof. $(myid()) exiting\n") - stop_reading(sock) - # TODO: remove machine from group - throw(DisconnectException()) - else - print(STDERR, "deserialization: ", e, "\n") - rethrow(e) - # #while nb_available(sock) > 0 #|| select(sock) - # # read(sock, Uint8) - # #end - end + end # end of while + + + + catch e + iderr = worker_id_from_socket(sock) + # If pid 1 is disconnected, commit harakiri + if (iderr == 1) exit() end + + if isa(e,EOFError) + stop_reading(sock) + deregister_worker(iderr) + + if (myid() == 1) println("Worker $iderr terminated.") end + + #TODO : Notify all RemoteRefs linked to this Worker who just died.... + # How? + + # FIXME: Without the below throw, the main process results in a segmentation fault. + throw("DisconnectedException") + else + # TODO : Treat any exception as death of node / major screw-up and cleanup? + rethrow(e) end end end) @@ -928,10 +990,10 @@ end worker_local_cmd() = `$JULIA_HOME/julia-release-basic --bind-to $bind_addr --worker` -addprocs(np::Integer) = +function addprocs(np::Integer) add_workers(PGRP, start_remote_workers({ "localhost" for i=1:np }, { worker_local_cmd() for i=1:np })) - +end function start_sge_workers(n) home = JULIA_HOME @@ -1011,7 +1073,7 @@ end spawnat(p, thunk) = sync_add(remotecall(p, thunk)) -let lastp = 1 +let nextidx = 1 global chooseproc function chooseproc(thunk::Function) p = -1 @@ -1027,9 +1089,12 @@ let lastp = 1 end end if p == -1 - p = lastp; lastp += 1 - if lastp > nprocs() - lastp = 1 + if nextidx > nprocs() + p = PGRP.workers[1].id + nextidx = 2 + else + p = PGRP.workers[nextidx].id + nextidx += 1 end end p @@ -1072,8 +1137,8 @@ macro spawnat(p, expr) end function at_each(f, args...) - for i=1:nprocs() - sync_add(remotecall(i, f, args...)) + for w in PGRP.workers + sync_add(remotecall(w.id, f, args...)) end end @@ -1088,7 +1153,7 @@ end function pmap_static(f, lsts...) np = nprocs() n = length(lsts[1]) - { remotecall((i-1)%np+1, f, map(L->L[i], lsts)...) for i = 1:n } + { remotecall(PGRP.workers[(i-1)%np+1].id, f, map(L->L[i], lsts)...) for i = 1:n } end pmap(f) = f() @@ -1109,14 +1174,15 @@ function pmap(f, lsts...) nextidx() = (idx=i; i+=1; idx) @sync begin for p=1:np - if p != myid() || np == 1 + wpid = PGRP.workers[p].id + if wpid != myid() || np == 1 @async begin while true idx = nextidx() if idx > n break end - results[idx] = remotecall_fetch(p, f, + results[idx] = remotecall_fetch(wpid, f, map(L->L[idx], lsts)...) end end diff --git a/test/parallel.jl b/test/parallel.jl index 44e07f0801426..d4230f392acda 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -3,7 +3,7 @@ if nprocs() < 2 end id_me = myid() -id_other = id_me==1 ? 2 : 1 +id_other = filter(x -> x != id_me, list_allprocs())[rand(1:(nprocs()-1))] @test fetch(@spawnat id_other myid()) == id_other From 9346ab5bee3804e08659b7c02ece964252488d89 Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Fri, 14 Jun 2013 17:58:59 +0530 Subject: [PATCH 2/3] Fixed loop bug in require() --- base/loading.jl | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/base/loading.jl b/base/loading.jl index c66b6f28b417a..2bf0982ca3d76 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -36,8 +36,8 @@ require(fname::String) = require(bytestring(fname)) require(f::String, fs::String...) = (require(f); for x in fs require(x); end) function require(name::ByteString) - if myid() == 1 - @sync for p in list_workers() + if myid() == 1 + @sync for p in filter(x -> x != 1, list_allprocs()) @spawnat p require(name) end end @@ -52,7 +52,7 @@ end function reload(name::String) if myid() == 1 - @sync for p in list_workers() + @sync for p in filter(x -> x != 1, list_allprocs()) @spawnat p reload(name) end end From eeabbdffd7e9a3b71f5bbb64200ce48f2991e0cb Mon Sep 17 00:00:00 2001 From: amitmurthy Date: Sat, 15 Jun 2013 12:12:05 +0530 Subject: [PATCH 3/3] Changed names to procs() and workers() --- base/darray.jl | 2 +- base/exports.jl | 4 ++-- base/loading.jl | 4 ++-- base/multi.jl | 6 +++--- test/parallel.jl | 2 +- 5 files changed, 9 insertions(+), 9 deletions(-) diff --git a/base/darray.jl b/base/darray.jl index 9546eeb94c098..2fefdad809abb 100644 --- a/base/darray.jl +++ b/base/darray.jl @@ -45,7 +45,7 @@ function DArray(init, dims, procs) end DArray(init, dims, procs, defaultdist(dims,procs)) end -DArray(init, dims) = DArray(init, dims, list_allprocs()[1:min(nprocs(),max(dims))]) +DArray(init, dims) = DArray(init, dims, procs()[1:min(nprocs(),max(dims))]) size(d::DArray) = d.dims procs(d::DArray) = d.pmap diff --git a/base/exports.jl b/base/exports.jl index 69225dd781142..0f439ed5b1185 100644 --- a/base/exports.jl +++ b/base/exports.jl @@ -1086,8 +1086,8 @@ export myid, nprocs, nworkers, - list_allprocs, - list_workers, + procs, + workers, rmprocs, pmap, put, diff --git a/base/loading.jl b/base/loading.jl index 2bf0982ca3d76..76a9bdc326cf0 100644 --- a/base/loading.jl +++ b/base/loading.jl @@ -37,7 +37,7 @@ require(f::String, fs::String...) = (require(f); for x in fs require(x); end) function require(name::ByteString) if myid() == 1 - @sync for p in filter(x -> x != 1, list_allprocs()) + @sync for p in filter(x -> x != 1, procs()) @spawnat p require(name) end end @@ -52,7 +52,7 @@ end function reload(name::String) if myid() == 1 - @sync for p in filter(x -> x != 1, list_allprocs()) + @sync for p in filter(x -> x != 1, procs()) @spawnat p reload(name) end end diff --git a/base/multi.jl b/base/multi.jl index 4410ac5dd0a3e..40e7d244b737f 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -222,10 +222,10 @@ function nworkers() n == 1 ? 1 : n-1 end -list_allprocs() = [x.id for x in PGRP.workers] +procs() = [x.id for x in PGRP.workers] -function list_workers() - allp = list_allprocs() +function workers() + allp = procs() if nprocs() == 1 allp else diff --git a/test/parallel.jl b/test/parallel.jl index d4230f392acda..4547d896321d6 100644 --- a/test/parallel.jl +++ b/test/parallel.jl @@ -3,7 +3,7 @@ if nprocs() < 2 end id_me = myid() -id_other = filter(x -> x != id_me, list_allprocs())[rand(1:(nprocs()-1))] +id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))] @test fetch(@spawnat id_other myid()) == id_other