Skip to content

Commit

Permalink
scheduler: use linked list for Workqueue
Browse files Browse the repository at this point in the history
  • Loading branch information
vtjnash authored and JeffBezanson committed Mar 19, 2019
1 parent 72d502a commit 9902531
Show file tree
Hide file tree
Showing 14 changed files with 309 additions and 76 deletions.
1 change: 1 addition & 0 deletions base/Base.jl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ include("env.jl")

# Scheduling
include("libuv.jl")
include("linked_list.jl")
include("event.jl")
include("threads.jl")
include("lock.jl")
Expand Down
23 changes: 8 additions & 15 deletions base/event.jl
Original file line number Diff line number Diff line change
Expand Up @@ -56,12 +56,12 @@ Abstract implementation of a condition object
for synchonizing tasks objects with a given lock.
"""
struct GenericCondition{L<:AbstractLock}
waitq::Vector{Any}
waitq::InvasiveLinkedList{Task}
lock::L

GenericCondition{L}() where {L<:AbstractLock} = new{L}([], L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}([], l)
GenericCondition(l::AbstractLock) = new{typeof(l)}([], l)
GenericCondition{L}() where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), L())
GenericCondition{L}(l::L) where {L<:AbstractLock} = new{L}(InvasiveLinkedList{Task}(), l)
GenericCondition(l::AbstractLock) = new{typeof(l)}(InvasiveLinkedList{Task}(), l)
end

assert_havelock(c::GenericCondition) = assert_havelock(c.lock)
Expand Down Expand Up @@ -94,11 +94,10 @@ function wait(c::GenericCondition)
assert_havelock(c)
push!(c.waitq, ct)
token = unlockall(c.lock)

try
return wait()
catch
filter!(x->x!==ct, c.waitq)
list_deletefirst!(c.waitq, ct)
rethrow()
finally
relockall(c.lock, token)
Expand All @@ -118,16 +117,11 @@ notify(c::GenericCondition, @nospecialize(arg = nothing); all=true, error=false)
function notify(c::GenericCondition, @nospecialize(arg), all, error)
assert_havelock(c)
cnt = 0
if all
cnt = length(c.waitq)
for t in c.waitq
schedule(t, arg, error=error)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
cnt = 1
while !isempty(c.waitq)
t = popfirst!(c.waitq)
schedule(t, arg, error=error)
cnt += 1
all || break
end
return cnt
end
Expand Down Expand Up @@ -161,7 +155,6 @@ This object is NOT thread-safe. See [`Threads.Condition`](@ref) for a thread-saf
const Condition = GenericCondition{AlwaysLockedST}



## async event notifications

"""
Expand Down
151 changes: 151 additions & 0 deletions base/linked_list.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
# This file is a part of Julia. License is MIT: https://julialang.org/license

mutable struct InvasiveLinkedList{T}
# Invasive list requires that T have a field `.next >: U{T, Nothing}` and `.queue >: U{ILL{T}, Nothing}`
head::Union{T, Nothing}
tail::Union{T, Nothing}
InvasiveLinkedList{T}() where {T} = new{T}(nothing, nothing)
end

#const list_append!! = append!
#const list_deletefirst! = delete!

eltype(::Type{<:InvasiveLinkedList{T}}) where {T} = @isdefined(T) ? T : Any

iterate(q::InvasiveLinkedList) = (h = q.head; h === nothing ? nothing : (h, h))
iterate(q::InvasiveLinkedList{T}, v::T) where {T} = (h = v.next; h === nothing ? nothing : (h, h))

isempty(q::InvasiveLinkedList) = (q.head === nothing)

function length(q::InvasiveLinkedList)
i = 0
head = q.head
while head !== nothing
i += 1
head = head.next
end
return i
end

function list_append!!(q::InvasiveLinkedList{T}, q2::InvasiveLinkedList{T}) where T
q === q2 && error("can't append list to itself")
head2 = q2.head
if head2 !== nothing
tail2 = q2.tail::T
q2.head = nothing
q2.tail = nothing
tail = q.tail
q.tail = tail2
if tail === nothing
q.head = head2
else
tail.next = head2
end
while head2 !== nothing
head2.queue = q
head2 = head2.next
end
end
return q
end

function push!(q::InvasiveLinkedList{T}, val::T) where T
val.queue === nothing || error("val already in a list")
val.queue = q
tail = q.tail
if tail === nothing
q.head = q.tail = val
else
tail.next = val
q.tail = val
end
return q
end

function pushfirst!(q::InvasiveLinkedList{T}, val::T) where T
val.queue === nothing || error("val already in a list")
val.queue = q
head = q.head
if head === nothing
q.head = q.tail = val
else
val.next = head
q.head = val
end
return q
end

function pop!(q::InvasiveLinkedList{T}) where {T}
val = q.tail::T
list_deletefirst!(q, val) # expensive!
return val
end

function popfirst!(q::InvasiveLinkedList{T}) where {T}
val = q.head::T
list_deletefirst!(q, val) # cheap
return val
end

function list_deletefirst!(q::InvasiveLinkedList{T}, val::T) where T
val.queue === q || return
head = q.head::T
if head === val
if q.tail::T === val
q.head = q.tail = nothing
else
q.head = val.next::T
end
else
head_next = head.next
while head_next !== val
head = head_next
head_next = head.next
end
if q.tail::T === val
head.next = nothing
q.tail = head
else
head.next = val.next::T
end
end
val.next = nothing
val.queue = nothing
return q
end

#function list_deletefirst!(q::Array{T}, val::T) where T
# i = findfirst(isequal(val), q)
# i === nothing || deleteat!(q, i)
# return q
#end


mutable struct LinkedListItem{T}
# Adapter class to use any `T` in a LinkedList
next::Union{LinkedListItem{T}, Nothing}
queue::Union{InvasiveLinkedList{LinkedListItem{T}}, Nothing}
value::T
LinkedListItem{T}(value::T) where {T} = new{T}(nothing, nothing, value)
end
const LinkedList{T} = InvasiveLinkedList{LinkedListItem{T}}

# delegate methods, as needed
eltype(::Type{<:LinkedList{T}}) where {T} = @isdefined(T) ? T : Any
iterate(q::LinkedList) = (h = q.head; h === nothing ? nothing : (h.value, h))
iterate(q::InvasiveLinkedList{LLT}, v::LLT) where {LLT<:LinkedListItem} = (h = v.next; h === nothing ? nothing : (h.value, h))
push!(q::LinkedList{T}, val::T) where {T} = push!(q, LinkedListItem{T}(val))
pushfirst!(q::LinkedList{T}, val::T) where {T} = pushfirst!(q, LinkedListItem{T}(val))
pop!(q::LinkedList) = invoke(pop!, Tuple{InvasiveLinkedList,}, q).value
popfirst!(q::LinkedList) = invoke(popfirst!, Tuple{InvasiveLinkedList,}, q).value
function list_deletefirst!(q::LinkedList{T}, val::T) where T
h = q.head
while h !== nothing
if isequal(h.value, val)
list_deletefirst!(q, h)
break
end
h = h.next
end
return q
end
49 changes: 24 additions & 25 deletions base/task.jl
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ function get_task_tls(t::Task)
if t.storage === nothing
t.storage = IdDict()
end
(t.storage)::IdDict{Any,Any}
return (t.storage)::IdDict{Any,Any}
end

"""
Expand All @@ -168,12 +168,13 @@ for emulating dynamic scoping.
"""
function task_local_storage(body::Function, key, val)
tls = task_local_storage()
hadkey = haskey(tls,key)
old = get(tls,key,nothing)
hadkey = haskey(tls, key)
old = get(tls, key, nothing)
tls[key] = val
try body()
try
return body()
finally
hadkey ? (tls[key] = old) : delete!(tls,key)
hadkey ? (tls[key] = old) : delete!(tls, key)
end
end

Expand All @@ -200,7 +201,7 @@ exception, the exception is propagated (re-thrown in the task that called fetch)
"""
function fetch(t::Task)
wait(t)
task_result(t)
return task_result(t)
end


Expand Down Expand Up @@ -264,6 +265,7 @@ macro async(expr)
push!($var, task)
end
schedule(task)
task
end
end

Expand Down Expand Up @@ -358,13 +360,12 @@ end

## scheduler and work queue

global const Workqueue = Task[]
global const Workqueue = InvasiveLinkedList{Task}()

function enq_work(t::Task)
t.state == :runnable || error("schedule: Task not runnable")
(t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable")
ccall(:uv_stop, Cvoid, (Ptr{Cvoid},), eventloop())
push!(Workqueue, t)
t.state = :queued
return t
end

Expand Down Expand Up @@ -400,25 +401,28 @@ julia> istaskdone(b)
true
```
"""
function schedule(t::Task, arg; error=false)
function schedule(t::Task, @nospecialize(arg); error=false)
# schedule a task to be (re)started with the given value or exception
t.state == :runnable || error("schedule: Task not runnable")
if error
t.queue === nothing || Base.list_deletefirst!(t.queue, t)
t.exception = arg
else
t.queue === nothing || error("schedule: Task not runnable")
t.result = arg
end
return enq_work(t)
enq_work(t)
return t
end

# fast version of `schedule(t, arg); wait()`
function schedule_and_wait(t::Task, arg=nothing)
t.state == :runnable || error("schedule: Task not runnable")
function schedule_and_wait(t::Task, @nospecialize(arg)=nothing)
(t.state == :runnable && t.queue === nothing) || error("schedule: Task not runnable")
if isempty(Workqueue)
return yieldto(t, arg)
else
t.result = arg
push!(Workqueue, t)
t.state = :queued
end
return wait()
end
Expand All @@ -438,8 +442,7 @@ yield() = (enq_work(current_task()); wait())
A fast, unfair-scheduling version of `schedule(t, arg); yield()` which
immediately yields to `t` before calling the scheduler.
"""
function yield(t::Task, @nospecialize x = nothing)
t.state == :runnable || error("schedule: Task not runnable")
function yield(t::Task, @nospecialize(x=nothing))
t.result = x
enq_work(current_task())
return try_yieldto(ensure_rescheduled, Ref(t))
Expand All @@ -453,7 +456,7 @@ called with no arguments. On subsequent switches, `arg` is returned from the tas
call to `yieldto`. This is a low-level call that only switches tasks, not considering states
or scheduling in any way. Its use is discouraged.
"""
function yieldto(t::Task, @nospecialize x = nothing)
function yieldto(t::Task, @nospecialize(x=nothing))
t.result = x
return try_yieldto(identity, Ref(t))
end
Expand Down Expand Up @@ -488,32 +491,28 @@ function ensure_rescheduled(othertask::Task)
# we failed to yield to othertask
# return it to the head of the queue to be scheduled later
pushfirst!(Workqueue, othertask)
othertask.state = :queued
end
if ct.state == :queued
if ct.queue === Workqueue
# if the current task was queued,
# also need to return it to the runnable state
# before throwing an error
i = findfirst(t->t===ct, Workqueue)
i === nothing || deleteat!(Workqueue, i)
ct.state = :runnable
list_deletefirst!(Workqueue, ct)
end
nothing
end

function trypoptask()
isempty(Workqueue) && return
t = popfirst!(Workqueue)
if t.state != :queued
if t.state != :runnable
# assume this somehow got queued twice,
# probably broken now, but try discarding this switch and keep going
# can't throw here, because it's probably not the fault of the caller to wait
# and don't want to use print() here, because that may try to incur a task switch
ccall(:jl_safe_printf, Cvoid, (Ptr{UInt8}, Int32...),
"\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :queued\n")
"\nWARNING: Workqueue inconsistency detected: popfirst!(Workqueue).state != :runnable\n")
return
end
t.state = :runnable
return t
end

Expand Down
4 changes: 1 addition & 3 deletions doc/src/manual/control-flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -981,8 +981,6 @@ symbols:

| Symbol | Meaning |
|:----------- |:-------------------------------------------------- |
| `:runnable` | Currently running, or available to be switched to |
| `:waiting` | Blocked waiting for a specific event |
| `:queued` | In the scheduler's run queue about to be restarted |
| `:runnable` | Currently running, or able to run |
| `:done` | Successfully finished executing |
| `:failed` | Finished with an uncaught exception |
Loading

0 comments on commit 9902531

Please sign in to comment.