forked from JuliaLang/julia
-
Notifications
You must be signed in to change notification settings - Fork 0
/
task.jl
114 lines (98 loc) · 2.29 KB
/
task.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
show(io::IO, t::Task) = print(io, "Task")
current_task() = ccall(:jl_get_current_task, Any, ())::Task
istaskdone(t::Task) = t.done
function task_local_storage()
t = current_task()
if is(t.storage, nothing)
t.storage = ObjectIdDict()
end
(t.storage)::ObjectIdDict
end
task_local_storage(key) = task_local_storage()[key]
task_local_storage(key, val) = (task_local_storage()[key] = val)
# NOTE: you can only wait for scheduled tasks
function wait(t::Task)
if is(t.donenotify, nothing)
t.donenotify = Condition()
end
while !istaskdone(t)
wait(t.donenotify)
end
t.result
end
function produce(v)
ct = current_task()
q = ct.consumers
if isa(q,Condition)
# make a task waiting for us runnable again
notify1(q)
end
yieldto(ct.last, v)
ct.parent = ct.last # always exit to last consumer
nothing
end
produce(v...) = produce(v)
function consume(P::Task)
while !(P.runnable || P.done)
if P.consumers === nothing
P.consumers = Condition()
end
wait(P.consumers)
end
ct = current_task()
prev = ct.last
ct.runnable = false
v = yieldto(P)
ct.last = prev
ct.runnable = true
if P.done
q = P.consumers
if !is(q, nothing)
notify(q, P.result)
end
end
v
end
start(t::Task) = nothing
function done(t::Task, val)
t.result = consume(t)
istaskdone(t)
end
next(t::Task, val) = (t.result, nothing)
macro task(ex)
:(Task(()->$(esc(ex))))
end
## condition variables
type Condition
waitq::Vector{Any}
Condition() = new({})
end
function wait(c::Condition)
ct = current_task()
if ct === Scheduler
error("cannot execute blocking function from scheduler")
end
push!(c.waitq, ct)
ct.runnable = false
args = yield(c)
if isa(args,InterruptException)
filter!(x->x!==ct, c.waitq)
error(args)
end
args
end
function notify(c::Condition, arg=nothing; all=true)
if all
for t in c.waitq
t.result = arg
enq_work(t)
end
empty!(c.waitq)
elseif !isempty(c.waitq)
t = shift!(c.waitq)
t.result = arg
enq_work(t)
end
nothing
end
notify1(c::Condition, arg=nothing) = notify(c, arg, all=false)