Skip to content

Commit

Permalink
A version of the sync macro that throws earlier (#34198)
Browse files Browse the repository at this point in the history
I've been looking at what causes deadlocks in our test suite in an effort to cut down on the number of failed tests on CI that result in hangs (since those are hard to diagnose and resolve). I found that by playing with various resource limits, it is easy to create hangs in the test suite. The reason we get a hang rather than a more easily diagnosable error is two fold. We either:

1. Aren't watching for the error (e.g. a socket remote end closing)
2. We aren't propagating the error to the top level

A very common situation for case 2) is that the test is wrapped in @sync which doesn't return until all tasks have finished or error'ed. However, in many cases one of the tasks produces data for the others, so if that task errors, the remaining tasks will wait forever. This PR aims to address that situation by introducing a new `Experimental.@sync` macro that immediately rethrows any errors thrown by a contained task rather than waiting for all of them to finish. The implementation isn't super performant (it allocates a new task per object being waited on), but should be sufficient for use in the test suite. A better implementation would create a new scheduler object that can be inserted into multiple wait queues.

Example usage of the new macro:

```
@sync begin
    @async error("Hello")
    @async sleep(1000)
end # Waits 1000s

Experimental.@sync begin
    @async error("Hello")
    @async sleep(1000)
end # Throws immediately
```

The macro doesn't do any sort of cleanup for the tasks that do not finish, and just lets them run. In the future, we may want to automatically cancel those tasks, but that seemed like a bigger design problem than the simple thing that I wanted (something that propagates error messages more readily, so we see them in the logs).
  • Loading branch information
Keno committed Mar 13, 2020
1 parent e5ba156 commit 558eec9
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 24 deletions.
52 changes: 52 additions & 0 deletions base/experimental.jl
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
"""
module Experimental

using Base: Threads, sync_varname

"""
Const(A::Array)
Expand Down Expand Up @@ -49,4 +51,54 @@ macro aliasscope(body)
end
end


function sync_end(refs)
local c_ex
defined = false
t = current_task()
cond = Threads.Condition()
lock(cond)
nremaining = length(refs)
for r in refs
schedule(Task(()->begin
try
wait(r)
lock(cond)
nremaining -= 1
nremaining == 0 && notify(cond)
unlock(cond)
catch e
lock(cond)
notify(cond, e; error=true)
unlock(cond)
end
end))
end
wait(cond)
unlock(cond)
end

"""
Experimental.@sync
Wait until all lexically-enclosed uses of `@async`, `@spawn`, `@spawnat` and `@distributed`
are complete, or at least one of them has errored. The first exception is immediately
rethrown. It is the responsibility of the user to cancel any still-running operations
during error handling.
!!! Note
This interface is experimental and subject to change or removal without notice.
"""
macro sync(block)
var = esc(sync_varname)
quote
let $var = Any[]
v = $(esc(block))
sync_end($var)
v
end
end
end


end
12 changes: 4 additions & 8 deletions stdlib/FileWatching/test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Test, FileWatching
using Base: uv_error
using Base: uv_error, Experimental

# This script does the following
# Sets up N unix pipes (or WSA sockets)
Expand Down Expand Up @@ -64,15 +64,14 @@ end

# Odd numbers trigger reads, even numbers timeout
for (i, intvl) in enumerate(intvls)
@sync begin
@Experimental.sync begin
global ready = 0
global ready_c = Condition()
t = Vector{Task}(undef, n)
for idx in 1:n
if isodd(idx)
t[idx] = @async pfd_tst_reads(idx, intvl)
@async pfd_tst_reads(idx, intvl)
else
t[idx] = @async pfd_tst_timeout(idx, intvl)
@async pfd_tst_timeout(idx, intvl)
end
end

Expand All @@ -96,9 +95,6 @@ for (i, intvl) in enumerate(intvls)
end
end
notify(ready_c, all=true)
for idx in 1:n
Base.wait(t[idx])
end
end
end

Expand Down
40 changes: 26 additions & 14 deletions stdlib/Sockets/test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Sockets, Random, Test
using Base: Experimental

# set up a watchdog alarm for 10 minutes
# so that we can attempt to get a "friendly" backtrace if something gets stuck
Expand Down Expand Up @@ -135,7 +136,7 @@ defaultport = rand(2000:4000)
write(sock, "Hello World\n")

# test "locked" println to a socket
@sync begin
@Experimental.sync begin
for i in 1:100
@async println(sock, "a", 1)
end
Expand Down Expand Up @@ -279,7 +280,7 @@ end
bind(a, ip"127.0.0.1", randport)
bind(b, ip"127.0.0.1", randport + 1)

@sync begin
@Experimental.sync begin
let i = 0
for _ = 1:30
@async let msg = String(recv(a))
Expand Down Expand Up @@ -358,22 +359,33 @@ end

# connect to it
client_sock = connect(addr, port)
server_sock = accept(listen_sock)
test_done = false
@Experimental.sync begin
@async begin
Base.wait_readnb(client_sock, 1)
test_done || error("Client disconnected prematurely.")
end
@async begin
server_sock = accept(listen_sock)

self_client_addr, self_client_port = getsockname(client_sock)
peer_client_addr, peer_client_port = getpeername(client_sock)
self_srvr_addr, self_srvr_port = getsockname(server_sock)
peer_srvr_addr, peer_srvr_port = getpeername(server_sock)

self_client_addr, self_client_port = getsockname(client_sock)
peer_client_addr, peer_client_port = getpeername(client_sock)
self_srvr_addr, self_srvr_port = getsockname(server_sock)
peer_srvr_addr, peer_srvr_port = getpeername(server_sock)
@test self_client_addr == peer_client_addr == self_srvr_addr == peer_srvr_addr

@test self_client_addr == peer_client_addr == self_srvr_addr == peer_srvr_addr
@test peer_client_port == self_srvr_port
@test peer_srvr_port == self_client_port
@test self_srvr_port != self_client_port

@test peer_client_port == self_srvr_port
@test peer_srvr_port == self_client_port
@test self_srvr_port != self_client_port
test_done = true

close(listen_sock)
close(client_sock)
close(server_sock)
close(listen_sock)
close(client_sock)
close(server_sock)
end
end
end
end

Expand Down
3 changes: 2 additions & 1 deletion test/channels.jl
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

using Random
using Base: Experimental

@testset "single-threaded Condition usage" begin
a = Condition()
Expand Down Expand Up @@ -253,7 +254,7 @@ end

using Dates
@testset "timedwait on multiple channels" begin
@sync begin
@Experimental.sync begin
rr1 = Channel(1)
rr2 = Channel(1)
rr3 = Channel(1)
Expand Down
3 changes: 2 additions & 1 deletion test/runtests.jl
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ using Distributed
using Dates
import REPL
using Printf: @sprintf
using Base: Experimental

include("choosetests.jl")
include("testenv.jl")
Expand Down Expand Up @@ -189,7 +190,7 @@ cd(@__DIR__) do
end
end
end
@sync begin
@Experimental.sync begin
for p in workers()
@async begin
push!(all_tasks, current_task())
Expand Down

0 comments on commit 558eec9

Please sign in to comment.