Skip to content

Commit

Permalink
Fix more invalidations from overloading == (#36282)
Browse files Browse the repository at this point in the history
* Improve typing of ProcessGroup.refs

* Add type annotation in stacktrace handling

* Eliminate boxing in REPL

Related to #15276
  • Loading branch information
timholy committed Jun 16, 2020
1 parent ab3c1d2 commit 65953d8
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 24 deletions.
14 changes: 14 additions & 0 deletions src/Distributed.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,20 @@ function _require_callback(mod::Base.PkgId)
end
end

const REF_ID = Ref(1)
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)

struct RRID
whence::Int
id::Int

RRID() = RRID(myid(),next_ref_id())
RRID(whence, id) = new(whence,id)
end

hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)

include("clusterserialize.jl")
include("cluster.jl") # cluster setup and management, addprocs
include("messages.jl")
Expand Down
24 changes: 12 additions & 12 deletions src/cluster.jl
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ function set_worker_state(w, state)
end

function check_worker_state(w::Worker)
if w.state == W_CREATED
if w.state === W_CREATED
if !isclusterlazy()
if PGRP.topology === :all_to_all
# Since higher pids connect with lower pids, the remote worker
Expand Down Expand Up @@ -185,13 +185,13 @@ function exec_conn_func(w::Worker)
end

function wait_for_conn(w)
if w.state == W_CREATED
if w.state === W_CREATED
timeout = worker_timeout() - (time() - w.ct_time)
timeout <= 0 && error("peer $(w.id) has not connected to $(myid())")

@async (sleep(timeout); notify(w.c_state; all=true))
wait(w.c_state)
w.state == W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
w.state === W_CREATED && error("peer $(w.id) didn't connect to $(myid()) within $timeout seconds")
end
nothing
end
Expand Down Expand Up @@ -626,7 +626,7 @@ function create_worker(manager, wconfig)
# require the value of config.connect_at which is set only upon connection completion
for jw in PGRP.workers
if (jw.id != 1) && (jw.id < w.id)
(jw.state == W_CREATED) && wait(jw.c_state)
(jw.state === W_CREATED) && wait(jw.c_state)
push!(join_list, jw)
end
end
Expand All @@ -649,7 +649,7 @@ function create_worker(manager, wconfig)
end

for wl in wlist
(wl.state == W_CREATED) && wait(wl.c_state)
(wl.state === W_CREATED) && wait(wl.c_state)
push!(join_list, wl)
end
end
Expand Down Expand Up @@ -767,7 +767,7 @@ end
mutable struct ProcessGroup
name::AbstractString
workers::Array{Any,1}
refs::Dict # global references
refs::Dict{RRID,Any} # global references
topology::Symbol
lazy::Union{Bool, Nothing}

Expand Down Expand Up @@ -851,7 +851,7 @@ function nprocs()
n = length(PGRP.workers)
# filter out workers in the process of being setup/shutdown.
for jw in PGRP.workers
if !isa(jw, LocalProcess) && (jw.state != W_CONNECTED)
if !isa(jw, LocalProcess) && (jw.state !== W_CONNECTED)
n = n - 1
end
end
Expand Down Expand Up @@ -902,7 +902,7 @@ julia> procs()
function procs()
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
# filter out workers in the process of being setup/shutdown.
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
return Int[x.id for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
else
return Int[x.id for x in PGRP.workers]
end
Expand All @@ -911,7 +911,7 @@ end
function id_in_procs(id) # faster version of `id in procs()`
if myid() == 1 || (PGRP.topology === :all_to_all && !isclusterlazy())
for x in PGRP.workers
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state == W_CONNECTED)
if (x.id::Int) == id && (isa(x, LocalProcess) || (x::Worker).state === W_CONNECTED)
return true
end
end
Expand All @@ -933,7 +933,7 @@ Specifically all workers bound to the same ip-address as `pid` are returned.
"""
function procs(pid::Integer)
if myid() == 1
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state == W_CONNECTED)]
all_workers = [x for x in PGRP.workers if isa(x, LocalProcess) || (x.state === W_CONNECTED)]
if (pid == 1) || (isa(map_pid_wrkr[pid].manager, LocalManager))
Int[x.id for x in filter(w -> (w.id==1) || (isa(w.manager, LocalManager)), all_workers)]
else
Expand Down Expand Up @@ -1040,11 +1040,11 @@ function _rmprocs(pids, waitfor)

start = time_ns()
while (time_ns() - start) < waitfor*1e9
all(w -> w.state == W_TERMINATED, rmprocset) && break
all(w -> w.state === W_TERMINATED, rmprocset) && break
sleep(min(0.1, waitfor - (time_ns() - start)/1e9))
end

unremoved = [wrkr.id for wrkr in filter(w -> w.state != W_TERMINATED, rmprocset)]
unremoved = [wrkr.id for wrkr in filter(w -> w.state !== W_TERMINATED, rmprocset)]
if length(unremoved) > 0
estr = string("rmprocs: pids ", unremoved, " not terminated after ", waitfor, " seconds.")
throw(ErrorException(estr))
Expand Down
12 changes: 0 additions & 12 deletions src/messages.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,6 @@

abstract type AbstractMsg end

const REF_ID = Ref(1)
next_ref_id() = (id = REF_ID[]; REF_ID[] = id+1; id)

struct RRID
whence::Int
id::Int

RRID() = RRID(myid(),next_ref_id())
RRID(whence, id) = new(whence,id)
end
hash(r::RRID, h::UInt) = hash(r.whence, hash(r.id, h))
==(r::RRID, s::RRID) = (r.whence==s.whence && r.id==s.id)

## Wire format description
#
Expand Down

0 comments on commit 65953d8

Please sign in to comment.