diff --git a/base/client.jl b/base/client.jl index 364a02edcf33b..9ecd4daabe2f6 100644 --- a/base/client.jl +++ b/base/client.jl @@ -171,8 +171,30 @@ end # try to include() a file, ignoring if not found try_include(path::String) = isfile(path) && include(path) +# Determine the address this process binds to and store it in LPROC.bind_addr. +# Uses the value following "--bind-to" in args when present, otherwise getipaddr(). +function init_bind_addr(args::Vector{UTF8String}) + # Treat --bind-to in a position independent manner in ARGS since + # --worker, -n and --machinefile options are affected by it + btoidx = findfirst(args, "--bind-to") + if btoidx > 0 + btoidx < length(args) || error("--bind-to requires an address argument") # not a BoundsError + bind_addr = parseip(args[btoidx+1]) + else + try + bind_addr = getipaddr() + catch + # All networking is unavailable, initialize bind_addr to the loopback address + # Will cause an exception to be raised only when used. + bind_addr = ip"127.0.0.1" + end + end + global LPROC + LPROC.bind_addr = bind_addr +end + + function process_options(args::Vector{UTF8String}) - global bind_addr quiet = false repl = true startup = true @@ -186,8 +208,7 @@ function process_options(args::Vector{UTF8String}) start_worker() # doesn't return elseif args[i]=="--bind-to" - i += 1 - bind_addr = args[i] + i+=1 # has already been processed elseif args[i]=="-e" || args[i]=="--eval" repl = false i+=1 @@ -322,6 +343,7 @@ function _start() early_init() try + init_bind_addr(ARGS) any(a->(a=="--worker"), ARGS) || init_head_sched() init_load_path() (quiet,repl,startup,color_set,no_history_file) = process_options(copy(ARGS)) diff --git a/base/multi.jl b/base/multi.jl index abc16b30c8c9d..72da022b84dc1 100644 --- a/base/multi.jl +++ b/base/multi.jl @@ -87,14 +87,6 @@ end abstract ClusterManager -type WorkerLocalInfo - # Currently only one field, in the future we may add more fields like - # local OS pid, system info like RAM, CPU type, etc. 
- # - privipaddr::IpAddr - WorkerLocalInfo() = new(getipaddr()) -end - type Worker host::ByteString port::Uint16 @@ -104,21 +96,26 @@ type Worker add_msgs::Array{Any,1} id::Int gcflag::Bool - privhost::ByteString + bind_addr::IpAddr manage::Function config::Dict - winfo::WorkerLocalInfo Worker(host::String, port::Integer, sock::TcpSocket, id::Int) = - new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false, "") + new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false) end Worker(host::String, port::Integer, sock::TcpSocket) = Worker(host, port, sock, 0) -Worker(host::String, port::Integer) = - Worker(host, port, connect(host,uint16(port))) -Worker(host::String, privhost::String, port::Integer, tunnel_user::String, sshflags) = - Worker(host, port, connect("localhost", - ssh_tunnel(tunnel_user, host, privhost, uint16(port), sshflags))) +function Worker(host::String, port::Integer) + w = Worker(host, port, connect(host,uint16(port))) + w.bind_addr = getaddrinfo(host) + w +end +function Worker(host::String, bind_addr::String, port::Integer, tunnel_user::String, sshflags) + w = Worker(host, port, connect("localhost", + ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags))) + w.bind_addr = parseip(bind_addr) + w +end function send_msg_now(w::Worker, kind, args...) 
@@ -182,10 +179,11 @@ end type LocalProcess id::Int - winfo::WorkerLocalInfo + bind_addr::IpAddr + LocalProcess() = new(1) # id defaults to 1; bind_addr stays unset until init_bind_addr runs end -const LPROC = LocalProcess(1, WorkerLocalInfo()) +const LPROC = LocalProcess() const map_pid_wrkr = Dict{Int, Union(Worker, LocalProcess)}() const map_sock_wrkr = ObjectIdDict() @@ -211,17 +209,18 @@ type ProcessGroup end const PGRP = ProcessGroup({}) -getprivipaddr(pid::Integer) = getprivipaddr(worker_from_id(pid)) -getprivipaddr(w::Union(Worker, LocalProcess)) = fetchwinfo(w).privipaddr - -fetchwinfo(pid::Integer) = fetchwinfo(worker_from_id(pid)) -function fetchwinfo(w::Union(Worker, LocalProcess)) - # retrieve and cache upon first access - if !isdefined(w, :winfo) - w.winfo = remotecall_fetch(w.id, fetchwinfo, w.id) +get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid)) +function get_bind_addr(w::Union(Worker, LocalProcess)) + if !isdefined(w, :bind_addr) + if w.id != myid() + w.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id) + else + error("LPROC.bind_addr not defined") # Should never happen since LPROC.bind_addr + # is defined early on during process init. + end end - w.winfo -end + w.bind_addr +end function add_workers(pg::ProcessGroup, ws::Array{Any,1}) # NOTE: currently only node 1 can add new nodes, since nobody else @@ -232,10 +231,11 @@ function add_workers(pg::ProcessGroup, ws::Array{Any,1}) register_worker(w) create_message_handler_loop(w.socket) end - all_locs = map(x -> isa(x, Worker) ? (x.privhost, x.port, x.id) : ("", 0, x.id), pg.workers) + + all_locs = map(x -> isa(x, Worker) ? 
(string(x.bind_addr), x.port, x.id, x.manage == manage_local_worker) : ("", 0, x.id, true), pg.workers) for w in ws - send_msg_now(w, :join_pgrp, w.id, all_locs) + send_msg_now(w, :join_pgrp, w.id, all_locs, w.manage == manage_local_worker) end for w in ws @schedule begin @@ -255,8 +255,16 @@ end procs() = Int[x.id for x in PGRP.workers] function procs(pid::Integer) - ipatpid = getprivipaddr(pid) - Int[x.id for x in filter(w -> getprivipaddr(w) == ipatpid, PGRP.workers)] + if myid() == 1 + if (pid == 1) || (map_pid_wrkr[pid].manage == manage_local_worker) + Int[x.id for x in filter(w -> (w.id==1) || (w.manage == manage_local_worker), PGRP.workers)] + else + ipatpid = get_bind_addr(pid) + Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)] + end + else + remotecall_fetch(1, procs, pid) + end end function workers() @@ -854,16 +862,24 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately # first connection; get process group info from client self_pid = LPROC.id = deserialize(sock) locs = deserialize(sock) + self_is_local = deserialize(sock) #print("\nLocation: ",locs,"\nId:",myid(),"\n") # joining existing process group register_worker(Worker("", 0, sock, 1)) register_worker(LPROC) - for (rhost, rport, rpid) in locs + for (rhost, rport, rpid, r_is_local) in locs if (rpid < self_pid) && (!(rpid == 1)) # Connect to them - w = Worker(rhost, rport) + if self_is_local && r_is_local + # If on localhost, use the loopback address - this addresses + # the special case of system suspend wherein the local ip + # may be changed upon system awake. + w = Worker("127.0.0.1", rport) + else + w = Worker(rhost, rport) + end w.id = rpid register_worker(w) create_message_handler_loop(w.socket) @@ -919,8 +935,6 @@ end # argument is descriptor to write listening port # to. 
start_worker() = start_worker(STDOUT) function start_worker(out::IO) - global bind_addr - # we only explicitly monitor worker STDOUT on the console, so redirect # stderr to stdout so we can see the output. # at some point we might want some or all worker output to go to log @@ -929,14 +943,11 @@ function start_worker(out::IO) # exit when process 1 shut down. Don't yet know why. #redirect_stderr(STDOUT) - if !isdefined(Base,:bind_addr) - bind_addr = getipaddr() - end (actual_port,sock) = listenany(uint16(9009)) sock.ccb = accept_handler print(out, "julia_worker:") # print header print(out, "$(dec(actual_port))#") # print port - print(out, bind_addr) #TODO: print hostname + print(out, LPROC.bind_addr) print(out, '\n') flush(out) # close STDIN; workers will not use it @@ -973,8 +984,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager) elseif insttype == :io_host read_cb_response(inst) = begin - (priv_hostname, port) = read_worker_host_port(inst[1]) - inst[1], priv_hostname, port, inst[2], inst[3] + (bind_addr, port) = read_worker_host_port(inst[1]) + inst[1], bind_addr, port, inst[2], inst[3] end elseif insttype == :io_host_port read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2], inst[4]) @@ -985,8 +996,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager) end for i=1:np - (io, privhost, port, pubhost, wconfig) = read_cb_response(instances[i]) - ws[i] = create_worker(privhost, port, pubhost, io, wconfig, cman.manage) + (io, bind_addr, port, pubhost, wconfig) = read_cb_response(instances[i]) + ws[i] = create_worker(bind_addr, port, pubhost, io, wconfig, cman.manage) end ws end @@ -995,14 +1006,14 @@ function read_worker_host_port(io::IO) io.line_buffered = true while true conninfo = readline(io) - private_hostname, port = parse_connection_info(conninfo) - if private_hostname != "" - return private_hostname, port + bind_addr, port = parse_connection_info(conninfo) + if bind_addr != "" + return 
bind_addr, port end end end -function create_worker(privhost, port, pubhost, stream, config, manage) +function create_worker(bind_addr, port, pubhost, stream, config, manage) tunnel = config[:tunnel] s = split(pubhost,'@') @@ -1020,12 +1031,11 @@ function create_worker(privhost, port, pubhost, stream, config, manage) if tunnel sshflags = config[:sshflags] - w = Worker(pubhost, privhost, port, user, sshflags) + w = Worker(pubhost, bind_addr, port, user, sshflags) else - w = Worker(pubhost, port) + w = Worker(bind_addr, port) end - w.privhost = privhost w.config = config w.manage = manage @@ -1060,10 +1070,10 @@ end tunnel_port = 9201 # establish an SSH tunnel to a remote worker # returns P such that localhost:P connects to host:port -function ssh_tunnel(user, host, privhost, port, sshflags) +function ssh_tunnel(user, host, bind_addr, port, sshflags) global tunnel_port localp = tunnel_port::Int - while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$privhost:$(int(port)) sleep 60`)) && localp < 10000 + while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$bind_addr:$(int(port)) sleep 60`)) && localp < 10000 localp += 1 end @@ -1095,7 +1105,7 @@ function launch_local_workers(cman::LocalManager, np::Integer, config::Dict) # start the processes first... 
for i in 1:np - io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to 127.0.0.1 $exeflags`)) + io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to $(LPROC.bind_addr) $exeflags`)) io_objs[i] = io configs[i] = merge(config, {:process => pobj}) end @@ -1123,7 +1133,7 @@ show(io::IO, cman::SSHManager) = println("SSHManager(machines=", cman.machines, function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict) dir = config[:dir] exename = config[:exename] - exeflags = config[:exeflags] + exeflags_base = config[:exeflags] io_objs = cell(np) configs = cell(np) @@ -1131,22 +1141,35 @@ function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict) # start the processes first... for i in 1:np - # machine could be of the format [user@]host[:port] - machine_def = split(cman.machines[i], ':') + # machine could be of the format [user@]host[:port] bind_addr + machine_bind = split(cman.machines[i]) + if length(machine_bind) > 1 + exeflags = `--bind-to $(machine_bind[2]) $exeflags_base` + else + exeflags = exeflags_base + end + machine_def = machine_bind[1] + + machine_def = split(machine_def, ':') portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : `` config[:sshflags] = `$(config[:sshflags]) $portopt` sshflags = config[:sshflags] - cman.machines[i] = machine_def[1] + host = cman.machines[i] = machine_def[1] - io, pobj = readsfrom(detach(`ssh -n $sshflags $(machine_def[1]) "sh -l -c \"cd $dir && $exename $exeflags\""`)) + # Build up the ssh command + cmd = `cd $dir && $exename $exeflags` # launch julia + cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under + cmd = `ssh -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch + + io, pobj = readsfrom(detach(cmd)) io_objs[i] = io configs[i] = merge(config, {:machine => cman.machines[i]}) end # ...and then read the host:port info. This optimizes overall start times. # For ssh, the tunnel connection, if any, has to be with the specified machine name. 
- # but the port needs to be forwarded to the private hostname/ip-address + # but the port needs to be forwarded to the bound hostname/ip-address return (:io_host, collect(zip(io_objs, cman.machines, configs))) end @@ -1537,3 +1560,21 @@ function disable_nagle(sock) end end end + +# Return true if all of pids are on one host. Processes other than 1 forward the query to process 1. +function check_same_host(pids) + if myid() != 1 + return remotecall_fetch(1, check_same_host, pids) + else + # We first check if all pids are pid 1 or were started using the local manager, + # else we check that they all share the same bind address. This handles the special + # case where the local ip address may change - as during a system sleep/awake + if all(p -> (p==1) || (map_pid_wrkr[p].manage == manage_local_worker), pids) + return true + else + first_bind_addr = get_bind_addr(map_pid_wrkr[pids[1]]) + return all(p -> get_bind_addr(map_pid_wrkr[p]) == first_bind_addr, pids[2:end]) + end + end +end + diff --git a/base/sharedarray.jl b/base/sharedarray.jl index ccf0808de298b..8fea97e352737 100644 --- a/base/sharedarray.jl +++ b/base/sharedarray.jl @@ -33,7 +33,11 @@ function SharedArray(T::Type, dims::NTuple; init=false, pids=Int[]) pids = procs(myid()) onlocalhost = true else - onlocalhost = assert_same_host(pids) + if !check_same_host(pids) + error("SharedArray requires all requested processes to be on the same machine.") + end + + onlocalhost = myid() in procs(pids[1]) end local shm_seg_name = "" @@ -348,13 +352,3 @@ end @unix_only shm_open(shm_seg_name, oflags, permissions) = ccall(:shm_open, Int, (Ptr{Uint8}, Int, Int), shm_seg_name, oflags, permissions) -function assert_same_host(pids) - first_privip = getprivipaddr(pids[1]) - if !all(x -> getprivipaddr(x) == first_privip, pids) - error("SharedArray requires all requested processes to be on the same machine.") - end - - return myid() in procs(pids[1]) -end - - diff --git a/doc/manual/getting-started.rst b/doc/manual/getting-started.rst index 0ed2a6863fa4d..13e5cc84d44ca 100644 --- a/doc/manual/getting-started.rst +++ 
b/doc/manual/getting-started.rst @@ -71,7 +71,9 @@ worker processes, while ``--machinefile file`` will launch a worker for each line in file ``file``. The machines defined in ``file`` must be accessible via a passwordless ``ssh`` login, with Julia installed at the same location as the current host. Each machine definition takes the form -``[user@]host[:port]`` +``[user@]host[:port] [bind_addr]``. ``user`` defaults to the current user, +``port`` to the standard ssh port. Optionally, in case of multi-homed hosts, +``bind_addr`` may be used to explicitly specify an interface. If you have code that you want executed whenever julia is run, you can diff --git a/doc/stdlib/base.rst b/doc/stdlib/base.rst index 5a65899b3f87d..9720e07d99763 100644 --- a/doc/stdlib/base.rst +++ b/doc/stdlib/base.rst @@ -4536,8 +4536,9 @@ Parallel Computing Add processes on remote machines via SSH. Requires julia to be installed in the same location on each node, or to be available via a shared file system. - ``machines`` is a vector of host definitions of the form ``[user@]host[:port]``. A worker is started - for each such definition. + ``machines`` is a vector of host definitions of the form ``[user@]host[:port] [bind_addr]``. A worker is started + for each such definition. ``user`` defaults to the current user, ``port`` to the standard ssh port. Optionally, in case of + multi-homed hosts, ``bind_addr`` may be used to explicitly specify an interface. Keyword arguments: