From bf7baa4a64739692418f48f051c27e0b870e9793 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 10 Jan 2019 17:16:03 -0500 Subject: [PATCH 1/5] run uv-run only on tid 0 --- src/jl_uv.c | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/src/jl_uv.c b/src/jl_uv.c index 6eb01267bdf52..93d850e1cfd7d 100644 --- a/src/jl_uv.c +++ b/src/jl_uv.c @@ -181,33 +181,33 @@ JL_DLLEXPORT void *jl_uv_write_handle(uv_write_t *req) { return req->handle; } JL_DLLEXPORT int jl_run_once(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop) { + if (loop && ptls->tid == 0) { loop->stop_flag = 0; jl_gc_safepoint_(ptls); - return uv_run(loop,UV_RUN_ONCE); + return uv_run(loop, UV_RUN_ONCE); } - else return 0; + return 0; } JL_DLLEXPORT void jl_run_event_loop(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop) { + if (loop && ptls->tid == 0) { loop->stop_flag = 0; jl_gc_safepoint_(ptls); - uv_run(loop,UV_RUN_DEFAULT); + uv_run(loop, UV_RUN_DEFAULT); } } JL_DLLEXPORT int jl_process_events(uv_loop_t *loop) { jl_ptls_t ptls = jl_get_ptls_states(); - if (loop) { + if (loop && ptls->tid == 0) { loop->stop_flag = 0; jl_gc_safepoint_(ptls); - return uv_run(loop,UV_RUN_NOWAIT); + return uv_run(loop, UV_RUN_NOWAIT); } - else return 0; + return 0; } static void jl_proc_exit_cleanup(uv_process_t *process, int64_t exit_status, int term_signal) From 8485ea3aa05b4db9c5e29cd4b8f2d785cdff37e7 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Wed, 9 Jan 2019 14:00:27 -0500 Subject: [PATCH 2/5] tasks: code reorg [nfc] --- base/Base.jl | 2 +- base/event.jl | 188 ------------------------------------------------- base/task.jl | 189 ++++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 190 insertions(+), 189 deletions(-) diff --git a/base/Base.jl b/base/Base.jl index b7807495c3366..d51e20e455e41 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -241,9 +241,9 @@ include("env.jl") # Scheduling include("libuv.jl") include("event.jl") -include("task.jl") include("threads.jl") include("lock.jl") +include("task.jl") include("weakkeydict.jl") # Logging diff --git a/base/event.jl b/base/event.jl index 0c73e09e3deea..4e1fd96bf06e7 100644 --- a/base/event.jl +++ b/base/event.jl @@ -161,194 +161,6 @@ This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-saf const Condition = GenericCondition{AlwaysLockedST} -## scheduler and work queue - -global const Workqueue = Task[] - -function enq_work(t::Task) - t.state == :runnable || error("schedule: Task not runnable") - ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) - push!(Workqueue, t) - t.state = :queued - return t -end - -schedule(t::Task) = enq_work(t) - -""" - schedule(t::Task, [val]; error=false) - -Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system -is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). - -If a second argument `val` is provided, it will be passed to the task (via the return value of -[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in -the woken task. - -# Examples -```jldoctest -julia> a5() = sum(i for i in 1:1000); - -julia> b = Task(a5); - -julia> istaskstarted(b) -false - -julia> schedule(b); - -julia> yield(); - -julia> istaskstarted(b) -true - -julia> istaskdone(b) -true -``` -""" -function schedule(t::Task, arg; error=false) - # schedule a task to be (re)started with the given value or exception - if error - t.exception = arg - else - t.result = arg - end - return enq_work(t) -end - -# fast version of `schedule(t, arg); wait()` -function schedule_and_wait(t::Task, arg=nothing) - t.state == :runnable || error("schedule: Task not runnable") - if isempty(Workqueue) - return yieldto(t, arg) - else - t.result = arg - push!(Workqueue, t) - t.state = :queued - end - return wait() -end - -""" - yield() - -Switch to the scheduler to allow another scheduled task to run. A task that calls this -function is still runnable, and will be restarted immediately if there are no other runnable -tasks. -""" -yield() = (enq_work(current_task()); wait()) - -""" - yield(t::Task, arg = nothing) - -A fast, unfair-scheduling version of `schedule(t, arg); yield()` which -immediately yields to `t` before calling the scheduler. -""" -function yield(t::Task, @nospecialize x = nothing) - t.state == :runnable || error("schedule: Task not runnable") - t.result = x - enq_work(current_task()) - return try_yieldto(ensure_rescheduled, Ref(t)) -end - -""" - yieldto(t::Task, arg = nothing) - -Switch to the given task. The first time a task is switched to, the task's function is -called with no arguments. On subsequent switches, `arg` is returned from the task's last -call to `yieldto`. This is a low-level call that only switches tasks, not considering states -or scheduling in any way. Its use is discouraged. -""" -function yieldto(t::Task, @nospecialize x = nothing) - t.result = x - return try_yieldto(identity, Ref(t)) -end - -function try_yieldto(undo, reftask::Ref{Task}) - try - ccall(:jl_switchto, Cvoid, (Any,), reftask) - catch - undo(reftask[]) - rethrow() - end - ct = current_task() - exc = ct.exception - if exc !== nothing - ct.exception = nothing - throw(exc) - end - result = ct.result - ct.result = nothing - return result -end - -# yield to a task, throwing an exception in it -function throwto(t::Task, @nospecialize exc) - t.exception = exc - return yieldto(t) -end - -function ensure_rescheduled(othertask::Task) - ct = current_task() - if ct !== othertask && othertask.state == :runnable - # we failed to yield to othertask - # return it to the head of the queue to be scheduled later - pushfirst!(Workqueue, othertask) - othertask.state = :queued - end - if ct.state == :queued - # if the current task was queued, - # also need to return it to the runnable state - # before throwing an error - i = findfirst(t->t===ct, Workqueue) - i === nothing || deleteat!(Workqueue, i) - ct.state = :runnable - end - nothing -end - -@noinline function poptask() - t = popfirst!(Workqueue) - if t.state != :queued - # assume this somehow got queued twice, - # probably broken now, but try discarding this switch and keep going - # can't throw here, because it's probably not the fault of the caller to wait - # and don't want to use print() here, because that may try to incur a task switch - ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), - "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n") - return - end - t.state = :runnable - return Ref(t) -end - -function wait() - while true - if isempty(Workqueue) - c = process_events(true) - if c == 0 && eventloop() != C_NULL && isempty(Workqueue) - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() - end - else - reftask = poptask() - if reftask !== nothing - result = try_yieldto(ensure_rescheduled, reftask) - process_events(false) - # return when we come out of the queue - return result - end - end - end - # unreachable -end - -if Sys.iswindows() - pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) -else - pause() = ccall(:pause, Cvoid, ()) -end - ## async event notifications diff --git a/base/task.jl b/base/task.jl index 4045cde09ffa1..46371916d8cdb 100644 --- a/base/task.jl +++ b/base/task.jl @@ -354,3 +354,192 @@ function timedwait(testcb::Function, secs::Float64; pollint::Float64=0.1) end ret end + + +## scheduler and work queue + +global const Workqueue = Task[] + +function enq_work(t::Task) + t.state == :runnable || error("schedule: Task not runnable") + ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) + push!(Workqueue, t) + t.state = :queued + return t +end + +schedule(t::Task) = enq_work(t) + +""" + schedule(t::Task, [val]; error=false) + +Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system +is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref). + +If a second argument `val` is provided, it will be passed to the task (via the return value of +[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in +the woken task. + +# Examples +```jldoctest +julia> a5() = sum(i for i in 1:1000); + +julia> b = Task(a5); + +julia> istaskstarted(b) +false + +julia> schedule(b); + +julia> yield(); + +julia> istaskstarted(b) +true + +julia> istaskdone(b) +true +``` +""" +function schedule(t::Task, arg; error=false) + # schedule a task to be (re)started with the given value or exception + if error + t.exception = arg + else + t.result = arg + end + return enq_work(t) +end + +# fast version of `schedule(t, arg); wait()` +function schedule_and_wait(t::Task, arg=nothing) + t.state == :runnable || error("schedule: Task not runnable") + if isempty(Workqueue) + return yieldto(t, arg) + else + t.result = arg + push!(Workqueue, t) + t.state = :queued + end + return wait() +end + +""" + yield() + +Switch to the scheduler to allow another scheduled task to run. A task that calls this +function is still runnable, and will be restarted immediately if there are no other runnable +tasks. +""" +yield() = (enq_work(current_task()); wait()) + +""" + yield(t::Task, arg = nothing) + +A fast, unfair-scheduling version of `schedule(t, arg); yield()` which +immediately yields to `t` before calling the scheduler. +""" +function yield(t::Task, @nospecialize x = nothing) + t.state == :runnable || error("schedule: Task not runnable") + t.result = x + enq_work(current_task()) + return try_yieldto(ensure_rescheduled, Ref(t)) +end + +""" + yieldto(t::Task, arg = nothing) + +Switch to the given task. The first time a task is switched to, the task's function is +called with no arguments. On subsequent switches, `arg` is returned from the task's last +call to `yieldto`. This is a low-level call that only switches tasks, not considering states +or scheduling in any way. Its use is discouraged. +""" +function yieldto(t::Task, @nospecialize x = nothing) + t.result = x + return try_yieldto(identity, Ref(t)) +end + +function try_yieldto(undo, reftask::Ref{Task}) + try + ccall(:jl_switchto, Cvoid, (Any,), reftask) + catch + undo(reftask[]) + rethrow() + end + ct = current_task() + exc = ct.exception + if exc !== nothing + ct.exception = nothing + throw(exc) + end + result = ct.result + ct.result = nothing + return result +end + +# yield to a task, throwing an exception in it +function throwto(t::Task, @nospecialize exc) + t.exception = exc + return yieldto(t) +end + +function ensure_rescheduled(othertask::Task) + ct = current_task() + if ct !== othertask && othertask.state == :runnable + # we failed to yield to othertask + # return it to the head of the queue to be scheduled later + pushfirst!(Workqueue, othertask) + othertask.state = :queued + end + if ct.state == :queued + # if the current task was queued, + # also need to return it to the runnable state + # before throwing an error + i = findfirst(t->t===ct, Workqueue) + i === nothing || deleteat!(Workqueue, i) + ct.state = :runnable + end + nothing +end + +@noinline function poptask() + t = popfirst!(Workqueue) + if t.state != :queued + # assume this somehow got queued twice, + # probably broken now, but try discarding this switch and keep going + # can't throw here, because it's probably not the fault of the caller to wait + # and don't want to use print() here, because that may try to incur a task switch + ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), + "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n") + return + end + t.state = :runnable + return Ref(t) +end + +function wait() + while true + if isempty(Workqueue) + c = process_events(true) + if c == 0 && eventloop() != C_NULL && isempty(Workqueue) + # if there are no active handles and no runnable tasks, just + # wait for signals. + pause() + end + else + reftask = poptask() + if reftask !== nothing + result = try_yieldto(ensure_rescheduled, reftask) + process_events(false) + # return when we come out of the queue + return result + end + end + end + # unreachable +end + +if Sys.iswindows() + pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff) +else + pause() = ccall(:pause, Cvoid, ()) +end From 72d502a93c696fcb071665b3b9dd76faf4215ff9 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Tue, 15 Jan 2019 13:54:08 -0500 Subject: [PATCH 3/5] tasks: reorg wait function [nfc] --- base/task.jl | 42 +++++++++++++++++++++++------------------- 1 file changed, 23 insertions(+), 19 deletions(-) diff --git a/base/task.jl b/base/task.jl index 46371916d8cdb..3beb5e12202e9 100644 --- a/base/task.jl +++ b/base/task.jl @@ -501,7 +501,8 @@ function ensure_rescheduled(othertask::Task) nothing end -@noinline function poptask() +function trypoptask() + isempty(Workqueue) && return t = popfirst!(Workqueue) if t.state != :queued # assume this somehow got queued twice, @@ -513,29 +514,32 @@ end return end t.state = :runnable - return Ref(t) + return t end -function wait() +@noinline function poptaskref() + local task while true - if isempty(Workqueue) - c = process_events(true) - if c == 0 && eventloop() != C_NULL && isempty(Workqueue) - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() - end - else - reftask = poptask() - if reftask !== nothing - result = try_yieldto(ensure_rescheduled, reftask) - process_events(false) - # return when we come out of the queue - return result - end + task = trypoptask() + task === nothing || break + if process_events(true) == 0 + task = trypoptask() + task === nothing || break + # if there are no active handles and no runnable tasks, just + # wait for signals. + pause() end end - # unreachable + return Ref(task) +end + + +function wait() + reftask = poptaskref() + result = try_yieldto(ensure_rescheduled, reftask) + process_events(false) + # return when we come out of the queue + return result end if Sys.iswindows() From 9902531419a91210d01126ad6979ee8c7cdad4a1 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Thu, 3 Jan 2019 15:48:15 -0500 Subject: [PATCH 4/5] scheduler: use linked list for Workqueue --- base/Base.jl | 1 + base/event.jl | 23 ++- base/linked_list.jl | 151 ++++++++++++++++++++ base/task.jl | 49 ++++--- doc/src/manual/control-flow.md | 4 +- src/init.c | 26 +--- src/julia.h | 2 + src/task.c | 16 ++- stdlib/Distributed/test/distributed_exec.jl | 12 +- stdlib/REPL/test/repl.jl | 1 + stdlib/Serialization/src/Serialization.jl | 6 +- test/channels.jl | 2 +- test/runtests.jl | 10 +- test/threads.jl | 82 +++++++++++ 14 files changed, 309 insertions(+), 76 deletions(-) create mode 100644 base/linked_list.jl diff --git a/base/Base.jl b/base/Base.jl index d51e20e455e41..a48d365489669 100644 --- a/base/Base.jl +++ b/base/Base.jl @@ -240,6 +240,7 @@ include("env.jl") # Scheduling include("libuv.jl") +include("linked_list.jl") include("event.jl") include("threads.jl") include("lock.jl") diff --git a/base/event.jl b/base/event.jl index 4e1fd96bf06e7..ecd1db5525c2b 100644 --- a/base/event.jl +++ b/base/event.jl @@ -56,12 +56,12 @@ Abstract implementation of a condition object for synchonizing tasks objects with a given lock. """ struct GenericCondition{L<:AbstractLock} - waitq::Vector{Any} + waitq::InvasiveLinkedList{Task} lock::L - GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L()) - GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l) - GenericCondition(l::AbstractLock) = new{typeof(l)}([], l) + GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L()) + GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l) + GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l) end assert_havelock(c::GenericCondition) = assert_havelock(c.lock) @@ -94,11 +94,10 @@ function wait(c::GenericCondition) assert_havelock(c) push!(c.waitq, ct) token = unlockall(c.lock) - try return wait() catch - filter!(x->x!==ct, c.waitq) + list_deletefirst!(c.waitq, ct) rethrow() finally relockall(c.lock, token) @@ -118,16 +117,11 @@ notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false) function notify(c::GenericCondition, @nospecialize(arg), all, error) assert_havelock(c) cnt = 0 - if all - cnt = length(c.waitq) - for t in c.waitq - schedule(t, arg, error=error) - end - empty!(c.waitq) - elseif !isempty(c.waitq) - cnt = 1 + while !isempty(c.waitq) t = popfirst!(c.waitq) schedule(t, arg, error=error) + cnt += 1 + all || break end return cnt end @@ -161,7 +155,6 @@ This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-saf const Condition = GenericCondition{AlwaysLockedST} - ## async event notifications """ diff --git a/base/linked_list.jl b/base/linked_list.jl new file mode 100644 index 0000000000000..195d2d02d61f1 --- /dev/null +++ b/base/linked_list.jl @@ -0,0 +1,151 @@ +# This file is a part of Julia. License is MIT: https://julialang.org/license + +mutable struct InvasiveLinkedList{T} + # Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue >: U{ILL{T}, Nothing}` + head::Union{T, Nothing} + tail::Union{T, Nothing} + InvasiveLinkedList{T}() where {T} = new{T}(nothing, nothing) +end + +#const list_append!! = append! +#const list_deletefirst! = delete! + +eltype(::Type{<:InvasiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any + +iterate(q::InvasiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h)) +iterate(q::InvasiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h)) + +isempty(q::InvasiveLinkedList) = (q.head === nothing) + +function length(q::InvasiveLinkedList) + i = 0 + head = q.head + while head !== nothing + i += 1 + head = head.next + end + return i +end + +function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) where T + q === q2 && error("can't append list to itself") + head2 = q2.head + if head2 !== nothing + tail2 = q2.tail::T + q2.head = nothing + q2.tail = nothing + tail = q.tail + q.tail = tail2 + if tail === nothing + q.head = head2 + else + tail.next = head2 + end + while head2 !== nothing + head2.queue = q + head2 = head2.next + end + end + return q +end + +function push!(q::InvasiveLinkedList{T}, val::T) where T + val.queue === nothing || error("val already in a list") + val.queue = q + tail = q.tail + if tail === nothing + q.head = q.tail = val + else + tail.next = val + q.tail = val + end + return q +end + +function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T + val.queue === nothing || error("val already in a list") + val.queue = q + head = q.head + if head === nothing + q.head = q.tail = val + else + val.next = head + q.head = val + end + return q +end + +function pop!(q::InvasiveLinkedList{T}) where {T} + val = q.tail::T + list_deletefirst!(q, val) # expensive! + return val +end + +function popfirst!(q::InvasiveLinkedList{T}) where {T} + val = q.head::T + list_deletefirst!(q, val) # cheap + return val +end + +function list_deletefirst!(q::InvasiveLinkedList{T}, val::T) where T + val.queue === q || return + head = q.head::T + if head === val + if q.tail::T === val + q.head = q.tail = nothing + else + q.head = val.next::T + end + else + head_next = head.next + while head_next !== val + head = head_next + head_next = head.next + end + if q.tail::T === val + head.next = nothing + q.tail = head + else + head.next = val.next::T + end + end + val.next = nothing + val.queue = nothing + return q +end + +#function list_deletefirst!(q::Array{T}, val::T) where T +# i = findfirst(isequal(val), q) +# i === nothing || deleteat!(q, i) +# return q +#end + + +mutable struct LinkedListItem{T} + # Adapter class to use any `T` in a LinkedList + next::Union{LinkedListItem{T}, Nothing} + queue::Union{InvasiveLinkedList{LinkedListItem{T}}, Nothing} + value::T + LinkedListItem{T}(value::T) where {T} = new{T}(nothing, nothing, value) +end +const LinkedList{T} = InvasiveLinkedList{LinkedListItem{T}} + +# delegate methods, as needed +eltype(::Type{<:LinkedList{T}}) where {T} = @isdefined(T) ? T : Any +iterate(q::LinkedList) = (h = q.head; h === nothing ? nothing : (h.value, h)) +iterate(q::InvasiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h)) +push!(q::LinkedList{T}, val::T) where {T} = push!(q, LinkedListItem{T}(val)) +pushfirst!(q::LinkedList{T}, val::T) where {T} = pushfirst!(q, LinkedListItem{T}(val)) +pop!(q::LinkedList) = invoke(pop!, Tuple{InvasiveLinkedList,}, q).value +popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{InvasiveLinkedList,}, q).value +function list_deletefirst!(q::LinkedList{T}, val::T) where T + h = q.head + while h !== nothing + if isequal(h.value, val) + list_deletefirst!(q, h) + break + end + h = h.next + end + return q +end diff --git a/base/task.jl b/base/task.jl index 3beb5e12202e9..9a854048fd910 100644 --- a/base/task.jl +++ b/base/task.jl @@ -142,7 +142,7 @@ function get_task_tls(t::Task) if t.storage === nothing t.storage = IdDict() end - (t.storage)::IdDict{Any,Any} + return (t.storage)::IdDict{Any,Any} end """ @@ -168,12 +168,13 @@ for emulating dynamic scoping. """ function task_local_storage(body::Function, key, val) tls = task_local_storage() - hadkey = haskey(tls,key) - old = get(tls,key,nothing) + hadkey = haskey(tls, key) + old = get(tls, key, nothing) tls[key] = val - try body() + try + return body() finally - hadkey ? (tls[key] = old) : delete!(tls,key) + hadkey ? (tls[key] = old) : delete!(tls, key) end end @@ -200,7 +201,7 @@ exception, the exception is propagated (re-thrown in the task that called fetch) """ function fetch(t::Task) wait(t) - task_result(t) + return task_result(t) end @@ -264,6 +265,7 @@ macro async(expr) push!($var, task) end schedule(task) + task end end @@ -358,13 +360,12 @@ end ## scheduler and work queue -global const Workqueue = Task[] +global const Workqueue = InvasiveLinkedList{Task}() function enq_work(t::Task) - t.state == :runnable || error("schedule: Task not runnable") + (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) push!(Workqueue, t) - t.state = :queued return t end @@ -400,25 +401,28 @@ julia> istaskdone(b) true ``` """ -function schedule(t::Task, arg; error=false) +function schedule(t::Task, @nospecialize(arg); error=false) # schedule a task to be (re)started with the given value or exception + t.state == :runnable || error("schedule: Task not runnable") if error + t.queue === nothing || Base.list_deletefirst!(t.queue, t) t.exception = arg else + t.queue === nothing || error("schedule: Task not runnable") t.result = arg end - return enq_work(t) + enq_work(t) + return t end # fast version of `schedule(t, arg); wait()` -function schedule_and_wait(t::Task, arg=nothing) - t.state == :runnable || error("schedule: Task not runnable") +function schedule_and_wait(t::Task, @nospecialize(arg)=nothing) + (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") if isempty(Workqueue) return yieldto(t, arg) else t.result = arg push!(Workqueue, t) - t.state = :queued end return wait() end @@ -438,8 +442,7 @@ yield() = (enq_work(current_task()); wait()) A fast, unfair-scheduling version of `schedule(t, arg); yield()` which immediately yields to `t` before calling the scheduler. """ -function yield(t::Task, @nospecialize x = nothing) - t.state == :runnable || error("schedule: Task not runnable") +function yield(t::Task, @nospecialize(x=nothing)) t.result = x enq_work(current_task()) return try_yieldto(ensure_rescheduled, Ref(t)) @@ -453,7 +456,7 @@ called with no arguments. On subsequent switches, `arg` is returned from the tas call to `yieldto`. This is a low-level call that only switches tasks, not considering states or scheduling in any way. Its use is discouraged. """ -function yieldto(t::Task, @nospecialize x = nothing) +function yieldto(t::Task, @nospecialize(x=nothing)) t.result = x return try_yieldto(identity, Ref(t)) end @@ -488,15 +491,12 @@ function ensure_rescheduled(othertask::Task) # we failed to yield to othertask # return it to the head of the queue to be scheduled later pushfirst!(Workqueue, othertask) - othertask.state = :queued end - if ct.state == :queued + if ct.queue === Workqueue # if the current task was queued, # also need to return it to the runnable state # before throwing an error - i = findfirst(t->t===ct, Workqueue) - i === nothing || deleteat!(Workqueue, i) - ct.state = :runnable + list_deletefirst!(Workqueue, ct) end nothing end @@ -504,16 +504,15 @@ end function trypoptask() isempty(Workqueue) && return t = popfirst!(Workqueue) - if t.state != :queued + if t.state != :runnable # assume this somehow got queued twice, # probably broken now, but try discarding this switch and keep going # can't throw here, because it's probably not the fault of the caller to wait # and don't want to use print() here, because that may try to incur a task switch ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...), - "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n") + "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n") return end - t.state = :runnable return t end diff --git a/doc/src/manual/control-flow.md b/doc/src/manual/control-flow.md index a8611b4541d2b..13dc4accfa345 100644 --- a/doc/src/manual/control-flow.md +++ b/doc/src/manual/control-flow.md @@ -981,8 +981,6 @@ symbols: | Symbol | Meaning | |:----------- |:-------------------------------------------------- | -| `:runnable` | Currently running, or available to be switched to | -| `:waiting` | Blocked waiting for a specific event | -| `:queued` | In the scheduler's run queue about to be restarted | +| `:runnable` | Currently running, or able to run | | `:done` | Successfully finished executing | | `:failed` | Finished with an uncaught exception | diff --git a/src/init.c b/src/init.c index 74c7423e5b92a..eebc96a4540c0 100644 --- a/src/init.c +++ b/src/init.c @@ -850,26 +850,16 @@ static jl_value_t *core(const char *name) // fetch references to things defined in boot.jl void jl_get_builtin_hooks(void) { - int t; - for (t = 0; t < jl_n_threads; t++) { - jl_ptls_t ptls2 = jl_all_tls_states[t]; - ptls2->root_task->tls = jl_nothing; - ptls2->root_task->donenotify = jl_nothing; - ptls2->root_task->exception = jl_nothing; - ptls2->root_task->result = jl_nothing; - } - jl_char_type = (jl_datatype_t*)core("Char"); jl_int8_type = (jl_datatype_t*)core("Int8"); jl_int16_type = (jl_datatype_t*)core("Int16"); jl_uint16_type = (jl_datatype_t*)core("UInt16"); - jl_float16_type = (jl_datatype_t*)core("Float16"); jl_float32_type = (jl_datatype_t*)core("Float32"); jl_float64_type = (jl_datatype_t*)core("Float64"); jl_floatingpoint_type = (jl_datatype_t*)core("AbstractFloat"); - jl_number_type = (jl_datatype_t*)core("Number"); - jl_signed_type = (jl_datatype_t*)core("Signed"); + jl_number_type = (jl_datatype_t*)core("Number"); + jl_signed_type = (jl_datatype_t*)core("Signed"); jl_datatype_t *jl_unsigned_type = (jl_datatype_t*)core("Unsigned"); jl_datatype_t *jl_integer_type = (jl_datatype_t*)core("Integer"); jl_bool_type->super = jl_integer_type; @@ -888,19 +878,17 @@ void jl_get_builtin_hooks(void) jl_boundserror_type = (jl_datatype_t*)core("BoundsError"); jl_memory_exception = jl_new_struct_uninit((jl_datatype_t*)core("OutOfMemoryError")); jl_readonlymemory_exception = jl_new_struct_uninit((jl_datatype_t*)core("ReadOnlyMemoryError")); - jl_typeerror_type = (jl_datatype_t*)core("TypeError"); - + jl_typeerror_type = (jl_datatype_t*)core("TypeError"); #ifdef SEGV_EXCEPTION jl_segv_exception = jl_new_struct_uninit((jl_datatype_t*)core("SegmentationFault")); #endif + jl_argumenterror_type = (jl_datatype_t*)core("ArgumentError"); + jl_methoderror_type = (jl_datatype_t*)core("MethodError"); + jl_loaderror_type = (jl_datatype_t*)core("LoadError"); + jl_initerror_type = (jl_datatype_t*)core("InitError"); jl_weakref_type = (jl_datatype_t*)core("WeakRef"); jl_vecelement_typename = ((jl_datatype_t*)jl_unwrap_unionall(core("VecElement")))->name; - - jl_argumenterror_type = (jl_datatype_t*)core("ArgumentError"); - jl_methoderror_type = (jl_datatype_t*)core("MethodError"); - jl_loaderror_type = (jl_datatype_t*)core("LoadError"); - jl_initerror_type = (jl_datatype_t*)core("InitError"); } void jl_get_builtins(void) diff --git a/src/julia.h b/src/julia.h index 79f343cb75465..b3d26a2b0b4df 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1614,6 +1614,8 @@ typedef struct _jl_handler_t { typedef struct _jl_task_t { JL_DATA_TYPE + jl_value_t *next; // invasive linked list for scheduler + jl_value_t *queue; // invasive linked list for scheduler jl_value_t *tls; jl_sym_t *state; jl_value_t *donenotify; diff --git a/src/task.c b/src/task.c index a22be317f55ef..3cd0e22c3a2cd 100644 --- a/src/task.c +++ b/src/task.c @@ -475,6 +475,8 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) if (t->stkbuf == NULL) jl_throw(jl_memory_exception); } + t->next = jl_nothing; + t->queue = jl_nothing; t->tls = jl_nothing; t->state = runnable_sym; t->start = start; @@ -524,7 +526,9 @@ void jl_init_tasks(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(8, + jl_perm_symsvec(10, + "next", + "queue", "storage", "state", "donenotify", @@ -533,7 +537,9 @@ void jl_init_tasks(void) JL_GC_DISABLED "backtrace", "logstate", "code"), - jl_svec(8, + jl_svec(10, + jl_any_type, + jl_any_type, jl_any_type, jl_sym_type, jl_any_type, @@ -542,7 +548,9 @@ void jl_init_tasks(void) JL_GC_DISABLED jl_any_type, jl_any_type, jl_any_type), - 0, 1, 7); + 0, 1, 9); + jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_void_type); + jl_svecset(jl_task_type->types, 0, listt); done_sym = jl_symbol("done"); failed_sym = jl_symbol("failed"); runnable_sym = jl_symbol("runnable"); @@ -923,6 +931,8 @@ void jl_init_root_task(void *stack_lo, void *stack_hi) ptls->current_task->stkbuf = stack; ptls->current_task->bufsz = ssize; ptls->current_task->started = 1; + ptls->current_task->next = jl_nothing; + ptls->current_task->queue = jl_nothing; ptls->current_task->tls = jl_nothing; ptls->current_task->state = runnable_sym; ptls->current_task->start = NULL; diff --git a/stdlib/Distributed/test/distributed_exec.jl b/stdlib/Distributed/test/distributed_exec.jl index 592588c8c0788..38dabed09dba0 100644 --- a/stdlib/Distributed/test/distributed_exec.jl +++ b/stdlib/Distributed/test/distributed_exec.jl @@ -752,16 +752,20 @@ end # issue #13168 function f13168(n) val = 0 - for i=1:n val+=sum(rand(n,n)^2) end - val + for i = 1:n + val += sum(rand(n, n)^2) + end + return val end let t = schedule(@task f13168(100)) - @test t.state == :queued + @test t.state == :runnable + @test t.queue !== nothing @test_throws ErrorException schedule(t) yield() @test t.state == :done + @test t.queue === nothing @test_throws ErrorException schedule(t) - @test isa(fetch(t),Float64) + @test isa(fetch(t), Float64) end # issue #13122 diff --git a/stdlib/REPL/test/repl.jl b/stdlib/REPL/test/repl.jl index 9aa34b68b54de..0e6446859c2b6 100644 --- a/stdlib/REPL/test/repl.jl +++ b/stdlib/REPL/test/repl.jl @@ -28,6 +28,7 @@ function kill_timer(delay) # **DON'T COPY ME.** # The correct way to handle timeouts is to close the handle: # e.g. `close(stdout_read); close(stdin_write)` + test_task.queue === nothing || Base.list_deletefirst!(test_task.queue, test_task) schedule(test_task, "hard kill repl test"; error=true) print(stderr, "WARNING: attempting hard kill of repl test after exceeding timeout\n") end diff --git a/stdlib/Serialization/src/Serialization.jl b/stdlib/Serialization/src/Serialization.jl index 2367400ebee6b..7bbe4bdf7650e 100644 --- a/stdlib/Serialization/src/Serialization.jl +++ b/stdlib/Serialization/src/Serialization.jl @@ -429,11 +429,7 @@ function serialize(s::AbstractSerializer, t::Task) if istaskstarted(t) && !istaskdone(t) error("cannot serialize a running Task") end - state = [t.code, - t.storage, - t.state == :queued || t.state == :runnable ? (:runnable) : t.state, - t.result, - t.exception] + state = [t.code, t.storage, t.state, t.result, t.exception] writetag(s.io, TASK_TAG) for fld in state serialize(s, fld) diff --git a/test/channels.jl b/test/channels.jl index 9e49badb8ae66..0011daaa6b20c 100644 --- a/test/channels.jl +++ b/test/channels.jl @@ -274,7 +274,7 @@ end redirect_stderr(oldstderr) close(newstderr[2]) end - @test fetch(errstream) == "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n" + @test fetch(errstream) == "\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n" end @testset "schedule_and_wait" begin diff --git a/test/runtests.jl b/test/runtests.jl index e6f00c61aa955..aa961ad7cb2f7 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -195,7 +195,15 @@ cd(@__DIR__) do isa(e, InterruptException) || rethrow() # If the test suite was merely interrupted, still print the # summary, which can be useful to diagnose what's going on - foreach(task->try; schedule(task, InterruptException(); error=true); catch; end, all_tasks) + foreach(task -> begin + istaskstarted(task) || return + istaskdone(task) && return + try + schedule(task, InterruptException(); error=true) + catch ex + @error "InterruptException" exception=ex,catch_backtrace() + end + end, all_tasks) foreach(wait, all_tasks) finally if @isdefined stdin_monitor diff --git a/test/threads.jl b/test/threads.jl index b2063f7b244fa..884f8e37d8525 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -518,3 +518,85 @@ let e = Event(), started = Event() wait(@async (wait(e); blocked = false)) @test !blocked end + + +@testset "InvasiveLinkedList" begin + @test eltype(Base.InvasiveLinkedList{Integer}) == Integer + @test eltype(Base.LinkedList{Integer}) == Integer + @test eltype(Base.InvasiveLinkedList{<:Integer}) == Any + @test eltype(Base.LinkedList{<:Integer}) == Any + @test eltype(Base.InvasiveLinkedList{<:Base.LinkedListItem{Integer}}) == Any + + t = Base.LinkedList{Integer}() + @test eltype(t) == Integer + @test isempty(t) + @test length(t) == 0 + @test isempty(collect(t)::Vector{Integer}) + @test pushfirst!(t, 2) === t + @test !isempty(t) + @test length(t) == 1 + @test pushfirst!(t, 1) === t + @test !isempty(t) + @test length(t) == 2 + @test collect(t) == [1, 2] + @test pop!(t) == 2 + @test !isempty(t) + @test length(t) == 1 + @test collect(t) == [1] + @test pop!(t) == 1 + @test isempty(t) + @test length(t) == 0 + @test collect(t) == [] + + @test push!(t, 1) === t + @test !isempty(t) + @test length(t) == 1 + @test push!(t, 2) === t + @test !isempty(t) + @test length(t) == 2 + @test collect(t) == [1, 2] + @test popfirst!(t) == 1 + @test popfirst!(t) == 2 + @test isempty(collect(t)::Vector{Integer}) + + @test push!(t, 5) === t + @test push!(t, 6) === t + @test push!(t, 7) === t + @test length(t) === 3 + @test Base.list_deletefirst!(t, 1) === t + @test length(t) === 3 + @test Base.list_deletefirst!(t, 6) === t + @test length(t) === 2 + @test collect(t) == [5, 7] + @test Base.list_deletefirst!(t, 6) === t + @test length(t) === 2 + @test Base.list_deletefirst!(t, 7) === t + @test length(t) === 1 + @test collect(t) == [5] + @test Base.list_deletefirst!(t, 5) === t + @test length(t) === 0 + @test collect(t) == [] + @test isempty(t) + + t2 = Base.LinkedList{Integer}() + @test push!(t, 5) === t + @test push!(t, 6) === t + @test push!(t, 7) === t + @test push!(t2, 2) === t2 + @test push!(t2, 3) === t2 + @test push!(t2, 4) === t2 + @test Base.list_append!!(t, t2) === t + @test isempty(t2) + @test isempty(collect(t2)::Vector{Integer}) + @test collect(t) == [5, 6, 7, 2, 3, 4] + @test Base.list_append!!(t, t2) === t + @test collect(t) == [5, 6, 7, 2, 3, 4] + @test Base.list_append!!(t2, t) === t2 + @test isempty(t) + @test collect(t2) == [5, 6, 7, 2, 3, 4] + @test push!(t, 1) === t + @test collect(t) == [1] + @test Base.list_append!!(t2, t) === t2 + @test isempty(t) + @test collect(t2) == [5, 6, 7, 2, 3, 4, 1] +end From bd610a3289a130f6fb8c5c524b562d9f15d42205 Mon Sep 17 00:00:00 2001 From: Jameson Nash Date: Mon, 18 Mar 2019 15:54:28 -0400 Subject: [PATCH 5/5] make Workqueue threadsafe (#30838) --- base/boot.jl | 4 +- base/task.jl | 167 ++++++++++++++++++++++++++++-------- base/threadingconstructs.jl | 7 +- src/gc-debug.c | 8 +- src/gc.c | 6 +- src/init.c | 11 ++- src/julia.h | 5 +- src/task.c | 50 ++++++----- test/threads.jl | 24 +++++- 9 files changed, 210 insertions(+), 72 deletions(-) diff --git a/base/boot.jl b/base/boot.jl index 63c7f4b5b4821..8961fcd751c46 100644 --- a/base/boot.jl +++ b/base/boot.jl @@ -380,8 +380,8 @@ eval(Core, :(LineInfoNode(mod::Module, method::Symbol, file::Symbol, line::Int, Module(name::Symbol=:anonymous, std_imports::Bool=true) = ccall(:jl_f_new_module, Ref{Module}, (Any, Bool), name, std_imports) -function Task(@nospecialize(f), reserved_stack::Int=0) - return ccall(:jl_new_task, Ref{Task}, (Any, Int), f, reserved_stack) +function _Task(@nospecialize(f), reserved_stack::Int, completion_future) + return ccall(:jl_new_task, Ref{Task}, (Any, Any, Int), f, completion_future, reserved_stack) end # simple convert for use by constructors of types in Core diff --git a/base/task.jl b/base/task.jl index 9a854048fd910..fb73e433db850 100644 --- a/base/task.jl +++ b/base/task.jl @@ -2,6 +2,9 @@ ## basic task functions and TLS +const ThreadSynchronizer = GenericCondition{Threads.SpinLock} +Core.Task(@nospecialize(f), reserved_stack::Int=0) = Core._Task(f, reserved_stack, ThreadSynchronizer()) + # Container for a captured exception and its backtrace. Can be serialized. struct CapturedException <: Exception ex::Any @@ -135,6 +138,8 @@ istaskstarted(t::Task) = ccall(:jl_is_task_started, Cint, (Any,), t) != 0 istaskfailed(t::Task) = (t.state == :failed) +Threads.threadid(t::Task) = Int(ccall(:jl_get_task_tid, Int16, (Any,), t)+1) + task_result(t::Task) = t.result task_local_storage() = get_task_tls(current_task()) @@ -181,13 +186,15 @@ end # NOTE: you can only wait for scheduled tasks function wait(t::Task) if !istaskdone(t) - if t.donenotify === nothing - t.donenotify = Condition() + lock(t.donenotify) + try + while !istaskdone(t) + wait(t.donenotify) + end + finally + unlock(t.donenotify) end end - while !istaskdone(t) - wait(t.donenotify) - end if istaskfailed(t) throw(t.exception) end @@ -273,7 +280,7 @@ end function register_taskdone_hook(t::Task, hook) tls = get_task_tls(t) push!(get!(tls, :TASKDONE_HOOKS, []), hook) - t + return t end # runtime system hook called when a task finishes @@ -286,9 +293,17 @@ function task_done_hook(t::Task) t.backtrace = catch_backtrace() end - if isa(t.donenotify, Condition) && !isempty(t.donenotify.waitq) - handled = true - notify(t.donenotify, result, true, err) + donenotify = t.donenotify + if isa(donenotify, ThreadSynchronizer) + lock(donenotify) + try + if !isempty(donenotify.waitq) + handled = true + notify(donenotify, result, true, err) + end + finally + unlock(donenotify) + end end # Execute any other hooks registered in the TLS @@ -298,8 +313,8 @@ function task_done_hook(t::Task) handled = true end - if err && !handled - if isa(result,InterruptException) && isdefined(Base,:active_repl_backend) && + if err && !handled && Threads.threadid() == 1 + if isa(result, InterruptException) && isdefined(Base, :active_repl_backend) && active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) && active_repl_backend.in_eval throwto(active_repl_backend.backend_task, result) # this terminates the task @@ -313,7 +328,8 @@ function task_done_hook(t::Task) # If an InterruptException happens while blocked in the event loop, try handing # the exception to the REPL task since the current task is done. # issue #19467 - if isa(e,InterruptException) && isdefined(Base,:active_repl_backend) && + if Threads.threadid() == 1 && + isa(e, InterruptException) && isdefined(Base, :active_repl_backend) && active_repl_backend.backend_task.state == :runnable && isempty(Workqueue) && active_repl_backend.in_eval throwto(active_repl_backend.backend_task, e) @@ -360,12 +376,78 @@ end ## scheduler and work queue -global const Workqueue = InvasiveLinkedList{Task}() +struct InvasiveLinkedListSynchronized{T} + queue::InvasiveLinkedList{T} + lock::Threads.SpinLock + InvasiveLinkedListSynchronized{T}() where {T} = new(InvasiveLinkedList{T}(), Threads.SpinLock()) +end +isempty(W::InvasiveLinkedListSynchronized) = isempty(W.queue) +length(W::InvasiveLinkedListSynchronized) = length(W.queue) +function push!(W::InvasiveLinkedListSynchronized{T}, t::T) where T + lock(W.lock) + try + push!(W.queue, t) + finally + unlock(W.lock) + end + return W +end +function pushfirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T + lock(W.lock) + try + pushfirst!(W.queue, t) + finally + unlock(W.lock) + end + return W +end +function pop!(W::InvasiveLinkedListSynchronized) + lock(W.lock) + try + return pop!(W.queue) + finally + unlock(W.lock) + end +end +function popfirst!(W::InvasiveLinkedListSynchronized) + lock(W.lock) + try + return popfirst!(W.queue) + finally + unlock(W.lock) + end +end +function list_deletefirst!(W::InvasiveLinkedListSynchronized{T}, t::T) where T + lock(W.lock) + try + list_deletefirst!(W.queue, t) + finally + unlock(W.lock) + end + return W +end + +const StickyWorkqueue = InvasiveLinkedListSynchronized{Task} +global const Workqueues = [StickyWorkqueue()] +global const Workqueue = Workqueues[1] # default work queue is thread 1 +function __preinit_threads__() + if length(Workqueues) < Threads.nthreads() + resize!(Workqueues, Threads.nthreads()) + for i = 2:length(Workqueues) + Workqueues[i] = StickyWorkqueue() + end + end + nothing +end function enq_work(t::Task) (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") - ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) - push!(Workqueue, t) + tid = (t.sticky ? Threads.threadid(t) : 0) + if tid == 0 + tid = Threads.threadid() + end + push!(Workqueues[tid], t) + tid == 1 && ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop()) return t end @@ -418,11 +500,12 @@ end # fast version of `schedule(t, arg); wait()` function schedule_and_wait(t::Task, @nospecialize(arg)=nothing) (t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable") - if isempty(Workqueue) + W = Workqueues[Threads.threadid()] + if isempty(W) return yieldto(t, arg) else t.result = arg - push!(Workqueue, t) + push!(W, t) end return wait() end @@ -487,23 +570,24 @@ end function ensure_rescheduled(othertask::Task) ct = current_task() + W = Workqueues[Threads.threadid()] if ct !== othertask && othertask.state == :runnable # we failed to yield to othertask - # return it to the head of the queue to be scheduled later - pushfirst!(Workqueue, othertask) - end - if ct.queue === Workqueue - # if the current task was queued, - # also need to return it to the runnable state - # before throwing an error - list_deletefirst!(Workqueue, ct) + # return it to the head of a queue to be retried later + tid = Threads.threadid(othertask) + Wother = tid == 0 ? W : Workqueues[tid] + pushfirst!(Wother, othertask) end + # if the current task was queued, + # also need to return it to the runnable state + # before throwing an error + list_deletefirst!(W, ct) nothing end -function trypoptask() - isempty(Workqueue) && return - t = popfirst!(Workqueue) +function trypoptask(W::StickyWorkqueue) + isempty(W) && return + t = popfirst!(W) if t.state != :runnable # assume this somehow got queued twice, # probably broken now, but try discarding this switch and keep going @@ -516,17 +600,25 @@ function trypoptask() return t end -@noinline function poptaskref() +@noinline function poptaskref(W::StickyWorkqueue) local task while true - task = trypoptask() + task = trypoptask(W) task === nothing || break - if process_events(true) == 0 - task = trypoptask() - task === nothing || break - # if there are no active handles and no runnable tasks, just - # wait for signals. - pause() + if !Threads.in_threaded_loop[] && Threads.threadid() == 1 + if process_events(true) == 0 + task = trypoptask(W) + task === nothing || break + # if there are no active handles and no runnable tasks, just + # wait for signals. + pause() + end + else + if Threads.threadid() == 1 + process_events(false) + end + ccall(:jl_gc_safepoint, Cvoid, ()) + ccall(:jl_cpu_pause, Cvoid, ()) end end return Ref(task) @@ -534,7 +626,8 @@ end function wait() - reftask = poptaskref() + W = Workqueues[Threads.threadid()] + reftask = poptaskref(W) result = try_yieldto(ensure_rescheduled, reftask) process_events(false) # return when we come out of the queue diff --git a/base/threadingconstructs.jl b/base/threadingconstructs.jl index 61a1f598546a6..834fc672a2e1c 100644 --- a/base/threadingconstructs.jl +++ b/base/threadingconstructs.jl @@ -68,16 +68,17 @@ function _threadsfor(iter,lbody) # Hack to make nested threaded loops kinda work if threadid() != 1 || in_threaded_loop[] # We are in a nested threaded loop - threadsfor_fun(true) + Base.invokelatest(threadsfor_fun, true) else in_threaded_loop[] = true # the ccall is not expected to throw - ccall(:jl_threading_run, Ref{Cvoid}, (Any,), threadsfor_fun) + ccall(:jl_threading_run, Cvoid, (Any,), threadsfor_fun) in_threaded_loop[] = false end nothing end end + """ Threads.@threads @@ -96,7 +97,7 @@ macro threads(args...) throw(ArgumentError("need an expression argument to @threads")) end if ex.head === :for - return _threadsfor(ex.args[1],ex.args[2]) + return _threadsfor(ex.args[1], ex.args[2]) else throw(ArgumentError("unrecognized argument to @threads")) end diff --git a/src/gc-debug.c b/src/gc-debug.c index 40dc55a4f0550..30e63ca37754b 100644 --- a/src/gc-debug.c +++ b/src/gc-debug.c @@ -578,11 +578,13 @@ static void gc_scrub_task(jl_task_t *ta) { int16_t tid = ta->tid; jl_ptls_t ptls = jl_get_ptls_states(); - jl_ptls_t ptls2 = jl_all_tls_states[tid]; + jl_ptls_t ptls2 = NULL; + if (tid != -1) + ptls2 = jl_all_tls_states[tid]; char *low; char *high; - if (ta->copy_stack && ta == ptls2->current_task) { + if (ta->copy_stack && ptls2 && ta == ptls2->current_task) { low = (char*)ptls2->stackbase - ptls2->stacksize; high = (char*)ptls2->stackbase; } @@ -593,7 +595,7 @@ static void gc_scrub_task(jl_task_t *ta) else return; - if (ptls == ptls2 && ta == ptls2->current_task) { + if (ptls == ptls2 && ptls2 && ta == ptls2->current_task) { // scan up to current `sp` for current thread and task low = (char*)jl_get_frame_addr(); } diff --git a/src/gc.c b/src/gc.c index e5fb03cf95cc0..519d087f3c0b4 100644 --- a/src/gc.c +++ b/src/gc.c @@ -2331,7 +2331,9 @@ mark: { gc_scrub_record_task(ta); void *stkbuf = ta->stkbuf; int16_t tid = ta->tid; - jl_ptls_t ptls2 = jl_all_tls_states[tid]; + jl_ptls_t ptls2 = NULL; + if (tid != -1) + ptls2 = jl_all_tls_states[tid]; if (gc_cblist_task_scanner) { export_gc_state(ptls, &sp); gc_invoke_callbacks(jl_gc_cb_task_scanner_t, @@ -2347,7 +2349,7 @@ mark: { uintptr_t offset = 0; uintptr_t lb = 0; uintptr_t ub = (uintptr_t)-1; - if (ta == ptls2->current_task) { + if (ptls2 && ta == ptls2->current_task) { s = ptls2->pgcstack; } else if (stkbuf) { diff --git a/src/init.c b/src/init.c index eebc96a4540c0..d6475fa51042e 100644 --- a/src/init.c +++ b/src/init.c @@ -761,8 +761,6 @@ void _julia_init(JL_IMAGE_SEARCH rel) jl_init_codegen(); - jl_start_threads(); - jl_an_empty_vec_any = (jl_value_t*)jl_alloc_vec_any(0); jl_init_serializer(); jl_init_intrinsic_properties(); @@ -818,7 +816,16 @@ void _julia_init(JL_IMAGE_SEARCH rel) // it does "using Base" if Base is available. if (jl_base_module != NULL) { jl_add_standard_imports(jl_main_module); + // Do initialization needed before starting child threads + jl_value_t *f = jl_get_global(jl_base_module, jl_symbol("__preinit_threads__")); + if (f) { + size_t last_age = ptls->world_age; + ptls->world_age = jl_get_world_counter(); + jl_apply(&f, 1); + ptls->world_age = last_age; + } } + jl_start_threads(); // This needs to be after jl_start_threads if (jl_options.handle_signals == JL_OPTIONS_HANDLE_SIGNALS_ON) diff --git a/src/julia.h b/src/julia.h index b3d26a2b0b4df..4baa8e3e517bc 100644 --- a/src/julia.h +++ b/src/julia.h @@ -1595,8 +1595,8 @@ JL_DLLEXPORT void jl_sigatomic_end(void); // tasks and exceptions ------------------------------------------------------- - typedef struct _jl_timing_block_t jl_timing_block_t; + // info describing an exception handler typedef struct _jl_handler_t { jl_jmp_buf eh_ctx; @@ -1624,6 +1624,7 @@ typedef struct _jl_task_t { jl_value_t *backtrace; jl_value_t *logstate; jl_function_t *start; + uint8_t sticky; // record whether this Task can be migrated to a new thread // hidden state: jl_ucontext_t ctx; // saved thread state @@ -1651,7 +1652,7 @@ typedef struct _jl_task_t { jl_timing_block_t *timing_stack; } jl_task_t; -JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize); +JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t*, jl_value_t*, size_t); JL_DLLEXPORT void jl_switchto(jl_task_t **pt); JL_DLLEXPORT void JL_NORETURN jl_throw(jl_value_t *e JL_MAYBE_UNROOTED); JL_DLLEXPORT void JL_NORETURN jl_rethrow(void); diff --git a/src/task.c b/src/task.c index 3cd0e22c3a2cd..c95452dbe4bae 100644 --- a/src/task.c +++ b/src/task.c @@ -49,7 +49,7 @@ volatile int jl_in_stackwalk = 0; #endif #endif -// empirically, finish_task needs about 64k stack space to infer/run +// empirically, jl_finish_task needs about 64k stack space to infer/run // and additionally, gc-stack reserves 64k for the guard pages #if defined(MINSIGSTKSZ) && MINSIGSTKSZ > 131072 #define MINSTKSZ MINSIGSTKSZ @@ -101,6 +101,7 @@ static void NOINLINE save_stack(jl_ptls_t ptls, jl_task_t *lastt, jl_task_t **pt } *pt = lastt; // clear the gc-root for the target task before copying the stack for saving lastt->copy_stack = nb; + lastt->sticky = 1; memcpy_a16((uint64_t*)buf, (uint64_t*)frame_addr, nb); // this task's stack could have been modified after // it was marked by an incremental collection @@ -139,7 +140,7 @@ static void restore_stack2(jl_ptls_t ptls, jl_task_t *lastt) static jl_function_t *task_done_hook_func = NULL; -static void JL_NORETURN finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNROOTED) +void JL_NORETURN jl_finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE_UNROOTED) { jl_ptls_t ptls = jl_get_ptls_states(); JL_SIGATOMIC_BEGIN(); @@ -155,15 +156,7 @@ static void JL_NORETURN finish_task(jl_task_t *t, jl_value_t *resultval JL_MAYBE ptls->in_finalizer = 0; ptls->in_pure_callback = 0; jl_get_ptls_states()->world_age = jl_world_counter; - if (ptls->tid != 0) { - // For now, only thread 0 runs the task scheduler. - // The others return to the thread loop - ptls->root_task->result = jl_nothing; - jl_task_t *task = ptls->root_task; - jl_switchto(&task); - gc_debug_critical_error(); - abort(); - } + // let the runtime know this task is dead and find a new task to run if (task_done_hook_func == NULL) { task_done_hook_func = (jl_function_t*)jl_get_global(jl_base_module, jl_symbol("task_done_hook")); @@ -254,6 +247,7 @@ static void ctx_switch(jl_ptls_t ptls, jl_task_t **pt) #ifdef COPY_STACKS // fall back to stack copying if mmap fails t->copy_stack = 1; + t->sticky = 1; t->bufsz = 0; memcpy(&t->ctx, &ptls->base_ctx, sizeof(t->ctx)); #else @@ -276,6 +270,7 @@ static void ctx_switch(jl_ptls_t ptls, jl_task_t **pt) if (lastt->copy_stack) { // save the old copy-stack save_stack(ptls, lastt, pt); // allocates (gc-safepoint, and can also fail) if (jl_setjmp(lastt->ctx.uc_mcontext, 0)) { + // TODO: mutex unlock the thread we just switched from #ifdef ENABLE_TIMINGS assert(blk == ptls->current_task->timing_stack); if (blk) @@ -296,6 +291,10 @@ static void ctx_switch(jl_ptls_t ptls, jl_task_t **pt) ptls->world_age = t->world_age; t->gcstack = NULL; ptls->current_task = t; + if (!lastt->sticky) + // release lastt to run on any tid + lastt->tid = -1; + t->tid = ptls->tid; jl_ucontext_t *lastt_ctx = (killed ? NULL : &lastt->ctx); #ifdef COPY_STACKS @@ -326,6 +325,7 @@ static void ctx_switch(jl_ptls_t ptls, jl_task_t **pt) else { jl_start_fiber(lastt_ctx, &t->ctx); } + // TODO: mutex unlock the thread we just switched from #ifdef ENABLE_TIMINGS assert(blk == ptls->current_task->timing_stack); if (blk) @@ -452,7 +452,7 @@ JL_DLLEXPORT void jl_rethrow_other(jl_value_t *e JL_MAYBE_UNROOTED) throw_internal(NULL); } -JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) +JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, jl_value_t *completion_future, size_t ssize) { jl_ptls_t ptls = jl_get_ptls_states(); jl_task_t *t = (jl_task_t*)jl_gc_alloc(ptls, sizeof(jl_task_t), jl_task_type); @@ -481,18 +481,19 @@ JL_DLLEXPORT jl_task_t *jl_new_task(jl_function_t *start, size_t ssize) t->state = runnable_sym; t->start = start; t->result = jl_nothing; - t->donenotify = jl_nothing; + t->donenotify = completion_future; t->exception = jl_nothing; t->backtrace = jl_nothing; // Inherit logger state from parent task t->logstate = ptls->current_task->logstate; // there is no active exception handler available on this stack yet t->eh = NULL; - t->tid = 0; + // TODO: allow non-sticky tasks + t->tid = ptls->tid; + t->sticky = 1; t->gcstack = NULL; t->excstack = NULL; t->stkbuf = NULL; - t->tid = 0; t->started = 0; #ifdef ENABLE_TIMINGS t->timing_stack = NULL; @@ -526,7 +527,7 @@ void jl_init_tasks(void) JL_GC_DISABLED NULL, jl_any_type, jl_emptysvec, - jl_perm_symsvec(10, + jl_perm_symsvec(11, "next", "queue", "storage", @@ -536,8 +537,9 @@ void jl_init_tasks(void) JL_GC_DISABLED "exception", "backtrace", "logstate", - "code"), - jl_svec(10, + "code", + "sticky"), + jl_svec(11, jl_any_type, jl_any_type, jl_any_type, @@ -547,7 +549,8 @@ void jl_init_tasks(void) JL_GC_DISABLED jl_any_type, jl_any_type, jl_any_type, - jl_any_type), + jl_any_type, + jl_bool_type), 0, 1, 9); jl_value_t *listt = jl_new_struct(jl_uniontype_type, jl_task_type, jl_void_type); jl_svecset(jl_task_type->types, 0, listt); @@ -587,7 +590,7 @@ static void NOINLINE JL_NORETURN start_task(void) } skip_pop_exception:; } - finish_task(t, res); + jl_finish_task(t, res); gc_debug_critical_error(); abort(); } @@ -945,6 +948,7 @@ void jl_init_root_task(void *stack_lo, void *stack_hi) ptls->current_task->gcstack = NULL; ptls->current_task->excstack = NULL; ptls->current_task->tid = ptls->tid; + ptls->current_task->sticky = 1; #ifdef JULIA_ENABLE_THREADING arraylist_new(&ptls->current_task->locks, 0); #endif @@ -959,6 +963,12 @@ JL_DLLEXPORT int jl_is_task_started(jl_task_t *t) return t->started; } +JL_DLLEXPORT int16_t jl_get_task_tid(jl_task_t *t) +{ + return t->tid; +} + + #ifdef _OS_WINDOWS_ #if defined(_CPU_X86_) extern DWORD32 __readgsdword(int); diff --git a/test/threads.jl b/test/threads.jl index 884f8e37d8525..3fdcfd791871e 100644 --- a/test/threads.jl +++ b/test/threads.jl @@ -6,6 +6,24 @@ using Base.Threads: SpinLock, Mutex # threading constructs +let a = zeros(Int, 2 * nthreads()) + @threads for i = 1:length(a) + @sync begin + @async begin + @async (Libc.systemsleep(1); a[i] += 1) + yield() + a[i] += 1 + end + @async begin + yield() + @async (Libc.systemsleep(1); a[i] += 1) + a[i] += 1 + end + end + end + @test all(isequal(4), a) +end + # parallel loop with parallel atomic addition function threaded_loop(a, r, x) @threads for i in r @@ -434,7 +452,11 @@ function test_thread_cfunction() end @test sum(ok) == 10000 end -test_thread_cfunction() +if nthreads() == 1 + test_thread_cfunction() +else + @test_broken "cfunction trampoline code not thread-safe" +end # Compare the two ways of checking if threading is enabled. # `jl_tls_states` should only be defined on non-threading build.