Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

worklist changes for partr (1/3) #30806

Merged
merged 5 commits into from
Mar 19, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -240,10 +240,11 @@ include("env.jl")

# Scheduling
include("libuv.jl")
include("linked_list.jl")
include("event.jl")
include("task.jl")
include("threads.jl")
include("lock.jl")
include("task.jl")
include("weakkeydict.jl")

# Logging
Expand Down
4 changes: 2 additions & 2 deletions base/boot.jl
Original file line number Diff line number Diff line change
Expand Up @@ -380,8 +380,8 @@ eval(Core, :(LineInfoNode(mod::Module, method::Symbol, file::Symbol, line::Int,

Module(name::Symbol=:anonymous, std_imports::Bool=true) = ccall(:jl_f_new_module, Ref{Module}, (Any, Bool), name, std_imports)

function Task(@nospecialize(f), reserved_stack::Int=0)
return ccall(:jl_new_task, Ref{Task}, (Any, Int), f, reserved_stack)
function _Task(@nospecialize(f), reserved_stack::Int, completion_future)
return ccall(:jl_new_task, Ref{Task}, (Any, Any, Int), f, completion_future, reserved_stack)
end

# simple convert for use by constructors of types in Core
Expand Down
211 changes: 8 additions & 203 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ Abstract implementation of a condition object
for synchonizing tasks objects with a given lock.
"""
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
waitq::InvasiveLinkedList{Task}
lock::L

GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l)
GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
Expand Down Expand Up @@ -94,11 +94,10 @@ function wait(c::GenericCondition)
assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
list_deletefirst!(c.waitq, ct)
rethrow()
finally
relockall(c.lock, token)
Expand All @@ -118,16 +117,11 @@ notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
for t in c.waitq
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
cnt = 1
while !isempty(c.waitq)
t = popfirst!(c.waitq)
schedule(t, arg, error=error)
cnt += 1
all || break
end
return cnt
end
Expand Down Expand Up @@ -161,195 +155,6 @@ This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-saf
const Condition = GenericCondition{AlwaysLockedST}


## scheduler and work queue

global const Workqueue = Task[]

function enq_work(t::Task)
t.state == :runnable || error("schedule: Task not runnable")
ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
push!(Workqueue, t)
t.state = :queued
return t
end

schedule(t::Task) = enq_work(t)

"""
schedule(t::Task, [val]; error=false)

Add a [`Task`](@ref) to the scheduler's queue. This causes the task to run constantly when the system
is otherwise idle, unless the task performs a blocking operation such as [`wait`](@ref).

If a second argument `val` is provided, it will be passed to the task (via the return value of
[`yieldto`](@ref)) when it runs again. If `error` is `true`, the value is raised as an exception in
the woken task.

# Examples
```jldoctest
julia> a5() = sum(i for i in 1:1000);

julia> b = Task(a5);

julia> istaskstarted(b)
false

julia> schedule(b);

julia> yield();

julia> istaskstarted(b)
true

julia> istaskdone(b)
true
```
"""
function schedule(t::Task, arg; error=false)
# schedule a task to be (re)started with the given value or exception
if error
t.exception = arg
else
t.result = arg
end
return enq_work(t)
end

# fast version of `schedule(t, arg); wait()`
function schedule_and_wait(t::Task, arg=nothing)
t.state == :runnable || error("schedule: Task not runnable")
if isempty(Workqueue)
return yieldto(t, arg)
else
t.result = arg
push!(Workqueue, t)
t.state = :queued
end
return wait()
end

"""
yield()

Switch to the scheduler to allow another scheduled task to run. A task that calls this
function is still runnable, and will be restarted immediately if there are no other runnable
tasks.
"""
yield() = (enq_work(current_task()); wait())

"""
yield(t::Task, arg = nothing)

A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
immediately yields to `t` before calling the scheduler.
"""
function yield(t::Task, @nospecialize x = nothing)
t.state == :runnable || error("schedule: Task not runnable")
t.result = x
enq_work(current_task())
return try_yieldto(ensure_rescheduled, Ref(t))
end

"""
yieldto(t::Task, arg = nothing)

Switch to the given task. The first time a task is switched to, the task's function is
called with no arguments. On subsequent switches, `arg` is returned from the task's last
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
or scheduling in any way. Its use is discouraged.
"""
function yieldto(t::Task, @nospecialize x = nothing)
t.result = x
return try_yieldto(identity, Ref(t))
end

function try_yieldto(undo, reftask::Ref{Task})
try
ccall(:jl_switchto, Cvoid, (Any,), reftask)
catch
undo(reftask[])
rethrow()
end
ct = current_task()
exc = ct.exception
if exc !== nothing
ct.exception = nothing
throw(exc)
end
result = ct.result
ct.result = nothing
return result
end

# yield to a task, throwing an exception in it
function throwto(t::Task, @nospecialize exc)
t.exception = exc
return yieldto(t)
end

function ensure_rescheduled(othertask::Task)
ct = current_task()
if ct !== othertask && othertask.state == :runnable
# we failed to yield to othertask
# return it to the head of the queue to be scheduled later
pushfirst!(Workqueue, othertask)
othertask.state = :queued
end
if ct.state == :queued
# if the current task was queued,
# also need to return it to the runnable state
# before throwing an error
i = findfirst(t->t===ct, Workqueue)
i === nothing || deleteat!(Workqueue, i)
ct.state = :runnable
end
nothing
end

@noinline function poptask()
t = popfirst!(Workqueue)
if t.state != :queued
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
# and don't want to use print() here, because that may try to incur a task switch
ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
"\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n")
return
end
t.state = :runnable
return Ref(t)
end

function wait()
while true
if isempty(Workqueue)
c = process_events(true)
if c == 0 && eventloop() != C_NULL && isempty(Workqueue)
# if there are no active handles and no runnable tasks, just
# wait for signals.
pause()
end
else
reftask = poptask()
if reftask !== nothing
result = try_yieldto(ensure_rescheduled, reftask)
process_events(false)
# return when we come out of the queue
return result
end
end
end
# unreachable
end

if Sys.iswindows()
pause() = ccall(:Sleep, stdcall, Cvoid, (UInt32,), 0xffffffff)
else
pause() = ccall(:pause, Cvoid, ())
end


## async event notifications

"""
Expand Down
Loading