Skip to content

Commit

Permalink
Merge pull request #21458 from JuliaLang/jn/yieldto-error
Browse files Browse the repository at this point in the history
improve yieldto error handling
  • Loading branch information
vtjnash committed Apr 23, 2017
2 parents 02d7b14 + 011810a commit 57bcefb
Show file tree
Hide file tree
Showing 12 changed files with 178 additions and 106 deletions.
29 changes: 15 additions & 14 deletions base/channels.jl
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,9 @@ true
"""
function Channel(func::Function; ctype=Any, csize=0, taskref=nothing)
chnl = Channel{ctype}(csize)
task = Task(()->func(chnl))
bind(chnl,task)
schedule(task)
yield()
task = Task(() -> func(chnl))
bind(chnl, task)
yield(task) # immediately start it

isa(taskref, Ref{Task}) && (taskref[] = task)
return chnl
Expand Down Expand Up @@ -215,14 +214,13 @@ function channeled_tasks(n::Int, funcs...; ctypes=fill(Any,n), csizes=fill(0,n))
@assert length(csizes) == n
@assert length(ctypes) == n

chnls = map(i->Channel{ctypes[i]}(csizes[i]), 1:n)
tasks=Task[Task(()->f(chnls...)) for f in funcs]
chnls = map(i -> Channel{ctypes[i]}(csizes[i]), 1:n)
tasks = Task[ Task(() -> f(chnls...)) for f in funcs ]

# bind all tasks to all channels and schedule them
foreach(t -> foreach(c -> bind(c,t), chnls), tasks)
foreach(t -> foreach(c -> bind(c, t), chnls), tasks)
foreach(schedule, tasks)

yield() # Allow scheduled tasks to run
yield() # Allow scheduled tasks to run

return (chnls, tasks)
end
Expand Down Expand Up @@ -283,9 +281,8 @@ function put_unbuffered(c::Channel, v)
end
end
taker = shift!(c.takers)
schedule(current_task())
yieldto(taker, v)
v
yield(taker, v) # immediately give taker a chance to run, but don't block the current task
return v
end

push!(c::Channel, v) = put!(c, v)
Expand Down Expand Up @@ -328,8 +325,12 @@ function take_unbuffered(c::Channel{T}) where T
push!(c.takers, current_task())
try
if length(c.putters) > 0
putter = shift!(c.putters)
return yieldto(putter)::T
let putter = shift!(c.putters)
return Base.try_yieldto(putter) do
# if we fail to start putter, put it back in the queue
unshift!(c.putters, putter)
end::T
end
else
return wait()::T
end
Expand Down
116 changes: 72 additions & 44 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -127,8 +127,8 @@ function schedule(t::Task, arg; error=false)
return enq_work(t)
end

# fast version of schedule(t,v);wait()
function schedule_and_wait(t::Task, v=nothing)
# fast version of `schedule(t, arg); wait()`
function schedule_and_wait(t::Task, arg=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
return yieldto(t, v)
Expand All @@ -149,6 +149,19 @@ tasks.
"""
yield() = (enq_work(current_task()); wait())

"""
yield(t::Task, arg = nothing)
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
immediately yields to `t` before calling the scheduler.
"""
function yield(t::Task, x::ANY = nothing)
t.state == :runnable || error("schedule: Task not runnable")
t.result = x
enq_work(current_task())
return try_yieldto(ensure_self_descheduled, t)
end

"""
yieldto(t::Task, arg = nothing)
Expand All @@ -157,12 +170,44 @@ called with no arguments. On subsequent switches, `arg` is returned from the tas
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
or scheduling in any way. Its use is discouraged.
"""
yieldto(t::Task, x::ANY = nothing) = ccall(:jl_switchto, Any, (Any, Any), t, x)
function yieldto(t::Task, x::ANY = nothing)
t.result = x
return try_yieldto(Void, t)
end

function try_yieldto(undo::F, t::Task) where F
try
ccall(:jl_switchto, Void, (Any,), t)
catch e
undo()
rethrow(e)
end
ct = current_task()
exc = ct.exception
if exc !== nothing
ct.exception = nothing
throw(exc)
end
result = ct.result
ct.result = nothing
return result
end

# yield to a task, throwing an exception in it
function throwto(t::Task, exc)
function throwto(t::Task, exc::ANY)
t.exception = exc
yieldto(t)
return yieldto(t)
end

function ensure_self_descheduled()
# return a queued task to the runnable state
ct = current_task()
if ct.state == :queued
i = findfirst(Workqueue, ct)
i == 0 || deleteat!(Workqueue, i)
ct.state = :runnable
end
nothing
end

function wait()
Expand All @@ -175,43 +220,28 @@ function wait()
pause()
end
else
t = shift!(Workqueue)
if t.state != :queued
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
# and don't want to use print() here, because that may try to incur a task switch
ccall(:jl_safe_printf, Void, (Ptr{UInt8}, Vararg{Int32}),
"\nWARNING: Workqueue inconsistency detected: shift!(Workqueue).state != :queued\n")
continue
end
arg = t.result
t.result = nothing
t.state = :runnable
local result
try
result = yieldto(t, arg)
current_task().state == :runnable || throw(AssertionError("current_task().state == :runnable"))
catch e
ct = current_task()
if ct.state == :queued
if t.state == :runnable
# assume we failed to queue t
# return it to the queue to be scheduled later
t.result = arg
t.state = :queued
push!(Workqueue, t)
end
# return ourself to the runnable state
i = findfirst(Workqueue, ct)
i == 0 || deleteat!(Workqueue, i)
ct.state = :runnable
let t = shift!(Workqueue)
if t.state != :queued
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
# and don't want to use print() here, because that may try to incur a task switch
ccall(:jl_safe_printf, Void, (Ptr{UInt8}, Vararg{Int32}),
"\nWARNING: Workqueue inconsistency detected: shift!(Workqueue).state != :queued\n")
continue
end
t.state = :runnable
result = try_yieldto(t) do
# we failed to yield to t
# return it to the head of the queue to be scheduled later
unshift!(Workqueue, t)
t.state = :queued
ensure_self_descheduled()
end
rethrow(e)
process_events(false)
# return when we come out of the queue
return result
end
process_events(false)
# return when we come out of the queue
return result
end
end
assert(false)
Expand Down Expand Up @@ -277,8 +307,7 @@ function AsyncCondition(cb::Function)
end)
# must start the task right away so that it can wait for the AsyncCondition before
# we re-enter the event loop. this avoids a race condition. see issue #12719
enq_work(current_task())
yieldto(waiter)
yield(waiter)
return async
end

Expand Down Expand Up @@ -409,7 +438,6 @@ function Timer(cb::Function, timeout::Real, repeat::Real=0.0)
end)
# must start the task right away so that it can wait for the Timer before
# we re-enter the event loop. this avoids a race condition. see issue #12719
enq_work(current_task())
yieldto(waiter)
yield(waiter)
return t
end
30 changes: 23 additions & 7 deletions base/stream.jl
Original file line number Diff line number Diff line change
Expand Up @@ -794,7 +794,7 @@ uv_write(s::LibuvStream, p::Vector{UInt8}) = uv_write(s, pointer(p), UInt(sizeof
function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
check_open(s)
uvw = Libc.malloc(_sizeof_uv_write)
uv_req_set_data(uvw,C_NULL)
uv_req_set_data(uvw, C_NULL) # in case we get interrupted before arriving at the wait call
err = ccall(:jl_uv_write,
Int32,
(Ptr{Void}, Ptr{Void}, UInt, Ptr{Void}, Ptr{Void}),
Expand All @@ -805,8 +805,21 @@ function uv_write(s::LibuvStream, p::Ptr{UInt8}, n::UInt)
uv_error("write", err)
end
ct = current_task()
uv_req_set_data(uvw,ct)
stream_wait(ct)
preserve_handle(ct)
try
uv_req_set_data(uvw, ct)
wait()
finally
if uv_req_data(uvw) != C_NULL
# uvw is still alive,
# so make sure we don't get spurious notifications later
uv_req_set_data(uvw, C_NULL)
else
# done with uvw
Libc.free(uvw)
end
unpreserve_handle(ct)
end
return Int(n)
end

Expand Down Expand Up @@ -855,14 +868,17 @@ write(s::LibuvStream, b::UInt8) = write(s, Ref{UInt8}(b))
function uv_writecb_task(req::Ptr{Void}, status::Cint)
d = uv_req_data(req)
if d != C_NULL
uv_req_set_data(req, C_NULL)
if status < 0
err = UVError("write",status)
schedule(unsafe_pointer_to_objref(d)::Task,err,error=true)
err = UVError("write", status)
schedule(unsafe_pointer_to_objref(d)::Task, err, error=true)
else
schedule(unsafe_pointer_to_objref(d)::Task)
end
else
# no owner for this req, safe to just free it
Libc.free(req)
end
Libc.free(req)
nothing
end

Expand Down Expand Up @@ -948,7 +964,7 @@ end
function connect!(sock::PipeEndpoint, path::AbstractString)
@assert sock.status == StatusInit
req = Libc.malloc(_sizeof_uv_connect)
uv_req_set_data(req,C_NULL)
uv_req_set_data(req, C_NULL)
ccall(:uv_pipe_connect, Void, (Ptr{Void}, Ptr{Void}, Cstring, Ptr{Void}), req, sock.handle, path, uv_jl_connectcb::Ptr{Void})
sock.status = StatusConnecting
return sock
Expand Down
4 changes: 2 additions & 2 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ function task_done_hook(t::Task)
if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) &&
active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) &&
active_repl_backend.in_eval
throwto(active_repl_backend.backend_task, result)
throwto(active_repl_backend.backend_task, result) # this terminates the task
end
if !suppress_excp_printing(t)
let bt = t.backtrace
Expand All @@ -253,7 +253,7 @@ function task_done_hook(t::Task)
end
# Clear sigatomic before waiting
sigatomic_end()
wait()
wait() # this will not return
end


Expand Down
2 changes: 1 addition & 1 deletion doc/src/stdlib/parallel.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,11 @@

```@docs
Core.Task
Base.yieldto
Base.current_task
Base.istaskdone
Base.istaskstarted
Base.yield
Base.yieldto
Base.task_local_storage(::Any)
Base.task_local_storage(::Any, ::Any)
Base.task_local_storage(::Function, ::Any, ::Any)
Expand Down
2 changes: 1 addition & 1 deletion src/dump.c
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ static const jl_fptr_t id_to_fptrs[] = {
jl_f_tuple, jl_f_svec, jl_f_intrinsic_call, jl_f_invoke_kwsorter,
jl_f_getfield, jl_f_setfield, jl_f_fieldtype, jl_f_nfields,
jl_f_arrayref, jl_f_arrayset, jl_f_arraysize, jl_f_apply_type,
jl_f_applicable, jl_f_invoke, jl_unprotect_stack, jl_f_sizeof, jl_f__expr,
jl_f_applicable, jl_f_invoke, jl_f_sizeof, jl_f__expr,
NULL };

static const intptr_t LongSymbol_tag = 23;
Expand Down
5 changes: 3 additions & 2 deletions src/gc.c
Original file line number Diff line number Diff line change
Expand Up @@ -1710,7 +1710,6 @@ static void jl_gc_mark_thread_local(jl_ptls_t ptls, jl_ptls_t ptls2)
gc_push_root(ptls, ptls2->current_task, 0);
gc_push_root(ptls, ptls2->root_task, 0);
gc_push_root(ptls, ptls2->exception_in_transit, 0);
gc_push_root(ptls, ptls2->task_arg_in_transit, 0);
}

// mark the initial root set
Expand All @@ -1734,7 +1733,9 @@ static void mark_roots(jl_ptls_t ptls)
if (jl_all_methods != NULL)
gc_push_root(ptls, jl_all_methods, 0);

// gc_push_root(ptls, jl_unprotect_stack_func, 0);
#ifndef COPY_STACKS
gc_push_root(ptls, jl_unprotect_stack_func, 0);
#endif

// constants
gc_push_root(ptls, jl_typetype_type, 0);
Expand Down
3 changes: 1 addition & 2 deletions src/julia.h
Original file line number Diff line number Diff line change
Expand Up @@ -1495,7 +1495,7 @@ typedef struct _jl_task_t {
} jl_task_t;

JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize);
JL_DLLEXPORT jl_value_t *jl_switchto(jl_task_t *t, jl_value_t *arg);
JL_DLLEXPORT void jl_switchto(jl_task_t *t);
JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e);
JL_DLLEXPORT void JL_NORETURN jl_rethrow(void);
JL_DLLEXPORT void JL_NORETURN jl_rethrow_other(jl_value_t *e);
Expand Down Expand Up @@ -1797,7 +1797,6 @@ typedef struct {
#define jl_current_task (jl_get_ptls_states()->current_task)
#define jl_root_task (jl_get_ptls_states()->root_task)
#define jl_exception_in_transit (jl_get_ptls_states()->exception_in_transit)
#define jl_task_arg_in_transit (jl_get_ptls_states()->task_arg_in_transit)


// codegen interface ----------------------------------------------------------
Expand Down
1 change: 0 additions & 1 deletion src/julia_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -401,7 +401,6 @@ JL_DLLEXPORT void jl_typeassert(jl_value_t *x, jl_value_t *t);
#define JL_CALLABLE(name) \
JL_DLLEXPORT jl_value_t *name(jl_value_t *F, jl_value_t **args, uint32_t nargs)

JL_CALLABLE(jl_unprotect_stack);
JL_CALLABLE(jl_f_tuple);
JL_CALLABLE(jl_f_intrinsic_call);
extern jl_function_t *jl_unprotect_stack_func;
Expand Down
1 change: 0 additions & 1 deletion src/julia_threads.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@ typedef struct _jl_tls_states_t {
struct _jl_module_t *current_module;
struct _jl_task_t *volatile current_task;
struct _jl_task_t *root_task;
struct _jl_value_t *volatile task_arg_in_transit;
void *stackbase;
char *stack_lo;
char *stack_hi;
Expand Down
Loading

0 comments on commit 57bcefb

Please sign in to comment.