From e554039bddd063be2da3a08b7bdf6e1ffe03ac51 Mon Sep 17 00:00:00 2001
From: Amit Murthy
Date: Mon, 1 Jun 2015 11:08:50 +0530
Subject: [PATCH] refactor asynchronous addition of workers

---
 base/lock.jl       |  24 ++
 base/managers.jl   |  14 +-
 base/multi.jl      | 546 +++++++++++++++++++++------------------
 base/precompile.jl |  22 ++
 test/parallel.jl   |   5 +-
 5 files changed, 312 insertions(+), 299 deletions(-)

diff --git a/base/lock.jl b/base/lock.jl
index 67d2026615e6d..38ccc95c7530d 100644
--- a/base/lock.jl
+++ b/base/lock.jl
@@ -35,3 +35,27 @@ function unlock(rl::ReentrantLock)
     end
     return rl
 end
+
+type Semaphore
+    sem_size::Int
+    curr_cnt::Int
+    cond_wait::Condition
+    Semaphore(sem_size) = new(sem_size, 0, Condition())
+end
+
+function acquire(s::Semaphore)
+    while true
+        if s.curr_cnt < s.sem_size
+            s.curr_cnt = s.curr_cnt + 1
+            return
+        else
+            wait(s.cond_wait)
+        end
+    end
+end
+
+function release(s::Semaphore)
+    s.curr_cnt = s.curr_cnt - 1
+    notify(s.cond_wait; all=false)
+end
+
diff --git a/base/managers.jl b/base/managers.jl
index 536feade31f61..690eeda862c1c 100644
--- a/base/managers.jl
+++ b/base/managers.jl
@@ -211,6 +211,8 @@ end
 
 immutable DefaultClusterManager <: ClusterManager end
 
+const tunnel_hosts_map = Dict{AbstractString, Semaphore}()
+
 function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
     if !isnull(config.connect_at)
         # this is a worker-to-worker setup call.
@@ -245,8 +247,18 @@ function connect(manager::ClusterManager, pid::Int, config::WorkerConfig)
     end
 
     if tunnel
+        if !haskey(tunnel_hosts_map, pubhost)
+            tunnel_hosts_map[pubhost] = Semaphore(get(config.max_parallel, typemax(Int)))
+        end
+        sem = tunnel_hosts_map[pubhost]
+
         sshflags = get(config.sshflags)
-        (s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
+        acquire(sem)
+        try
+            (s, bind_addr) = connect_to_worker(pubhost, bind_addr, port, user, sshflags)
+        finally
+            release(sem)
+        end
     else
         (s, bind_addr) = connect_to_worker(bind_addr, port)
     end
diff --git a/base/multi.jl b/base/multi.jl
index 37ead0f6398c2..52b2c270a7c5b 100644
--- a/base/multi.jl
+++ b/base/multi.jl
@@ -1,47 +1,5 @@
 # This file is a part of Julia. License is MIT: http://julialang.org/license
 
-## multi.jl - multiprocessing
-##
-## julia starts with one process, and processors can be added using:
-##     addprocs(n)                          using exec
-##     addprocs({"host1","host2",...})      using remote execution
-##
-## remotecall(w, func, args...) -
-##     tell a worker to call a function on the given arguments.
-##     returns a RemoteRef to the result.
-##
-## remote_do(w, f, args...) - remote function call with no result
-##
-## wait(rr) - wait for a RemoteRef to be finished computing
-##
-## fetch(rr) - wait for and get the value of a RemoteRef
-##
-## remotecall_fetch(w, func, args...) - faster fetch(remotecall(...))
-##
-## pmap(func, lst) -
-##     call a function on each element of lst (some 1-d thing), in
-##     parallel.
-##
-## RemoteRef() - create an uninitialized RemoteRef on the local processor
-##
-## RemoteRef(p) - ...or on a particular processor
-##
-## put!(r, val) - store a value to an uninitialized RemoteRef
-##
-## @spawn expr -
-##     evaluate expr somewhere. returns a RemoteRef. all variables in expr
-##     are copied to the remote processor.
-##
-## @spawnat p expr - @spawn specifying where to run
-##
-## @async expr -
-##     run expr as an asynchronous task on the local processor
-##
-## @parallel (r) for i=1:n ... end -
-##     parallel loop. the results from each iteration are reduced using (r).
-##
-## @everywhere expr - run expr everywhere.
-
 # todo:
 # * fetch/wait latency seems to be excessive
 # * message aggregation
@@ -121,13 +79,14 @@ type WorkerConfig
     end
 end
 
-@enum WorkerState W_RUNNING W_TERMINATING W_TERMINATED
+@enum WorkerState W_CREATED W_RUNNING W_TERMINATING W_TERMINATED
 
 type Worker
     id::Int
     del_msgs::Array{Any,1}
     add_msgs::Array{Any,1}
     gcflag::Bool
     state::WorkerState
+    c_state::Condition      # wait for state changes
 
     r_stream::AsyncStream
     w_stream::AsyncStream
@@ -140,6 +99,8 @@ type Worker
         w.w_stream = buffer_writes(w_stream)
         w.manager = manager
         w.config = config
+        set_worker_state(w, W_RUNNING)
+        register_worker_streams(w)
         w
     end
 
@@ -147,12 +108,20 @@ type Worker
         if haskey(map_pid_wrkr, id)
             return map_pid_wrkr[id]
         end
-        new(id, [], [], false, W_RUNNING)
+        w=new(id, [], [], false, W_CREATED, Condition())
+        register_worker(w)
+        w
     end
+
+    Worker() = Worker(get_next_pid())
 end
 
 Worker(id, r_stream, w_stream, manager) = Worker(id, r_stream, w_stream, manager, WorkerConfig())
 
+function set_worker_state(w, state)
+    w.state = state
+    notify(w.c_state)
+end
 
 function send_msg_now(w::Worker, kind, args...)
     send_msg_(w, kind, args, true)
@@ -304,7 +273,7 @@ function rmprocs(args...; waitfor = 0.0)
         else
             if haskey(map_pid_wrkr, i)
                 w = map_pid_wrkr[i]
-                w.state = W_TERMINATING
+                set_worker_state(w, W_TERMINATING)
                 kill(w.manager, i, w.config)
                 push!(rmprocset, w)
             end
@@ -362,10 +331,11 @@ register_worker(w) = register_worker(PGRP, w)
 function register_worker(pg, w)
     push!(pg.workers, w)
     map_pid_wrkr[w.id] = w
-    if isa(w, Worker)
-        map_sock_wrkr[w.r_stream] = w
-        map_sock_wrkr[w.w_stream] = w
-    end
+end
+
+function register_worker_streams(w)
+    map_sock_wrkr[w.r_stream] = w
+    map_sock_wrkr[w.w_stream] = w
 end
 
 deregister_worker(pid) = deregister_worker(PGRP, pid)
@@ -808,156 +778,169 @@ function accept_handler(server::TCPServer, status::Int32)
     process_messages(client, client)
 end
 
-function process_messages(r_stream::TCPSocket, w_stream::TCPSocket; kwargs...)
-    @schedule begin
+process_messages(r_stream::TCPSocket, w_stream::TCPSocket) = process_messages(r_stream, w_stream, nothing)
+process_messages(r_stream::TCPSocket, w_stream::TCPSocket, rr_ntfy_join) = @schedule process_tcp_streams(r_stream, w_stream, rr_ntfy_join)
+
+function process_tcp_streams(r_stream::TCPSocket, w_stream::TCPSocket, rr_ntfy_join)
     disable_nagle(r_stream)
     wait_connected(r_stream)
     if r_stream != w_stream
         disable_nagle(w_stream)
         wait_connected(w_stream)
     end
-    create_message_handler_loop(r_stream, w_stream; kwargs...)
-    end
+    message_handler_loop(r_stream, w_stream, rr_ntfy_join)
 end
 
-function process_messages(r_stream::AsyncStream, w_stream::AsyncStream; kwargs...)
-    create_message_handler_loop(r_stream, w_stream; kwargs...)
-end
+process_messages(r_stream::AsyncStream, w_stream::AsyncStream) = process_messages(r_stream, w_stream, nothing)
+process_messages(r_stream::AsyncStream, w_stream::AsyncStream, rr_ntfy_join) = @schedule message_handler_loop(r_stream, w_stream, rr_ntfy_join)
 
-function create_message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream; ntfy_join_complete=nothing) #returns immediately
-    @schedule begin
-        global PGRP
-        global cluster_manager
+function message_handler_loop(r_stream::AsyncStream, w_stream::AsyncStream, rr_ntfy_join)
+    global PGRP
+    global cluster_manager
 
-        try
-            while true
-                msg = deserialize(r_stream)
-                #println("got msg: ",msg)
-                # handle message
-                if is(msg, :call)
-                    id = deserialize(r_stream)
-                    #print("$(myid()) got id $id\n")
-                    f0 = deserialize(r_stream)
-                    #print("$(myid()) got call $f0\n")
-                    args0 = deserialize(r_stream)
-                    #print("$(myid()) got args $args0\n")
-                    let f=f0, args=args0
-                        schedule_call(id, ()->f(args...))
-                    end
-                elseif is(msg, :call_fetch)
-                    id = deserialize(r_stream)
-                    f = deserialize(r_stream)
-                    args = deserialize(r_stream)
-                    let f=f, args=args, id=id, msg=msg
-                        @schedule begin
-                            v = run_work_thunk(()->f(args...))
-                            deliver_result(w_stream, msg, id, v)
-                            v
-                        end
+    try
+        while true
+            msg = deserialize(r_stream)
+            #println("got msg: ",msg)
+            # handle message
+            if is(msg, :call)
+                id = deserialize(r_stream)
+                #print("$(myid()) got id $id\n")
+                f0 = deserialize(r_stream)
+                #print("$(myid()) got call $f0\n")
+                args0 = deserialize(r_stream)
+                #print("$(myid()) got args $args0\n")
+                let f=f0, args=args0
+                    schedule_call(id, ()->f(args...))
+                end
+            elseif is(msg, :call_fetch)
+                id = deserialize(r_stream)
+                f = deserialize(r_stream)
+                args = deserialize(r_stream)
+                let f=f, args=args, id=id, msg=msg
+                    @schedule begin
+                        v = run_work_thunk(()->f(args...))
+                        deliver_result(w_stream, msg, id, v)
+                        v
                     end
-                elseif is(msg, :call_wait)
-                    id = deserialize(r_stream)
-                    notify_id = deserialize(r_stream)
-                    f = deserialize(r_stream)
-                    args = deserialize(r_stream)
-                    let f=f, args=args, id=id, msg=msg, notify_id=notify_id
-                        @schedule begin
-                            rv = schedule_call(id, ()->f(args...))
-                            deliver_result(w_stream, msg, notify_id, wait_full(rv))
-                        end
+                end
+            elseif is(msg, :call_wait)
+                id = deserialize(r_stream)
+                notify_id = deserialize(r_stream)
+                f = deserialize(r_stream)
+                args = deserialize(r_stream)
+                let f=f, args=args, id=id, msg=msg, notify_id=notify_id
+                    @schedule begin
+                        rv = schedule_call(id, ()->f(args...))
+                        deliver_result(w_stream, msg, notify_id, wait_full(rv))
                     end
-                elseif is(msg, :do)
-                    f = deserialize(r_stream)
-                    args = deserialize(r_stream)
-                    #print("got args: $args\n")
-                    let f=f, args=args
-                        @schedule begin
-                            run_work_thunk(RemoteValue(), ()->f(args...))
-                        end
+                end
+            elseif is(msg, :do)
+                f = deserialize(r_stream)
+                args = deserialize(r_stream)
+                #print("got args: $args\n")
+                let f=f, args=args
+                    @schedule begin
+                        run_work_thunk(RemoteValue(), ()->f(args...))
                     end
-                elseif is(msg, :result)
-                    # used to deliver result of wait or fetch
-                    oid = deserialize(r_stream)
-                    #print("$(myid()) got $msg $oid\n")
-                    val = deserialize(r_stream)
-                    put!(lookup_ref(oid), val)
-                elseif is(msg, :identify_socket)
-                    otherid = deserialize(r_stream)
-                    register_worker(Worker(otherid, r_stream, w_stream, cluster_manager))
-                elseif is(msg, :join_pgrp)
-                    self_pid = LPROC.id = deserialize(r_stream)
-                    locs = deserialize(r_stream)
-                    self_is_local = deserialize(r_stream)
-                    controller = Worker(1, r_stream, w_stream, cluster_manager)
-                    register_worker(controller)
-                    register_worker(LPROC)
-
-                    for (connect_at, rpid, r_is_local) in locs
-                        if (rpid < self_pid) && (!(rpid == 1))
-                            # Connect processes with lower pids
-                            wconfig = WorkerConfig()
-                            wconfig.connect_at = connect_at
-                            wconfig.environ = AnyDict(:self_is_local=>self_is_local, :r_is_local=>r_is_local)
-
-                            (r_s, w_s) = connect(cluster_manager, rpid, wconfig)
-                            w = Worker(rpid, r_s, w_s, cluster_manager, wconfig)
-                            register_worker(w)
-                            process_messages(w.r_stream, w.w_stream)
-                            send_msg_now(w, :identify_socket, self_pid)
-                        else
-                            # Processes with higher pids connect to us. Don't do anything just yet
-                            continue
-                        end
+                end
+            elseif is(msg, :result)
+                # used to deliver result of wait or fetch
+                oid = deserialize(r_stream)
+                #print("$(myid()) got $msg $oid\n")
+                val = deserialize(r_stream)
+                put!(lookup_ref(oid), val)
+            elseif is(msg, :identify_socket)
+                otherid = deserialize(r_stream)
+                Worker(otherid, r_stream, w_stream, cluster_manager) # The constructor registers the worker
+
+            elseif is(msg, :join_pgrp)
+                self_pid = LPROC.id = deserialize(r_stream)
+                locs = deserialize(r_stream)
+                self_is_local = deserialize(r_stream)
+                controller = Worker(1, r_stream, w_stream, cluster_manager)
+                register_worker(LPROC)
+
+                wait_tasks = Task[]
+
+                for (connect_at, rpid, r_is_local) in locs
+                    wconfig = WorkerConfig()
+                    wconfig.connect_at = connect_at
+                    wconfig.environ = AnyDict(:self_is_local=>self_is_local, :r_is_local=>r_is_local)
+
+                    let rpid=rpid, wconfig=wconfig
+                        t = @async connect_to_peer(cluster_manager, rpid, wconfig)
+                        push!(wait_tasks, t)
                     end
+                end
 
-                    send_msg_now(controller, :join_complete, Sys.CPU_CORES, getpid())
+                for wt in wait_tasks; wait(wt); end
 
-                elseif is(msg, :join_complete)
-                    w = map_sock_wrkr[r_stream]
+                send_msg_now(controller, :join_complete, Sys.CPU_CORES, getpid())
 
-                    environ = get(w.config.environ, Dict())
-                    environ[:cpu_cores] = deserialize(r_stream)
-                    w.config.environ = environ
+            elseif is(msg, :join_complete)
+                w = map_sock_wrkr[r_stream]
 
-                    w.config.ospid = deserialize(r_stream)
+                environ = get(w.config.environ, Dict())
+                environ[:cpu_cores] = deserialize(r_stream)
+                w.config.environ = environ
 
-                    put!(ntfy_join_complete, w.id)
-                    ntfy_join_complete = nothing # so that it gets gc'ed
-                end
+                w.config.ospid = deserialize(r_stream)
 
-            end # end of while
-        catch e
-            iderr = worker_id_from_socket(r_stream)
-            werr = worker_from_id(iderr)
-            oldstate = werr.state
-            werr.state = W_TERMINATED
-
-            # If error occured talking to pid 1, commit harakiri
-            if iderr == 1
-                if isopen(w_stream)
-                    print(STDERR, "fatal error on ", myid(), ": ")
-                    display_error(e, catch_backtrace())
-                end
-                exit(1)
+                put!(rr_ntfy_join, w.id)
+                rr_ntfy_join = nothing # so that it gets gc'ed
             end
 
-            # Will treat any exception as death of node and cleanup
-            # since currently we do not have a mechanism for workers to reconnect
-            # to each other on unhandled errors
-            deregister_worker(iderr)
+        end # end of while
+    catch e
+        iderr = worker_id_from_socket(r_stream)
+        werr = worker_from_id(iderr)
+        oldstate = werr.state
+        set_worker_state(werr, W_TERMINATED)
 
-            if isopen(r_stream) close(r_stream) end
-            if isopen(w_stream) close(w_stream) end
-            if (myid() == 1)
-                if oldstate != W_TERMINATING
-                    println(STDERR, "Worker $iderr terminated.")
-                    rethrow(e)
-                end
+        # If error occurred talking to pid 1, commit harakiri
+        if iderr == 1
+            if isopen(w_stream)
+                print(STDERR, "fatal error on ", myid(), ": ")
+                display_error(e, catch_backtrace())
             end
+            exit(1)
+        end
 
-            return nothing
+        # Will treat any exception as death of node and cleanup
+        # since currently we do not have a mechanism for workers to reconnect
+        # to each other on unhandled errors
+        deregister_worker(iderr)
+
+        if isopen(r_stream) close(r_stream) end
+        if isopen(w_stream) close(w_stream) end
+
+        if (myid() == 1)
+            if oldstate != W_TERMINATING
+                println(STDERR, "Worker $iderr terminated.")
+                rethrow(e)
+            end
+        end
+
+        return nothing
+    end
+end
+
+function connect_to_peer(manager::ClusterManager, rpid::Int, wconfig::WorkerConfig)
+    try
+        (r_s, w_s) = connect(manager, rpid, wconfig)
+        w = Worker(rpid, r_s, w_s, manager, wconfig)
+        process_messages(w.r_stream, w.w_stream)
+        send_msg_now(w, :identify_socket, myid())
+
+        # test connectivity with an echo
+        if remotecall_fetch(rpid, ()->:ok) != :ok
+            throw("ping test with remote peer failed")
+        end
+    catch e
+        println(STDERR, "Error [$e] on $(myid()) while connecting to peer $rpid. Exiting.")
+        exit(1)
+    end
+end
@@ -1087,22 +1070,8 @@ function addprocs(manager::ClusterManager; kwargs...)
 
     # References to launched workers, filled when each worker is fully initialized and
     # has connected to all nodes.
-    rr_launched = RemoteRef[]    # Asynchronously filled by the launch method
-
-    start_cluster_workers(manager, params, rr_launched)
-
-    # Wait for all workers to be fully connected
-    sort!([fetch(rr) for rr in rr_launched])
-end
-
-
-default_addprocs_params() = AnyDict(
-    :dir      => pwd(),
-    :exename  => joinpath(JULIA_HOME,julia_exename()),
-    :exeflags => ``)
+    launched_q = Int[]    # Asynchronously filled by the launch method
 
-
-function start_cluster_workers(manager, params, rr_launched)
     # The `launch` method should add an object of type WorkerConfig for every
     # worker launched. It provides information required on how to connect
     # to it.
@@ -1112,137 +1081,102 @@ function start_cluster_workers(manager, params, rr_launched)
     # call to manager's `launch` is in a separate task. This allows the master
     # process to initiate the connection setup process as and when workers come
     # online
-    t = @schedule try
-            launch(manager, params, launched, launch_ntfy)
-        catch e
-            print(STDERR, "Error launching workers with $(typeof(manager)) : $e\n")
-        end
+    t_launch = @schedule launch(manager, params, launched, launch_ntfy)
 
-    # When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
-    # process on the remote machine, with a request to start additional workers of the
-    # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`.
-    workers_with_additional = []    # List of workers with additional on-host workers requested
-
-    while true
-        if length(launched) == 0
-            if istaskdone(t)
-                break
+    @sync begin
+        while true
+            if length(launched) == 0
+                istaskdone(t_launch) && break
+                @schedule (sleep(1); notify(launch_ntfy))
+                wait(launch_ntfy)
             end
-            @schedule (sleep(1); notify(launch_ntfy))
-            wait(launch_ntfy)
-        end
-        if length(launched) > 0
-            wconfig = shift!(launched)
-            w = connect_n_create_worker(manager, get_next_pid(), wconfig)
-            rr = setup_worker(PGRP, w)
-            cnt = get(w.config.count, 1)
-            if (cnt == :auto) || (cnt > 1)
-                push!(workers_with_additional, (w, rr))
+            if (length(launched) > 0)
+                wconfig = shift!(launched)
+                let wconfig=wconfig
+                    @async setup_launched_worker(manager, wconfig, launched_q)
+                end
             end
-
-            push!(rr_launched, rr)
         end
     end
 
-    # Perform the launch of additional workers in parallel.
-    additional_workers = []    # List of workers launched via the "additional" method
-    @sync begin
-        for (w, rr) in workers_with_additional
-            let w=w, rr=rr
-                @async begin
-                    wait(rr)    # :cpu_cores below is set only after we get a setup complete
-                                # message from the new worker.
-                    cnt = get(w.config.count)
-                    if cnt == :auto
-                        cnt = get(w.config.environ)[:cpu_cores]
-                    end
-                    cnt = cnt - 1    # Removing self from the requested number
+    wait(t_launch)    # catches any thrown errors from the launch task
 
-                    exename = get(w.config.exename)
-                    exeflags = get(w.config.exeflags, ``)
-                    cmd = `$exename $exeflags`
+    sort!(launched_q)
+end
 
-                    npids = [get_next_pid() for x in 1:cnt]
-                    new_workers = remotecall_fetch(w.id, launch_additional, cnt, npids, cmd)
-                    push!(additional_workers, (w, new_workers))
-                end
-            end
-        end
+
+default_addprocs_params() = AnyDict(
+    :dir      => pwd(),
+    :exename  => joinpath(JULIA_HOME,julia_exename()),
+    :exeflags => ``)
+
+
+function setup_launched_worker(manager, wconfig, launched_q)
+    pid = create_worker(manager, wconfig)
+    push!(launched_q, pid)
+
+    # When starting workers on remote multi-core hosts, `launch` can (optionally) start only one
+    # process on the remote machine, with a request to start additional workers of the
+    # same type. This is done by setting an appropriate value to `WorkerConfig.cnt`.
+    cnt = get(wconfig.count, 1)
+    if cnt == :auto
+        cnt = get(wconfig.environ)[:cpu_cores]
     end
+    cnt = cnt - 1    # Removing self from the requested number
 
-    # connect each of the additional workers with each other
-    process_additional(additional_workers, rr_launched)
+    if cnt > 0
+        launch_n_additional_processes(manager, pid, wconfig, cnt, launched_q)
+    end
 end
 
-function process_additional(additional_workers, rr_launched::Array)
-    # keyword argument `max_parallel` is only relevant for concurrent ssh connections to a unique host
-    # Post launch, ssh from master to workers is used only if tunnel is true
-    while length(additional_workers) > 0
-        all_new_workers=[]
-        for (w_initial, new_workers) in additional_workers
-            num_new_w = length(new_workers)
-            tunnel = get(w_initial.config.tunnel, false)
-            maxp = get(w_initial.config.max_parallel, 0)
+function launch_n_additional_processes(manager, frompid, fromconfig, cnt, launched_q)
+    @sync begin
+        exename = get(fromconfig.exename)
+        exeflags = get(fromconfig.exeflags, ``)
+        cmd = `$exename $exeflags`
 
-            if tunnel && (maxp > 0)
-                num_in_p = min(maxp, num_new_w)
-            else
-                num_in_p = num_new_w    # Do not rate-limit connect
-            end
+        new_addresses = remotecall_fetch(frompid, launch_additional, cnt, cmd)
+        for address in new_addresses
+            (bind_addr, port) = address
 
-            for i in 1:num_in_p
-                (pid, bind_addr, port) = shift!(new_workers)
+            wconfig = WorkerConfig()
+            for x in [:host, :tunnel, :sshflags, :exeflags, :exename]
+                setfield!(wconfig, x, getfield(fromconfig, x))
+            end
+            wconfig.bind_addr = bind_addr
+            wconfig.port = port
 
-                wconfig = WorkerConfig()
-                for x in [:host, :tunnel, :sshflags, :exeflags, :exename]
-                    setfield!(wconfig, x, getfield(w_initial.config, x))
+            let wconfig=wconfig
+                @async begin
+                    pid = create_worker(manager, wconfig)
+                    remote_do(frompid, redirect_output_from_additional_worker, pid, port)
+                    push!(launched_q, pid)
                 end
-                wconfig.bind_addr = bind_addr
-                wconfig.port = port
-
-                new_w = connect_n_create_worker(w_initial.manager, pid, wconfig)
-                push!(all_new_workers, new_w)
             end
         end
-
-        rr_list=[]
-        for new_w in all_new_workers
-            rr=setup_worker(PGRP, new_w)
-            push!(rr_list, rr)
-            push!(rr_launched, rr)
-        end
-
-        # It is important to wait for all of newly launched workers to finish
-        # connection setup, so that all the workers are aware of all other workers
-        [wait(rr) for rr in rr_list]
-
-        filter!(x->((w_initial, new_workers) = x; length(new_workers) > 0), additional_workers)
-    end
-end
 
-function connect_n_create_worker(manager, pid, wconfig)
+function create_worker(manager, wconfig)
+    # only node 1 can add new nodes, since nobody else has the full list of address:port
+    assert(LPROC.id == 1)
+
     # initiate a connect. Does not wait for connection completion in case of TCP.
-    (r_s, w_s) = connect(manager, pid, wconfig)
+    w = Worker()
 
-    w = Worker(pid, r_s, w_s, manager, wconfig)
-    register_worker(w)
+    (r_s, w_s) = connect(manager, w.id, wconfig)
+    w = Worker(w.id, r_s, w_s, manager, wconfig)
 
     # install a finalizer to perform cleanup if necessary
     finalizer(w, (w)->if myid() == 1 manage(w.manager, w.id, w.config, :finalize) end)
 
-    w
-end
-
-function setup_worker(pg::ProcessGroup, w)
-    # only node 1 can add new nodes, since nobody else has the full list of address:port
-    assert(LPROC.id == 1)
 
     # set when the new worker has finished connections with all other workers
-    rr_join = RemoteRef()
+    rr_ntfy_join = RemoteRef()
 
     # Start a new task to handle inbound messages from connected worker in master.
     # Also calls `wait_connected` on TCP streams.
-    process_messages(w.r_stream, w.w_stream; ntfy_join_complete=rr_join)
+    process_messages(w.r_stream, w.w_stream, rr_ntfy_join)
 
     # send address information of all workers to the new worker.
     # Cluster managers set the address of each worker in `WorkerConfig.connect_at`.
@@ -1254,20 +1188,36 @@
     # - On each worker
     # - each worker sends a :identify_socket to all workers less than its pid
     # - each worker then sends a :join_complete back to the master along with its OS_PID and NUM_CORES
-    # - once master receives a :join_complete it triggers rr_join (signifies that worker setup is complete)
-    all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), pg.workers)
+    # - once master receives a :join_complete it triggers rr_ntfy_join (signifies that worker setup is complete)
+
+    # need to wait for lower worker pids to have completed connecting, since the numerical value
+    # of pids is relevant to the connection process, i.e., higher pids connect to lower pids and they
+    # require the value of config.connect_at which is set only upon connection completion
+
+    lower_wlist = filter(x -> (x.id != 1) && (x.id < w.id) && (x.state == W_CREATED), PGRP.workers)
+    for wl in lower_wlist
+        if wl.state == W_CREATED
+            wait(wl.c_state)
+        end
+    end
+
+    # filter list to workers in a running state
+    join_list = filter(x -> (x.id != 1) && (x.id < w.id) && (x.state==W_RUNNING), PGRP.workers)
+
+    all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id, isa(x.manager, LocalManager)) : ((), x.id, true), join_list)
     send_msg_now(w, :join_pgrp, w.id, all_locs, isa(w.manager, LocalManager))
 
     @schedule manage(w.manager, w.id, w.config, :register)
-    rr_join
+    wait(rr_ntfy_join)
+    w.id
 end
 
 # Called on the first worker on a remote host. Used to optimize launching
 # of multiple workers on a remote host (to leverage multi-core)
-function launch_additional(np::Integer, pids::Array, cmd::Cmd)
-    assert(np == length(pids))
+additional_io_objs=Dict()
+function launch_additional(np::Integer, cmd::Cmd)
     io_objs = cell(np)
     addresses = cell(np)
@@ -1278,16 +1228,20 @@
     for (i,io) in enumerate(io_objs)
         (host, port) = read_worker_host_port(io)
-        addresses[i] = (pids[i], host, port)
-
-        let io=io, pid=pids[i]
-            redirect_worker_output("$pid", io)
-        end
+        addresses[i] = (host, port)
+        additional_io_objs[port] = io
     end
 
     addresses
 end
 
+function redirect_output_from_additional_worker(pid, port)
+    io = additional_io_objs[port]
+    redirect_worker_output("$pid", io)
+    delete!(additional_io_objs, port)
+    nothing
+end
+
 ## higher-level functions: spawn, pmap, pfor, etc. ##
 
 let nextidx = 0
diff --git a/base/precompile.jl b/base/precompile.jl
index 09c25bb518a1a..1ddb88b68a5e9 100644
--- a/base/precompile.jl
+++ b/base/precompile.jl
@@ -493,6 +493,28 @@ precompile(Base.string, (ASCIIString, UTF8String, Char))
 precompile(Base.string, (ASCIIString, ASCIIString, Int))
 precompile(Base.vect, (Base.LineEdit.Prompt, ASCIIString))
 
+
+# Speeding up addprocs for LocalManager
+precompile(Base.start_worker, ())
+precompile(Base.start_worker, (Base.TTY,))
+precompile(Base.process_messages, (Base.TCPSocket, Base.TCPSocket))
+precompile(Base.process_messages, (Base.TCPSocket, Base.TCPSocket, Void))
+precompile(Base.process_tcp_streams, (Base.TCPSocket, Base.TCPSocket, Void))
+
+precompile(Base.message_handler_loop, (Base.TCPSocket, Base.TCPSocket, Void))
+
+precompile(Base.connect_to_peer, (Base.LocalManager, Int64, Base.WorkerConfig))
+precompile(Base.connect, (Base.LocalManager, Int64, Base.WorkerConfig))
+precompile(Base.connect_w2w, (Int64, Base.WorkerConfig))
+
+precompile(Base.connect_to_worker, (UTF8String, Int64))
+
+precompile(Base.addprocs, (Base.LocalManager, ))
+precompile(Base.addprocs, (Int, ))
+precompile(Base.setup_launched_worker, (Base.LocalManager, Dict, Base.WorkerConfig, Array{Int,1}))
+precompile(Base.create_worker, (Base.LocalManager, Dict, Base.WorkerConfig))
+precompile(Base.launch, (Base.LocalManager, Dict, Array{Base.WorkerConfig, 1}, Base.Condition))
+
 # Speed up repl help
 if Base.USE_GPL_LIBS
     sprint(Markdown.term, @doc fft)
diff --git a/test/parallel.jl b/test/parallel.jl
index 2a0eda2e7cc33..83d3ca6ab4c1d 100644
--- a/test/parallel.jl
+++ b/test/parallel.jl
@@ -259,6 +259,7 @@ if Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0"))))
         catch e
             print("p           : $p\n")
             print("newpids     : $new_pids\n")
+            print("w_in_remote : $w_in_remote\n")
             print("intersect   : $(intersect(new_pids, w_in_remote))\n\n\n")
             rethrow(e)
         end
@@ -296,8 +297,8 @@ if Bool(parse(Int,(get(ENV, "JULIA_TESTFULL", "0"))))
         test_n_remove_pids(new_pids)
 
         print("\nssh addprocs with tunnel\n")
-        new_pids = sort(remotecall_fetch(1, (h, sf) -> addprocs(h; tunnel=true, sshflags=sf), [("localhost", 9)], sshflags))
-        @test length(new_pids) == 9
+        new_pids = sort(remotecall_fetch(1, (h, sf) -> addprocs(h; tunnel=true, sshflags=sf), [("localhost", num_workers)], sshflags))
+        @test length(new_pids) == num_workers
         test_n_remove_pids(new_pids)
     end
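
Notes (sketches, not part of the patch):

The new Semaphore in base/lock.jl is a counting semaphore for Julia's
cooperative task scheduler: acquire() blocks the calling task on the
Condition until a slot frees up, and release() wakes exactly one waiter
(notify(...; all=false)); the woken task re-checks the count in acquire()'s
loop. Because tasks are cooperatively scheduled on a single thread, the
check-then-increment in acquire() needs no atomic operations. A minimal
usage sketch; the limit of 5 and do_connect() are illustrative, not part of
the patch:

    const sem = Semaphore(5)        # at most 5 attempts in flight

    function limited_connect(host)
        acquire(sem)                # blocks while 5 tasks hold a slot
        try
            do_connect(host)        # hypothetical connection routine
        finally
            release(sem)            # always return the slot, even on error
        end
    end

Pairing every release() with a prior acquire() via try/finally matters:
release() does not guard against underflow, it simply decrements curr_cnt
and notifies a waiter, which is exactly how the managers.jl change uses it.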
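
In base/managers.jl the semaphore is keyed by host, so concurrent ssh tunnel
setups to one machine are capped by the max_parallel field of the worker's
config, while connections to distinct hosts still proceed in parallel. With
max_parallel unset, the semaphore is sized typemax(Int), i.e. effectively
unthrottled. From the caller's side this is driven by the existing addprocs
keywords; the host names and key file below are illustrative:

    # allow at most 4 simultaneous tunnelled connection setups per host
    addprocs([("host1.example.com", 8), ("host2.example.com", 8)];
             tunnel=true, max_parallel=4, sshflags=`-i mykey.pem`)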
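
The new W_CREATED state plus the c_state condition is what lets create_worker
order the :join_pgrp handshake: higher pids connect to lower pids, and a
lower pid's config.connect_at is only known once its own setup completes, so
the master waits for every lower-pid worker to leave W_CREATED before sending
the address list to a new worker. The wait reduces to this pattern (a sketch;
the patch performs a single check, while the loop below is the defensive
variant, since a Condition may be notified on any state change):

    while wl.state == W_CREATED
        wait(wl.c_state)    # set_worker_state() calls notify(w.c_state)
    end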
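
launch_additional() no longer takes pre-assigned pids: it returns plain
(bind_addr, port) pairs and parks each child's io object in
additional_io_objs keyed by port. Only after the master has created the
worker, and therefore knows its pid, does it call back with remote_do so that
redirect_output_from_additional_worker can tag the child's output with the
right pid. Schematically, on the master (names as in the patch, wconfig
construction elided):

    new_addresses = remotecall_fetch(frompid, launch_additional, cnt, cmd)
    for (bind_addr, port) in new_addresses
        # build a WorkerConfig from bind_addr/port as in
        # launch_n_additional_processes, then:
        pid = create_worker(manager, wconfig)
        remote_do(frompid, redirect_output_from_additional_worker, pid, port)
    end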