support lazy all_to_all connection setups (JuliaLang#22814)
amitmurthy committed Jul 25, 2017
1 parent d328a82 commit fd951c2
Showing 8 changed files with 190 additions and 63 deletions.
3 changes: 3 additions & 0 deletions NEWS.md
@@ -79,6 +79,9 @@ This section lists changes that do not have deprecation warnings.
the type of `n`). Use the corresponding mutating functions `randperm!` and `randcycle!`
to control the array type ([#22723]).

* Worker-worker connections are set up lazily for an `:all_to_all` topology. Use the keyword
argument `lazy=false` to force all connections to be set up during an `addprocs` call ([#22814]).
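
A quick usage sketch (the worker count is arbitrary; in Julia 0.6 `addprocs` is available without an import):

```julia
addprocs(4)                # :all_to_all topology; worker-worker connections set up lazily (the default)
# addprocs(4; lazy=false)  # on a fresh cluster: set up all connections eagerly instead
```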

Library improvements
--------------------

96 changes: 77 additions & 19 deletions base/distributed/cluster.jl
@@ -59,6 +59,7 @@ mutable struct Worker
state::WorkerState
c_state::Condition # wait for state changes
ct_time::Float64 # creation time
conn_func::Nullable{Function} # Used to setup connections lazily

r_stream::IO
w_stream::IO
@@ -82,12 +83,13 @@ mutable struct Worker
w
end

function Worker(id::Int)
Worker(id::Int) = Worker(id, Nullable{Function}())
function Worker(id::Int, conn_func)
@assert id > 0
if haskey(map_pid_wrkr, id)
return map_pid_wrkr[id]
end
w=new(id, [], [], false, W_CREATED, Condition(), time())
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
register_worker(w)
w
end
@@ -102,21 +104,56 @@ end

function check_worker_state(w::Worker)
if w.state == W_CREATED
if PGRP.topology == :all_to_all
# Since higher pids connect with lower pids, the remote worker
# may not have connected to us yet. Wait for some time.
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
if !isclusterlazy()
if PGRP.topology == :all_to_all
# Since higher pids connect with lower pids, the remote worker
# may not have connected to us yet. Wait for some time.
wait_for_conn(w)
else
error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
end
else
error("peer $(w.id) is not connected to $(myid()). Topology : " * string(PGRP.topology))
w.ct_time = time()
if myid() > w.id
@schedule exec_conn_func(w)
else
# route request via node 1
@schedule remotecall_fetch((p,to_id) -> remotecall_fetch(exec_conn_func, p, to_id), 1, w.id, myid())
end
wait_for_conn(w)
end
end
end

exec_conn_func(id::Int) = exec_conn_func(worker_from_id(id))
function exec_conn_func(w::Worker)
if isnull(w.conn_func)
return wait_for_conn(w) # Some other task may be trying to connect at the same time.
end

try
f = get(w.conn_func)
w.conn_func = Nullable{Function}()
f()
catch e
w.conn_func = () -> throw(e)
rethrow(e)
end
nothing
end

function wait_for_conn(w)
if w.state == W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@schedule (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
nothing
end
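
A minimal standalone sketch of the one-shot pattern `exec_conn_func` implements: the stored thunk is claimed before it runs, so concurrent callers fall through to waiting on the worker state, and a failure re-arms the slot with a throwing thunk so later callers see the same error. The `LazyConn`/`run_conn_once!` names are illustrative, not part of the commit:

```julia
mutable struct LazyConn
    conn_func::Nullable{Function}   # holds the pending connect thunk, if any
end

function run_conn_once!(c::LazyConn)
    isnull(c.conn_func) && return nothing   # another task already claimed the thunk; callers then wait on state
    f = get(c.conn_func)
    c.conn_func = Nullable{Function}()      # claim the thunk before running it
    try
        f()
    catch e
        # Re-arm with a throwing thunk so subsequent callers observe the same failure.
        c.conn_func = Nullable{Function}(() -> throw(e))
        rethrow(e)
    end
    nothing
end
```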

## process group creation ##

mutable struct LocalProcess
@@ -340,6 +377,17 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
params = merge(default_addprocs_params(), AnyDict(kwargs))
topology(Symbol(params[:topology]))

if PGRP.topology != :all_to_all
params[:lazy] = false
end

if isnull(PGRP.lazy) || nprocs() == 1
PGRP.lazy = Nullable{Bool}(params[:lazy])
elseif isclusterlazy() != params[:lazy]
throw(ArgumentError(string("Active workers with lazy=", isclusterlazy(),
". Cannot set lazy=", params[:lazy])))
end

# References to launched workers, filled when each worker is fully initialized and
# has connected to all nodes.
launched_q = Int[] # Asynchronously filled by the launch method
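
To illustrate the consistency check added above (a sketch; the error text matches the `ArgumentError` constructed in the diff, and worker counts are arbitrary):

```julia
addprocs(2)              # records lazy=true (the default) for the active cluster
addprocs(2; lazy=false)  # throws ArgumentError: Active workers with lazy=true. Cannot set lazy=false
```

Note that for topologies other than `:all_to_all`, the code above silently forces `lazy=false`.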
@@ -396,7 +444,8 @@ default_addprocs_params() = AnyDict(
:dir => pwd(),
:exename => joinpath(JULIA_HOME, julia_exename()),
:exeflags => ``,
:enable_threaded_blas => false)
:enable_threaded_blas => false,
:lazy => true)


function setup_launched_worker(manager, wconfig, launched_q)
@@ -517,7 +566,7 @@ function create_worker(manager, wconfig)

all_locs = map(x -> isa(x, Worker) ? (get(x.config.connect_at, ()), x.id) : ((), x.id, true), join_list)
send_connection_hdr(w, true)
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false))
join_message = JoinPGRPMsg(w.id, all_locs, PGRP.topology, get(wconfig.enable_threaded_blas, false), isclusterlazy())
send_msg_now(w, MsgHeader(RRID(0,0), ntfy_oid), join_message)

@schedule manage(w.manager, w.id, w.config, :register)
@@ -619,8 +668,9 @@ mutable struct ProcessGroup
workers::Array{Any,1}
refs::Dict # global references
topology::Symbol
lazy::Nullable{Bool}

ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all)
ProcessGroup(w::Array{Any,1}) = new("pg-default", w, Dict(), :all_to_all, Nullable{Bool}())
end
const PGRP = ProcessGroup([])

Expand All @@ -634,6 +684,14 @@ function topology(t)
t
end

function isclusterlazy()
if isnull(PGRP.lazy)
return false
else
return get(PGRP.lazy)
end
end

get_bind_addr(pid::Integer) = get_bind_addr(worker_from_id(pid))
get_bind_addr(w::LocalProcess) = LPROC.bind_addr
function get_bind_addr(w::Worker)
@@ -667,7 +725,7 @@ myid() = LPROC.id
Get the number of available processes.
"""
function nprocs()
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
@@ -698,7 +756,7 @@ end
Returns a list of all process identifiers.
"""
function procs()
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
else
@@ -707,7 +765,7 @@ function procs()
end

function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || PGRP.topology == :all_to_all
if myid() == 1 || (PGRP.topology == :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
return true
@@ -903,7 +961,7 @@ function deregister_worker(pg, pid)
if myid() == 1 && isdefined(w, :config)
# Notify the cluster manager of this workers death
manage(w.manager, w.id, w.config, :deregister)
if PGRP.topology != :all_to_all
if PGRP.topology != :all_to_all || isclusterlazy()
for rpid in workers()
try
remote_do(deregister_worker, rpid, pid)
6 changes: 5 additions & 1 deletion base/distributed/managers.jl
@@ -100,6 +100,10 @@ Keyword arguments:
A worker with a cluster manager identity `ident` will connect to all workers specified
in `connect_idents`.
* `lazy`: Applicable only with `topology=:all_to_all`. If `true`, worker-worker connections
are set up lazily, i.e. they are established at the first remote call between two
workers. The default is `true`.

Environment variables:
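
For example, with hypothetical host names (a sketch, not part of the docstring):

```julia
# Launch two workers on each host and force eager setup of all
# worker-worker connections at addprocs time:
addprocs([("user@host1", 2), ("user@host2", 2)]; lazy=false)
```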
@@ -302,7 +306,7 @@ addprocs(; kwargs...) = addprocs(Sys.CPU_CORES; kwargs...)
Launches workers using the in-built `LocalManager` which only launches workers on the
local host. This can be used to take advantage of multiple cores. `addprocs(4)` will add 4
processes on the local machine. If `restrict` is `true`, binding is restricted to
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, and
`127.0.0.1`. Keyword args `dir`, `exename`, `exeflags`, `topology`, `lazy` and
`enable_threaded_blas` have the same effect as documented for `addprocs(machines)`.
"""
function addprocs(np::Integer; restrict=true, kwargs...)
1 change: 1 addition & 0 deletions base/distributed/messages.jl
@@ -68,6 +68,7 @@ struct JoinPGRPMsg <: AbstractMsg
other_workers::Array
topology::Symbol
enable_threaded_blas::Bool
lazy::Bool
end
struct JoinCompleteMsg <: AbstractMsg
cpu_cores::Int
12 changes: 10 additions & 2 deletions base/distributed/process_messages.jl
@@ -310,14 +310,22 @@ function handle_msg(msg::JoinPGRPMsg, header, r_stream, w_stream, version)
disable_threaded_libs()
end

lazy = msg.lazy
PGRP.lazy = Nullable{Bool}(lazy)

wait_tasks = Task[]
for (connect_at, rpid) in msg.other_workers
wconfig = WorkerConfig()
wconfig.connect_at = connect_at

let rpid=rpid, wconfig=wconfig
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
if lazy
# The constructor registers the object with a global registry.
Worker(rpid, Nullable{Function}(()->connect_to_peer(cluster_manager, rpid, wconfig)))
else
t = @async connect_to_peer(cluster_manager, rpid, wconfig)
push!(wait_tasks, t)
end
end
end

6 changes: 6 additions & 0 deletions doc/src/manual/parallel-computing.md
@@ -1300,6 +1300,12 @@ connected to each other:
fields `ident` and `connect_idents` in `WorkerConfig`. A worker with a cluster-manager-provided
identity `ident` will connect to all workers specified in `connect_idents`.

The keyword argument `lazy` affects only the `:all_to_all` topology option. If `true`, the cluster
starts off with the master connected to all workers, while specific worker-worker connections are
established at the first remote invocation between two workers. This reduces the resources initially
allocated for intra-cluster communication, since connections are set up only as the runtime requirements
of a parallel program demand. The default value of `lazy` is `true`.
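
A sketch of this behaviour (assuming a fresh session where the two workers get ids 2 and 3):

```julia
addprocs(2)   # :all_to_all topology, lazy=true by default; the master connects to both workers

# No 2<->3 connection exists yet. The first remote invocation between the two
# workers establishes it transparently:
remotecall_fetch(p -> remotecall_fetch(myid, p), 2, 3)   # returns 3
```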

Currently, sending a message between unconnected workers results in an error. This behaviour,
as with the functionality and interface, should be considered experimental in nature and may change
in future releases.
84 changes: 43 additions & 41 deletions test/distributed_exec.jl
@@ -13,47 +13,6 @@ include("testenv.jl")
addprocs_with_testenv(4)
@test nprocs() == 5

function reuseport_tests()
# Run the test on all processes.
results = asyncmap(procs()) do p
remotecall_fetch(p) do
ports_lower = [] # ports of pids lower than myid()
ports_higher = [] # ports of pids higher than myid()
for w in Base.Distributed.PGRP.workers
w.id == myid() && continue
port = Base._sockname(w.r_stream, true)[2]
if (w.id == 1)
# master connects to workers
push!(ports_higher, port)
elseif w.id < myid()
push!(ports_lower, port)
elseif w.id > myid()
push!(ports_higher, port)
end
end
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
for portset in [ports_lower, ports_higher]
if (length(portset) > 0) && (length(unique(portset)) != 1)
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
return 0
end
end
return myid()
end
end

# Ensure that the code has indeed been successfully executed everywhere
@test all(p -> p in results, procs())
end

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
reuseport_tests()
else
info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
end

id_me = myid()
id_other = filter(x -> x != id_me, procs())[rand(1:(nprocs()-1))]

@@ -1817,6 +1776,49 @@ p1,p2 = addprocs_with_testenv(2)
@everywhere f22865(p) = remotecall_fetch(x->x.*2, p, ones(2))
@test ones(2).*2 == remotecall_fetch(f22865, p1, p2)

function reuseport_tests()
# Run the test on all processes.
results = asyncmap(procs()) do p
remotecall_fetch(p) do
ports_lower = [] # ports of pids lower than myid()
ports_higher = [] # ports of pids higher than myid()
for w in Base.Distributed.PGRP.workers
w.id == myid() && continue
port = Base._sockname(w.r_stream, true)[2]
if (w.id == 1)
# master connects to workers
push!(ports_higher, port)
elseif w.id < myid()
push!(ports_lower, port)
elseif w.id > myid()
push!(ports_higher, port)
end
end
@assert (length(ports_lower) + length(ports_higher)) == nworkers()
for portset in [ports_lower, ports_higher]
if (length(portset) > 0) && (length(unique(portset)) != 1)
warn("SO_REUSEPORT TESTS FAILED. UNSUPPORTED/OLDER UNIX VERSION?")
return 0
end
end
return myid()
end
end

# Ensure that the code has indeed been successfully executed everywhere
@test all(p -> p in results, procs())
end

# Test that the client port is reused. SO_REUSEPORT may not be supported on
# all UNIX platforms, Linux kernels prior to 3.9 and older versions of OSX
if ccall(:jl_has_so_reuseport, Int32, ()) == 1
rmprocs(workers())
addprocs_with_testenv(4; lazy=false)
reuseport_tests()
else
info("SO_REUSEPORT is unsupported, skipping reuseport tests.")
end

# Run topology tests last after removing all workers, since a given
# cluster at any time only supports a single topology.
rmprocs(workers())