Skip to content

Commit

Permalink
bind-to checked. Use iface address instead of loopback if available.
Browse files Browse the repository at this point in the history
  • Loading branch information
amitmurthy committed Apr 17, 2014
1 parent 53ac944 commit 588d6ff
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 77 deletions.
25 changes: 22 additions & 3 deletions base/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,27 @@ end
# try to include() a file, ignoring if not found
try_include(path::String) = isfile(path) && include(path)

# Determine the address this process binds to and record it in LPROC.bind_addr.
#
# `args` is the command-line argument vector. If it contains "--bind-to", the
# argument that follows it is parsed with parseip(). Otherwise getipaddr() is
# tried, and the loopback address is used if that call throws.
# Raises an error if "--bind-to" is the last argument (no address follows it).
function init_bind_addr(args::Vector{UTF8String})
    # Treat --bind-to in a position independent manner in ARGS since
    # --worker, -n and --machinefile options are affected by it
    btoidx = findfirst(args, "--bind-to")
    if btoidx > 0
        # Without this check a trailing "--bind-to" would only surface as an
        # opaque BoundsError from args[btoidx+1].
        if btoidx == length(args)
            error("--bind-to requires an address argument")
        end
        bind_addr = parseip(args[btoidx+1])
    else
        try
            bind_addr = getipaddr()
        catch
            # All networking is unavailable, initialize bind_addr to the loopback address
            # Will cause an exception to be raised only when used.
            bind_addr = ip"127.0.0.1"
        end
    end
    global LPROC
    LPROC.bind_addr = bind_addr
end


function process_options(args::Vector{UTF8String})
global bind_addr
quiet = false
repl = true
startup = true
Expand All @@ -186,8 +205,7 @@ function process_options(args::Vector{UTF8String})
start_worker()
# doesn't return
elseif args[i]=="--bind-to"
i += 1
bind_addr = args[i]
i+=1 # has already been processed
elseif args[i]=="-e" || args[i]=="--eval"
repl = false
i+=1
Expand Down Expand Up @@ -322,6 +340,7 @@ function _start()
early_init()

try
init_bind_addr(ARGS)
any(a->(a=="--worker"), ARGS) || init_head_sched()
init_load_path()
(quiet,repl,startup,color_set,no_history_file) = process_options(copy(ARGS))
Expand Down
160 changes: 100 additions & 60 deletions base/multi.jl
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,6 @@ end

abstract ClusterManager

type WorkerLocalInfo
# Currently only one field, in the future we may add more fields like
# local OS pid, system info like RAM, CPU type, etc.
#
privipaddr::IpAddr
WorkerLocalInfo() = new(getipaddr())
end

type Worker
host::ByteString
port::Uint16
Expand All @@ -104,21 +96,26 @@ type Worker
add_msgs::Array{Any,1}
id::Int
gcflag::Bool
privhost::ByteString
bind_addr::IpAddr
manage::Function
config::Dict
winfo::WorkerLocalInfo

Worker(host::String, port::Integer, sock::TcpSocket, id::Int) =
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false, "")
new(bytestring(host), uint16(port), sock, IOBuffer(), {}, {}, id, false)
end
Worker(host::String, port::Integer, sock::TcpSocket) =
Worker(host, port, sock, 0)
Worker(host::String, port::Integer) =
Worker(host, port, connect(host,uint16(port)))
Worker(host::String, privhost::String, port::Integer, tunnel_user::String, sshflags) =
Worker(host, port, connect("localhost",
ssh_tunnel(tunnel_user, host, privhost, uint16(port), sshflags)))
# Connect directly to `host`:`port` and build a Worker around the resulting
# socket. The worker's bind_addr is set to the result of getaddrinfo(host).
function Worker(host::String, port::Integer)
    sock = connect(host, uint16(port))
    wrkr = Worker(host, port, sock)
    wrkr.bind_addr = getaddrinfo(host)
    return wrkr
end
# Build a Worker that is reached through an SSH tunnel: ssh_tunnel() supplies
# the local port, and the socket is connected to that port on "localhost".
# The worker's bind_addr is set to parseip(bind_addr).
function Worker(host::String, bind_addr::String, port::Integer, tunnel_user::String, sshflags)
    local_port = ssh_tunnel(tunnel_user, host, bind_addr, uint16(port), sshflags)
    sock = connect("localhost", local_port)
    wrkr = Worker(host, port, sock)
    wrkr.bind_addr = parseip(bind_addr)
    return wrkr
end


function send_msg_now(w::Worker, kind, args...)
Expand Down Expand Up @@ -182,10 +179,11 @@ end

type LocalProcess
id::Int
winfo::WorkerLocalInfo
bind_addr::IpAddr
LocalProcess() = new()
end

const LPROC = LocalProcess(1, WorkerLocalInfo())
const LPROC = LocalProcess()

const map_pid_wrkr = Dict{Int, Union(Worker, LocalProcess)}()
const map_sock_wrkr = ObjectIdDict()
Expand All @@ -211,17 +209,18 @@ type ProcessGroup
end
const PGRP = ProcessGroup({})

getprivipaddr(pid::Integer) = getprivipaddr(worker_from_id(pid))
getprivipaddr(w::Union(Worker, LocalProcess)) = fetchwinfo(w).privipaddr

fetchwinfo(pid::Integer) = fetchwinfo(worker_from_id(pid))
function fetchwinfo(w::Union(Worker, LocalProcess))
# retrieve and cache upon first access
if !isdefined(w, :winfo)
w.winfo = remotecall_fetch(w.id, fetchwinfo, w.id)
get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
function get_bind_addr(w::Union(Worker, LocalProcess))
if !isdefined(w, :bind_addr)
if w.id != myid()
w.bind_addr = remotecall_fetch(w.id, get_bind_addr, w.id)
else
error("LPROC.bind_addr not defined") # Should never happen since LPROC.bind_addr
# is defined early on during process init.
end
end
w.winfo
end
w.bind_addr
end

function add_workers(pg::ProcessGroup, ws::Array{Any,1})
# NOTE: currently only node 1 can add new nodes, since nobody else
Expand All @@ -232,10 +231,11 @@ function add_workers(pg::ProcessGroup, ws::Array{Any,1})
register_worker(w)
create_message_handler_loop(w.socket)
end
all_locs = map(x -> isa(x, Worker) ? (x.privhost, x.port, x.id) : ("", 0, x.id), pg.workers)

all_locs = map(x -> isa(x, Worker) ? (string(x.bind_addr), x.port, x.id, x.manage == manage_local_worker) : ("", 0, x.id, true), pg.workers)

for w in ws
send_msg_now(w, :join_pgrp, w.id, all_locs)
send_msg_now(w, :join_pgrp, w.id, all_locs, w.manage == manage_local_worker)
end
for w in ws
@schedule begin
Expand All @@ -255,8 +255,16 @@ end

procs() = Int[x.id for x in PGRP.workers]
function procs(pid::Integer)
ipatpid = getprivipaddr(pid)
Int[x.id for x in filter(w -> getprivipaddr(w) == ipatpid, PGRP.workers)]
if myid() == 1
if (pid == 1) || (map_pid_wrkr[pid].manage == manage_local_worker)
Int[x.id for x in filter(w -> (w.id==1) || (w.manage == manage_local_worker), PGRP.workers)]
else
ipatpid = get_bind_addr(pid)
Int[x.id for x in filter(w -> get_bind_addr(w) == ipatpid, PGRP.workers)]
end
else
remotecall_fetch(1, procs, pid)
end
end

function workers()
Expand Down Expand Up @@ -854,16 +862,24 @@ function create_message_handler_loop(sock::AsyncStream) #returns immediately
# first connection; get process group info from client
self_pid = LPROC.id = deserialize(sock)
locs = deserialize(sock)
self_is_local = deserialize(sock)
#print("\nLocation: ",locs,"\nId:",myid(),"\n")
# joining existing process group

register_worker(Worker("", 0, sock, 1))
register_worker(LPROC)

for (rhost, rport, rpid) in locs
for (rhost, rport, rpid, r_is_local) in locs
if (rpid < self_pid) && (!(rpid == 1))
# Connect to them
w = Worker(rhost, rport)
if self_is_local && r_is_local
# If on localhost, use the loopback address - this addresses
# the special case of system suspend wherein the local ip
# may be changed upon system awake.
w = Worker("127.0.0.1", rport)
else
w = Worker(rhost, rport)
end
w.id = rpid
register_worker(w)
create_message_handler_loop(w.socket)
Expand Down Expand Up @@ -919,8 +935,6 @@ end
# argument is descriptor to write listening port # to.
start_worker() = start_worker(STDOUT)
function start_worker(out::IO)
global bind_addr

# we only explicitly monitor worker STDOUT on the console, so redirect
# stderr to stdout so we can see the output.
# at some point we might want some or all worker output to go to log
Expand All @@ -929,14 +943,11 @@ function start_worker(out::IO)
# exit when process 1 shut down. Don't yet know why.
#redirect_stderr(STDOUT)

if !isdefined(Base,:bind_addr)
bind_addr = getipaddr()
end
(actual_port,sock) = listenany(uint16(9009))
sock.ccb = accept_handler
print(out, "julia_worker:") # print header
print(out, "$(dec(actual_port))#") # print port
print(out, bind_addr) #TODO: print hostname
print(out, LPROC.bind_addr)
print(out, '\n')
flush(out)
# close STDIN; workers will not use it
Expand Down Expand Up @@ -973,8 +984,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager)
elseif insttype == :io_host
read_cb_response(inst) =
begin
(priv_hostname, port) = read_worker_host_port(inst[1])
inst[1], priv_hostname, port, inst[2], inst[3]
(bind_addr, port) = read_worker_host_port(inst[1])
inst[1], bind_addr, port, inst[2], inst[3]
end
elseif insttype == :io_host_port
read_cb_response(inst) = (inst[1], inst[2], inst[3], inst[2], inst[4])
Expand All @@ -985,8 +996,8 @@ function start_cluster_workers(np::Integer, config::Dict, cman::ClusterManager)
end

for i=1:np
(io, privhost, port, pubhost, wconfig) = read_cb_response(instances[i])
ws[i] = create_worker(privhost, port, pubhost, io, wconfig, cman.manage)
(io, bind_addr, port, pubhost, wconfig) = read_cb_response(instances[i])
ws[i] = create_worker(bind_addr, port, pubhost, io, wconfig, cman.manage)
end
ws
end
Expand All @@ -995,14 +1006,14 @@ function read_worker_host_port(io::IO)
io.line_buffered = true
while true
conninfo = readline(io)
private_hostname, port = parse_connection_info(conninfo)
if private_hostname != ""
return private_hostname, port
bind_addr, port = parse_connection_info(conninfo)
if bind_addr != ""
return bind_addr, port
end
end
end

function create_worker(privhost, port, pubhost, stream, config, manage)
function create_worker(bind_addr, port, pubhost, stream, config, manage)
tunnel = config[:tunnel]

s = split(pubhost,'@')
Expand All @@ -1020,12 +1031,11 @@ function create_worker(privhost, port, pubhost, stream, config, manage)

if tunnel
sshflags = config[:sshflags]
w = Worker(pubhost, privhost, port, user, sshflags)
w = Worker(pubhost, bind_addr, port, user, sshflags)
else
w = Worker(pubhost, port)
w = Worker(bind_addr, port)
end

w.privhost = privhost
w.config = config
w.manage = manage

Expand Down Expand Up @@ -1060,10 +1070,10 @@ end
tunnel_port = 9201
# establish an SSH tunnel to a remote worker
# returns P such that localhost:P connects to host:port
function ssh_tunnel(user, host, privhost, port, sshflags)
function ssh_tunnel(user, host, bind_addr, port, sshflags)
global tunnel_port
localp = tunnel_port::Int
while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$privhost:$(int(port)) sleep 60`)) && localp < 10000
while !success(detach(`ssh -f -o ExitOnForwardFailure=yes $sshflags $(user)@$host -L $localp:$bind_addr:$(int(port)) sleep 60`)) && localp < 10000
localp += 1
end

Expand Down Expand Up @@ -1095,7 +1105,7 @@ function launch_local_workers(cman::LocalManager, np::Integer, config::Dict)

# start the processes first...
for i in 1:np
io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to 127.0.0.1 $exeflags`))
io, pobj = readsfrom(detach(`$(dir)/$(exename) --bind-to $(LPROC.bind_addr) $exeflags`))
io_objs[i] = io
configs[i] = merge(config, {:process => pobj})
end
Expand Down Expand Up @@ -1123,30 +1133,43 @@ show(io::IO, cman::SSHManager) = println("SSHManager(machines=", cman.machines,
function launch_ssh_workers(cman::SSHManager, np::Integer, config::Dict)
dir = config[:dir]
exename = config[:exename]
exeflags = config[:exeflags]
exeflags_base = config[:exeflags]

io_objs = cell(np)
configs = cell(np)

# start the processes first...

for i in 1:np
# machine could be of the format [user@]host[:port]
machine_def = split(cman.machines[i], ':')
# machine could be of the format [user@]host[:port] bind_addr
machine_bind = split(cman.machines[i])
if length(machine_bind) > 1
exeflags = `--bind-to $(machine_bind[2]) $exeflags_base`
else
exeflags = exeflags_base
end
machine_def = machine_bind[1]

machine_def = split(machine_def, ':')
portopt = length(machine_def) == 2 ? ` -p $(machine_def[2]) ` : ``
config[:sshflags] = `$(config[:sshflags]) $portopt`

sshflags = config[:sshflags]
cman.machines[i] = machine_def[1]
host = cman.machines[i] = machine_def[1]

io, pobj = readsfrom(detach(`ssh -n $sshflags $(machine_def[1]) "sh -l -c \"cd $dir && $exename $exeflags\""`))
# Build up the ssh command
cmd = `cd $dir && $exename $exeflags` # launch julia
cmd = `sh -l -c $(shell_escape(cmd))` # shell to launch under
cmd = `ssh -n $sshflags $host $(shell_escape(cmd))` # use ssh to remote launch

io, pobj = readsfrom(detach(cmd))
io_objs[i] = io
configs[i] = merge(config, {:machine => cman.machines[i]})
end

# ...and then read the host:port info. This optimizes overall start times.
# For ssh, the tunnel connection, if any, has to be with the specified machine name.
# but the port needs to be forwarded to the private hostname/ip-address
# but the port needs to be forwarded to the bound hostname/ip-address
return (:io_host, collect(zip(io_objs, cman.machines, configs)))
end

Expand Down Expand Up @@ -1537,3 +1560,20 @@ function disable_nagle(sock)
end
end
end

# Report whether all processes in `pids` are considered to be on the same host.
#
# Any process other than process 1 forwards the query to process 1.
function check_same_host(pids)
    if myid() != 1
        return remotecall_fetch(1, check_same_host, pids)
    end

    # We first check whether all the given pids have been started using the
    # local manager, else we check for the same bind_to addr. This handles the
    # special case where the local ip address may change - as during a system
    # sleep/awake.
    started_locally = p -> (p==1) || (map_pid_wrkr[p].manage == manage_local_worker)
    if all(started_locally, pids)
        return true
    end

    # NOTE(review): because of the `p != 1` guard, pid 1 anywhere after the
    # first position yields false, so the result depends on the order of
    # `pids` - confirm this is intended.
    reference_addr = map_pid_wrkr[pids[1]].bind_addr
    return all(p -> (p != 1) && (map_pid_wrkr[p].bind_addr == reference_addr), pids[2:end])
end

Loading

0 comments on commit 588d6ff

Please sign in to comment.