Skip to content

Commit

Permalink
more fixes to I/O and threading (JuliaLang#31733)
Browse files Browse the repository at this point in the history
hopefully helps JuliaLang#31713 and JuliaLang#31702

make more stream code thread safe
  • Loading branch information
JeffBezanson committed Apr 22, 2019
1 parent c15d984 commit 773140e
Show file tree
Hide file tree
Showing 11 changed files with 219 additions and 115 deletions.
68 changes: 48 additions & 20 deletions base/asyncevent.jl
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ Use [`isopen`](@ref) to check whether it is still active.
"""
mutable struct AsyncCondition
handle::Ptr{Cvoid}
cond::Condition
cond::ThreadSynchronizer
isopen::Bool

function AsyncCondition()
this = new(Libc.malloc(_sizeof_uv_async), Condition(), true)
this = new(Libc.malloc(_sizeof_uv_async), ThreadSynchronizer(), true)
associate_julia_struct(this.handle, this)
finalizer(uvfinalize, this)
err = ccall(:uv_async_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}, Ptr{Cvoid}),
Expand All @@ -41,14 +41,22 @@ the async condition object itself.
function AsyncCondition(cb::Function)
async = AsyncCondition()
waiter = Task(function()
while isopen(async)
success = try
wait(async)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
lock(async.cond)
try
while isopen(async)
success = try
stream_wait(async, async.cond)
true
catch exc # ignore possible exception on close()
isa(exc, EOFError) || rethrow()
finally
unlock(async.cond)
end
success && cb(async)
lock(async.cond)
end
success && cb(async)
finally
unlock(async.cond)
end
end)
# must start the task right away so that it can wait for the AsyncCondition before
Expand All @@ -71,14 +79,14 @@ to check whether a timer is still active.
"""
mutable struct Timer
handle::Ptr{Cvoid}
cond::Condition
cond::ThreadSynchronizer
isopen::Bool

function Timer(timeout::Real; interval::Real = 0.0)
timeout 0 || throw(ArgumentError("timer cannot have negative timeout of $timeout seconds"))
interval 0 || throw(ArgumentError("timer cannot have negative repeat interval of $interval seconds"))

this = new(Libc.malloc(_sizeof_uv_timer), Condition(), true)
this = new(Libc.malloc(_sizeof_uv_timer), ThreadSynchronizer(), true)
err = ccall(:uv_timer_init, Cint, (Ptr{Cvoid}, Ptr{Cvoid}), eventloop(), this)
if err != 0
#TODO: this codepath is currently not tested
Expand All @@ -102,8 +110,13 @@ unsafe_convert(::Type{Ptr{Cvoid}}, t::Timer) = t.handle
unsafe_convert(::Type{Ptr{Cvoid}}, async::AsyncCondition) = async.handle

function wait(t::Union{Timer, AsyncCondition})
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
lock(t.cond)
try
isopen(t) || throw(EOFError())
stream_wait(t, t.cond)
finally
unlock(t.cond)
end
end

isopen(t::Union{Timer, AsyncCondition}) = t.isopen
Expand All @@ -128,24 +141,39 @@ function uvfinalize(t::Union{Timer, AsyncCondition})
end

function _uv_hook_close(t::Union{Timer, AsyncCondition})
uvfinalize(t)
notify_error(t.cond, EOFError())
lock(t.cond)
try
uvfinalize(t)
notify_error(t.cond, EOFError())
finally
unlock(t.cond)
end
nothing
end

function uv_asynccb(handle::Ptr{Cvoid})
async = @handle_as handle AsyncCondition
notify(async.cond)
lock(async.cond)
try
notify(async.cond)
finally
unlock(async.cond)
end
nothing
end

function uv_timercb(handle::Ptr{Cvoid})
t = @handle_as handle Timer
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
lock(t.cond)
try
if ccall(:uv_timer_get_repeat, UInt64, (Ptr{Cvoid},), t) == 0
# timer is stopped now
close(t)
end
notify(t.cond)
finally
unlock(t.cond)
end
notify(t.cond)
nothing
end

Expand Down
12 changes: 9 additions & 3 deletions base/condition.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

## thread/task locking abstraction

@noinline function concurrency_violation()
# can be useful for debugging
#try; error(); catch; ccall(:jlbacktrace, Cvoid, ()); end
error("concurrency violation detected")
end

"""
AbstractLock
Expand All @@ -18,10 +24,10 @@ unlockall(l::AbstractLock) = unlock(l) # internal function for implementing `wai
relockall(l::AbstractLock, token::Nothing) = lock(l) # internal function for implementing `wait`
assert_havelock(l::AbstractLock) = assert_havelock(l, Threads.threadid())
assert_havelock(l::AbstractLock, tid::Integer) =
(islocked(l) && tid == Threads.threadid()) ? nothing : error("concurrency violation detected")
(islocked(l) && tid == Threads.threadid()) ? nothing : concurrency_violation()
assert_havelock(l::AbstractLock, tid::Task) =
(islocked(l) && tid === current_task()) ? nothing : error("concurrency violation detected")
assert_havelock(l::AbstractLock, tid::Nothing) = error("concurrency violation detected")
(islocked(l) && tid === current_task()) ? nothing : concurrency_violation()
assert_havelock(l::AbstractLock, tid::Nothing) = concurrency_violation()

"""
AlwaysLockedST
Expand Down
2 changes: 1 addition & 1 deletion base/lock.jl
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ function relockall(rl::ReentrantLock, n::Int)
lock(rl)
n1 = rl.reentrancy_cnt
rl.reentrancy_cnt = n
n1 == 1 || error("concurrency violation detected")
n1 == 1 || concurrency_violation()
return
end

Expand Down
6 changes: 3 additions & 3 deletions base/locks-mt.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ function mutex_destroy(x::Mutex)
end

function lock(m::Mutex)
m.ownertid == threadid() && error("concurrency violation detected") # deadlock
m.ownertid == threadid() && concurrency_violation() # deadlock
# Temporary solution before we have gc transition support in codegen.
# This could mess up gc state when we add codegen support.
gc_state = ccall(:jl_gc_safe_enter, Int8, ())
Expand All @@ -115,7 +115,7 @@ function lock(m::Mutex)
end

function trylock(m::Mutex)
m.ownertid == threadid() && error("concurrency violation detected") # deadlock
m.ownertid == threadid() && concurrency_violation() # deadlock
r = ccall(:uv_mutex_trylock, Cint, (Ptr{Cvoid},), m)
if r == 0
m.ownertid = threadid()
Expand All @@ -124,7 +124,7 @@ function trylock(m::Mutex)
end

function unlock(m::Mutex)
m.ownertid == threadid() || error("concurrency violation detected")
m.ownertid == threadid() || concurrency_violation()
m.ownertid = 0
ccall(:uv_mutex_unlock, Cvoid, (Ptr{Cvoid},), m)
return
Expand Down
92 changes: 50 additions & 42 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -118,19 +118,18 @@ mutable struct PipeEndpoint <: LibuvStream
handle::Ptr{Cvoid}
status::Int
buffer::IOBuffer
readnotify::Condition
connectnotify::Condition
cond::ThreadSynchronizer
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
function PipeEndpoint(handle::Ptr{Cvoid}, status)
lock = Threads.SpinLock()
p = new(handle,
status,
PipeBuffer(),
Condition(),
Condition(),
ThreadSynchronizer(),
ThreadSynchronizer(lock),
ThreadSynchronizer(lock),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -164,19 +163,20 @@ mutable struct TTY <: LibuvStream
handle::Ptr{Cvoid}
status::Int
buffer::IOBuffer
readnotify::Condition
cond::ThreadSynchronizer
closenotify::ThreadSynchronizer
sendbuf::Union{IOBuffer, Nothing}
lock::ReentrantLock
throttle::Int
@static if Sys.iswindows(); ispty::Bool; end
function TTY(handle::Ptr{Cvoid}, status)
lock = Threads.SpinLock()
tty = new(
handle,
status,
PipeBuffer(),
Condition(),
ThreadSynchronizer(),
ThreadSynchronizer(lock),
ThreadSynchronizer(lock),
nothing,
ReentrantLock(),
DEFAULT_READ_BUFFER_SZ)
Expand Down Expand Up @@ -326,9 +326,14 @@ end

function wait_connected(x::Union{LibuvStream, LibuvServer})
check_open(x)
while x.status == StatusConnecting
stream_wait(x, x.connectnotify)
check_open(x)
lock(x.cond)
try
while x.status == StatusConnecting
stream_wait(x, x.cond)
check_open(x)
end
finally
unlock(x.cond)
end
end

Expand All @@ -339,16 +344,18 @@ function wait_readbyte(x::LibuvStream, c::UInt8)
return
end
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && !occursin(c, x.buffer)
start_reading(x) # ensure we are reading
wait(x.readnotify)
wait(x.cond)
end
finally
if isempty(x.readnotify)
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
unpreserve_handle(x)
unlock(x.cond)
end
nothing
end
Expand All @@ -361,20 +368,22 @@ function wait_readnb(x::LibuvStream, nb::Int)
end
oldthrottle = x.throttle
preserve_handle(x)
lock(x.cond)
try
while isopen(x) && bytesavailable(x.buffer) < nb
x.throttle = max(nb, x.throttle)
start_reading(x) # ensure we are reading
wait(x.readnotify)
wait(x.cond)
end
finally
if isempty(x.readnotify)
if isempty(x.cond)
stop_reading(x) # stop reading iff there are currently no other read clients of the stream
end
if oldthrottle <= x.throttle <= nb
x.throttle = oldthrottle
end
unpreserve_handle(x)
unlock(x.cond)
end
nothing
end
Expand Down Expand Up @@ -552,34 +561,34 @@ function uv_readcb(handle::Ptr{Cvoid}, nread::Cssize_t, buf::Ptr{Cvoid})
stream_unknown_type = @handle_as handle LibuvStream
nrequested = ccall(:jl_uv_buf_len, Csize_t, (Ptr{Cvoid},), buf)
function readcb_specialized(stream::LibuvStream, nread::Int, nrequested::UInt)
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.readnotify)
elseif nread == UV_EOF
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.readnotify)
lock(stream.closenotify)
try
lock(stream.cond)
try
if nread < 0
if nread == UV_ENOBUFS && nrequested == 0
# remind the client that stream.buffer is full
notify(stream.cond)
elseif nread == UV_EOF
if isa(stream, TTY)
stream.status = StatusEOF # libuv called uv_stop_reading already
notify(stream.cond)
notify(stream.closenotify)
finally
unlock(stream.closenotify)
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
end
elseif stream.status != StatusClosing
# begin shutdown of the stream
ccall(:jl_close_uv, Cvoid, (Ptr{Cvoid},), stream.handle)
stream.status = StatusClosing
else
# This is a fatal connection error. Shutdown requests as per the usual
# close function won't work and libuv will fail with an assertion failure
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
notify_error(stream.cond, _UVError("read", nread))
end
else
# This is a fatal connection error. Shutdown requests as per the usual
# close function won't work and libuv will fail with an assertion failure
ccall(:jl_forceclose_uv, Cvoid, (Ptr{Cvoid},), stream)
notify_error(stream.readnotify, _UVError("read", nread))
notify_filled(stream.buffer, nread)
notify(stream.cond)
end
else
notify_filled(stream.buffer, nread)
notify(stream.readnotify)
finally
unlock(stream.cond)
end

# Stop background reading when
Expand Down Expand Up @@ -613,11 +622,10 @@ function _uv_hook_close(uv::Union{LibuvStream, LibuvServer})
uv.status = StatusClosed
# notify any listeners that exist on this libuv stream type
notify(uv.closenotify)
notify(uv.cond)
finally
unlock(uv.closenotify)
end
isdefined(uv, :readnotify) && notify(uv.readnotify)
isdefined(uv, :connectnotify) && notify(uv.connectnotify)
nothing
end

Expand Down Expand Up @@ -797,7 +805,7 @@ function readbytes!(s::LibuvStream, a::Vector{UInt8}, nb::Int)
return bytesavailable(newbuf)
finally
s.buffer = sbuf
if !isempty(s.readnotify)
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
end
Expand Down Expand Up @@ -833,7 +841,7 @@ function unsafe_read(s::LibuvStream, p::Ptr{UInt8}, nb::UInt)
nb == bytesavailable(newbuf) || throw(EOFError())
finally
s.buffer = sbuf
if !isempty(s.readnotify)
if !isempty(s.cond)
start_reading(s) # resume reading iff there are currently other read clients of the stream
end
end
Expand Down
2 changes: 2 additions & 0 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ NOINLINE uintptr_t gc_get_stack_ptr(void)
#ifdef JULIA_ENABLE_THREADING
static void jl_gc_wait_for_the_world(void)
{
if (jl_n_threads > 1)
jl_wake_libuv();
for (int i = 0;i < jl_n_threads;i++) {
jl_ptls_t ptls2 = jl_all_tls_states[i];
// FIXME: The acquire load pairs with the release stores
Expand Down
Loading

0 comments on commit 773140e

Please sign in to comment.