fix bug in worker-to-worker connection setup. closes JuliaLang#9951
amitmurthy committed Jan 29, 2015
1 parent 7dba386 commit 60d43d5
Showing 2 changed files with 91 additions and 57 deletions.
117 changes: 62 additions & 55 deletions base/multi.jl
@@ -244,15 +244,13 @@ function add_worker(pg::ProcessGroup, w)
# has the full list of address:port
assert(LPROC.id == 1)
rr_join = RemoteRef()
- register_worker(w)
process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join)

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), pg.workers)

send_msg_now(w, :join_pgrp, w.id, all_locs, isa(w.manager, LocalManager))

@schedule manage(w.manager, w.id, w.config, :register)

rr_join
end
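# Illustration (not part of the commit): for a cluster with the master, one LocalManager
# worker and one SSHManager worker listening on 10.0.0.5:9009, the all_locs snapshot built
# above might look like
#     all_locs = [((), 1, true), (("127.0.0.1", 9010), 2, true), (("10.0.0.5", 9009), 3, false)]
# i.e. (connect address, pid, managed-by-LocalManager); it is sent along with :join_pgrp,
# presumably so the joining worker can open its own connections to the non-local peers.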

@@ -1020,6 +1018,7 @@ function start_cluster_workers(manager, params, resp_arr, launched_ntfy)
end

@sync begin
+ additional_workers = []
while true
if length(launched) == 0
if istaskdone(t)
@@ -1031,9 +1030,27 @@ function start_cluster_workers(manager, params, resp_arr, launched_ntfy)

if length(launched) > 0
wconfig = shift!(launched)
- rr = connect_n_create_worker(manager, get_next_pid(), wconfig)
- let rr=rr, exename = params[:exename]
- @async launch_additional(worker_from_id(fetch(rr)), exename, resp_arr, launched_ntfy)
+ w = connect_n_create_worker(manager, get_next_pid(), wconfig)
+ rr = add_worker(PGRP, w)
+
+ let additional_workers=additional_workers, rr=rr, exename = params[:exename]
+ @async begin
+ w = worker_from_id(fetch(rr))
+ cnt = get(w.config.count, 1)
+ if cnt == :auto
+ cnt = get(w.config.environ)[:cpu_cores]
+ end
+ cnt = cnt - 1 # Removing self from the requested number
+
+ if cnt > 0
+ exeflags = get(w.config.exeflags, ``)
+ cmd = `$exename $exeflags`
+
+ npids = [get_next_pid() for x in 1:cnt]
+ new_workers = remotecall_fetch(w.id, launch_additional, cnt, npids, cmd)
+ push!(additional_workers, (w, new_workers))
+ end
+ end
end

push!(resp_arr, rr)
@@ -1042,75 +1059,65 @@ function start_cluster_workers(manager, params, resp_arr, launched_ntfy)
end
end

+ process_additional(additional_workers, resp_arr, launched_ntfy)
+
notify(launched_ntfy)
end
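# Summary of the launch flow after this change (editorial paraphrase, not commit text):
#  1. For each host, connect_n_create_worker opens the streams and add_worker starts the
#     :join_pgrp handshake, returning a RemoteRef (rr) to wait on.
#  2. Asynchronously, once fetch(rr) shows that first worker has joined, it is asked via
#     remotecall_fetch to launch count-1 further local workers; the (pid, bind_addr, port)
#     tuples it reports back are queued in additional_workers.
#  3. process_additional (below) then connects and registers those extra workers from the
#     master, batch by batch, waiting on each batch's join RemoteRefs before the next.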

- function launch_additional(w::Worker, exename, resp_arr::Array, launched_ntfy::Condition)
- cnt = get(w.config.count, 1)
- if cnt == :auto
- cnt = get(w.config.environ)[:cpu_cores]
- end
- cnt = cnt - 1 # Removing self from the requested number
-
- exeflags = get(w.config.exeflags, ``)
- cmd = `$exename $exeflags`
- if cnt > 0
- npids = [get_next_pid() for x in 1:cnt]
- new_workers = remotecall_fetch(w.id, launch_additional, cnt, npids, cmd)
+ function process_additional(additional_workers, resp_arr::Array, launched_ntfy::Condition)
+ # keyword argument max_parallel is only relevant for concurrent ssh connections to a unique host
+ # Post launch, ssh from master to workers is used only if tunnel is true

- # keyword argument max_parallel is only relevant for concurrent ssh connections to a unique host
- # Post launch, ssh from master to workers is used only if tunnel is true
- num_new_w = length(new_workers)
- tunnel = get(w.config.tunnel, false)
- maxp = get(w.config.max_parallel, 0)
+ while length(additional_workers) > 0
+ all_new_workers=[]
+ for (w, new_workers) in additional_workers
+ num_new_w = length(new_workers)
+ tunnel = get(w.config.tunnel, false)
+ maxp = get(w.config.max_parallel, 0)

- if tunnel && (maxp > 0)
- num_in_p = min(maxp, num_new_w)
- control_rrs = [RemoteRef() for i in 1:num_in_p]
- else
- num_in_p = 0 # Do not rate-limit connect
- control_rrs = []
- end
+ if tunnel && (maxp > 0)
+ num_in_p = min(maxp, num_new_w)
+ else
+ num_in_p = num_new_w # Do not rate-limit connect
+ end

- @sync for (i, address) in enumerate(new_workers)
- (pid, bind_addr, port) = address
+ for i in 1:num_in_p
+ (pid, bind_addr, port) = shift!(new_workers)

- wconfig = WorkerConfig()
- for x in [:host, :tunnel, :sshflags, :exeflags]
- setfield!(wconfig, x, getfield(w.config, x))
- end
- wconfig.bind_addr = bind_addr
- wconfig.port = port
-
- rridx = num_in_p > 0 ? (num_new_w % num_in_p) + 1 : 0
- let pid=pid, wconfig=wconfig, rridx=rridx
- @async try
- (rridx > 0) && take!(control_rrs[rridx])
- rr = connect_n_create_worker(w.manager, pid, wconfig)
- (rridx > 0) && put!(control_rrs[rridx], :OK)
-
- push!(resp_arr, rr)
- notify(launched_ntfy)
- catch e
- print(STDERR, "Error connecting to additional worker : $(e)\n")
+ wconfig = WorkerConfig()
+ for x in [:host, :tunnel, :sshflags, :exeflags]
+ setfield!(wconfig, x, getfield(w.config, x))
+ end
+ wconfig.bind_addr = bind_addr
+ wconfig.port = port
+
+ new_w = connect_n_create_worker(w.manager, pid, wconfig)
+ push!(all_new_workers, new_w)
end
end
- for rr in control_rrs
- put(rr, :OK)

+ rr_list=[]
+ for new_w in all_new_workers
+ rr=add_worker(PGRP, new_w)
+ push!(resp_arr, rr)
+ push!(rr_list, rr)
+ notify(launched_ntfy)
end

+ # wait for all of this set to finish.
+ [wait(rr) for rr in rr_list]
+
+ filter!(x->((w, new_workers) = x; length(new_workers) > 0), additional_workers)
end
end
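# Worked example of the rate limiting above (illustration only, made-up numbers): for a
# tunnelled host with max_parallel = 2 that reported 5 additional workers,
#     remaining = 5; maxp = 2
#     while remaining > 0
#         batch = min(maxp, remaining)   # mirrors num_in_p = min(maxp, num_new_w): 2, 2, 1
#         remaining -= batch
#     end
# so the workers are connected over three passes of the outer while-loop; without a tunnel
# (or with max_parallel == 0) all 5 are connected in a single pass.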

function connect_n_create_worker(manager, pid, wconfig)
(r_s, w_s) = connect(manager, pid, wconfig)

w = Worker(pid, r_s, w_s, manager, wconfig)
+ register_worker(w)
# install a finalizer to perform cleanup if necessary
finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end)

- # performs initial handshake with new worker. Returns a remoteref we can wait on for completion.
- rr = add_worker(PGRP, w)
+ w
end
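# Note (editorial, not commit text): register_worker now runs here, right after the streams
# are set up, while the :join_pgrp handshake is started separately by each caller through
# add_worker; presumably this lets the all_locs list sent to a joining worker already include
# peers that are connected but have not yet finished their own join.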


31 changes: 29 additions & 2 deletions test/parallel.jl
@@ -187,7 +187,7 @@ end
# executed successfully before committing/merging

if haskey(ENV, "PTEST_FULL")
println("START of parallel tests that print errors")
print("\n\nSTART of parallel tests that print errors\n")

# make sure exceptions propagate when waiting on Tasks
@test_throws ErrorException (@sync (@async error("oops")))
@@ -219,7 +219,34 @@ if haskey(ENV, "PTEST_FULL")
@test length(res) == length(ups)
@test isa(res[1], Exception)

println("END of parallel tests that print errors")
print("\n\nEND of parallel tests that print errors\n")

#Issue #9951
hosts=[]
for i in 1:10
push!(hosts, "localhost")
push!(hosts, string(getipaddr()))
push!(hosts, "127.0.0.1")
end
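# Note (not part of this commit): the 30 entries above address the same machine in three
# different ways (hostname, external IP, loopback), which appears to be the scenario that
# triggered the worker-to-worker connection failure reported in JuliaLang#9951.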

+ new_pids = remotecall_fetch(1, addprocs, hosts)
+ function test_n_remove_pids(new_pids)
+ for p in new_pids
+ w_in_remote = sort(remotecall_fetch(p, workers))
+ @test intersect(new_pids, w_in_remote) == new_pids
+ end
+
+ remotecall_fetch(1, rmprocs, new_pids)
+ end
+
+ #Other addprocs/rmprocs tests
+ new_pids = sort(remotecall_fetch(1, addprocs, ["localhost", ("127.0.0.1", :auto), "localhost"]))
+ @test length(new_pids) == (2 + Sys.CPU_CORES)
+ test_n_remove_pids(new_pids)
+
+ new_pids = sort(remotecall_fetch(1, addprocs, [("localhost", 2), ("127.0.0.1", 2), "localhost"]))
+ @test length(new_pids) == 5
+ test_n_remove_pids(new_pids)
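# Note (not part of this commit) on the expected counts: a bare "localhost" spec adds one
# worker, (host, n) adds n, and (host, :auto) adds one worker per CPU core, hence
# 2 + Sys.CPU_CORES above and 5 for the second spec.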
end

# issue #7727
