Skip to content

Commit

Permalink
Revert "Merge pull request #38405 from JuliaLang/vc/distributed_ts" (#41722)

Browse files Browse the repository at this point in the history

Also reverts "fixup to pull request #38405 (#41641)"

This seems to be causing hangs in CI testing.

This reverts commit 5af1cf0 and this
reverts commit 5a16805, reversing
changes made to 02807b2.

(cherry picked from commit 66f9b55)
  • Loading branch information
vtjnash authored and KristofferC committed Aug 2, 2021
1 parent be3e807 commit 84e6bb9
Show file tree
Hide file tree
Showing 7 changed files with 47 additions and 169 deletions.
66 changes: 13 additions & 53 deletions stdlib/Distributed/src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,13 @@ end
@enum WorkerState W_CREATED W_CONNECTED W_TERMINATING W_TERMINATED
mutable struct Worker
id::Int
msg_lock::Threads.ReentrantLock # Lock for del_msgs, add_msgs, and gcflag
del_msgs::Array{Any,1}
add_msgs::Array{Any,1}
gcflag::Bool
state::WorkerState
c_state::Threads.Condition # wait for state changes, lock for state
ct_time::Float64 # creation time
conn_func::Any # used to setup connections lazily
c_state::Condition # wait for state changes
ct_time::Float64 # creation time
conn_func::Any # used to setup connections lazily

r_stream::IO
w_stream::IO
Expand Down Expand Up @@ -134,7 +133,7 @@ mutable struct Worker
if haskey(map_pid_wrkr, id)
return map_pid_wrkr[id]
end
w=new(id, Threads.ReentrantLock(), [], [], false, W_CREATED, Threads.Condition(), time(), conn_func)
w=new(id, [], [], false, W_CREATED, Condition(), time(), conn_func)
w.initialized = Event()
register_worker(w)
w
Expand All @@ -144,16 +143,12 @@ mutable struct Worker
end

function set_worker_state(w, state)
lock(w.c_state) do
w.state = state
notify(w.c_state; all=true)
end
w.state = state
notify(w.c_state; all=true)
end

function check_worker_state(w::Worker)
lock(w.c_state)
if w.state === W_CREATED
unlock(w.c_state)
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand All @@ -173,8 +168,6 @@ function check_worker_state(w::Worker)
errormonitor(t)
wait_for_conn(w)
end
else
unlock(w.c_state)
end
end

Expand All @@ -193,25 +186,13 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
lock(w.c_state)
if w.state === W_CREATED
unlock(w.c_state)
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

T = Threads.@spawn begin
sleep($timeout)
lock(w.c_state) do
notify(w.c_state; all=true)
end
end
errormonitor(T)
lock(w.c_state) do
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
else
unlock(w.c_state)
@async (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
nothing
end
Expand Down Expand Up @@ -490,10 +471,6 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
# The `launch` method should add an object of type WorkerConfig for every
# worker launched. It provides information required on how to connect
# to it.

# FIXME: launched should be a Channel, launch_ntfy should be a Threads.Condition
# but both are part of the public interface. This means we currently can't use
# `Threads.@spawn` in the code below.
launched = WorkerConfig[]
launch_ntfy = Condition()

Expand All @@ -506,10 +483,7 @@ function addprocs_locked(manager::ClusterManager; kwargs...)
while true
if isempty(launched)
istaskdone(t_launch) && break
@async begin
sleep(1)
notify(launch_ntfy)
end
@async (sleep(1); notify(launch_ntfy))
wait(launch_ntfy)
end

Expand Down Expand Up @@ -662,12 +636,7 @@ function create_worker(manager, wconfig)
# require the value of config.connect_at which is set only upon connection completion
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
# wait for wl to join
lock(jw.c_state) do
if jw.state === W_CREATED
wait(jw.c_state)
end
end
(jw.state === W_CREATED) && wait(jw.c_state)
push!(join_list, jw)
end
end
Expand All @@ -690,12 +659,7 @@ function create_worker(manager, wconfig)
end

for wl in wlist
lock(wl.c_state) do
if wl.state === W_CREATED
# wait for wl to join
wait(wl.c_state)
end
end
(wl.state === W_CREATED) && wait(wl.c_state)
push!(join_list, wl)
end
end
Expand All @@ -712,11 +676,7 @@ function create_worker(manager, wconfig)
@async manage(w.manager, w.id, w.config, :register)
# wait for rr_ntfy_join with timeout
timedout = false
@async begin
sleep($timeout)
timedout = true
put!(rr_ntfy_join, 1)
end
@async (sleep($timeout); timedout = true; put!(rr_ntfy_join, 1))
wait(rr_ntfy_join)
if timedout
error("worker did not connect within $timeout seconds")
Expand Down
10 changes: 7 additions & 3 deletions stdlib/Distributed/src/macros.jl
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

let nextidx = Threads.Atomic{Int}(0)
let nextidx = 0
global nextproc
function nextproc()
idx = Threads.atomic_add!(nextidx, 1)
return workers()[(idx % nworkers()) + 1]
p = -1
if p == -1
p = workers()[(nextidx % nworkers()) + 1]
nextidx += 1
end
p
end
end

Expand Down
2 changes: 1 addition & 1 deletion stdlib/Distributed/src/managers.jl
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ function launch(manager::SSHManager, params::Dict, launched::Array, launch_ntfy:
# Wait for all launches to complete.
@sync for (i, (machine, cnt)) in enumerate(manager.machines)
let machine=machine, cnt=cnt
@async try
@async try
launch_on_machine(manager, $machine, $cnt, params, launched, launch_ntfy)
catch e
print(stderr, "exception launching on machine $(machine) : $(e)\n")
Expand Down
28 changes: 15 additions & 13 deletions stdlib/Distributed/src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -126,20 +126,22 @@ function flush_gc_msgs(w::Worker)
if !isdefined(w, :w_stream)
return
end
lock(w.msg_lock) do
w.gcflag || return # early exit if someone else got to this
w.gcflag = false
msgs = w.add_msgs
w.add_msgs = Any[]
if !isempty(msgs)
remote_do(add_clients, w, msgs)
end
w.gcflag = false
new_array = Any[]
msgs = w.add_msgs
w.add_msgs = new_array
if !isempty(msgs)
remote_do(add_clients, w, msgs)
end

msgs = w.del_msgs
w.del_msgs = Any[]
if !isempty(msgs)
remote_do(del_clients, w, msgs)
end
# del_msgs gets populated by finalizers, so be very careful here about ordering of allocations
# XXX: threading requires this to be atomic
new_array = Any[]
msgs = w.del_msgs
w.del_msgs = new_array
if !isempty(msgs)
#print("sending delete of $msgs\n")
remote_do(del_clients, w, msgs)
end
end

Expand Down
46 changes: 11 additions & 35 deletions stdlib/Distributed/src/remotecall.jl
Original file line number Diff line number Diff line change
Expand Up @@ -247,42 +247,22 @@ function del_clients(pairs::Vector)
end
end

# The task below is coalescing the `flush_gc_msgs` call
# across multiple producers, see `send_del_client`,
# and `send_add_client`.
# XXX: Is this worth the additional complexity?
# `flush_gc_msgs` has to iterate over all connected workers.
const any_gc_flag = Threads.Condition()
const any_gc_flag = Condition()
function start_gc_msgs_task()
errormonitor(
Threads.@spawn begin
while true
lock(any_gc_flag) do
wait(any_gc_flag)
flush_gc_msgs() # handles throws internally
end
end
end
)
errormonitor(@async while true
wait(any_gc_flag)
flush_gc_msgs()
end)
end

# Function can be called within a finalizer
function send_del_client(rr)
if rr.where == myid()
del_client(rr)
elseif id_in_procs(rr.where) # process only if a valid worker
w = worker_from_id(rr.where)::Worker
msg = (remoteref_id(rr), myid())
# We cannot acquire locks from finalizers
Threads.@spawn begin
lock(w.msg_lock) do
push!(w.del_msgs, msg)
w.gcflag = true
end
lock(any_gc_flag) do
notify(any_gc_flag)
end
end
push!(w.del_msgs, (remoteref_id(rr), myid()))
w.gcflag = true
notify(any_gc_flag)
end
end

Expand All @@ -308,13 +288,9 @@ function send_add_client(rr::AbstractRemoteRef, i)
# to the processor that owns the remote ref. it will add_client
# itself inside deserialize().
w = worker_from_id(rr.where)
lock(w.msg_lock) do
push!(w.add_msgs, (remoteref_id(rr), i))
w.gcflag = true
end
lock(any_gc_flag) do
notify(any_gc_flag)
end
push!(w.add_msgs, (remoteref_id(rr), i))
w.gcflag = true
notify(any_gc_flag)
end
end

Expand Down
1 change: 0 additions & 1 deletion stdlib/Distributed/test/distributed_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -1689,5 +1689,4 @@ include("splitrange.jl")
# Run topology tests last after removing all workers, since a given
# cluster at any time only supports a single topology.
rmprocs(workers())
include("threads.jl")
include("topology.jl")
63 changes: 0 additions & 63 deletions stdlib/Distributed/test/threads.jl

This file was deleted.

0 comments on commit 84e6bb9

Please sign in to comment.