-
-
Notifications
You must be signed in to change notification settings - Fork 5.4k
/
channels.jl
124 lines (97 loc) · 2.92 KB
/
channels.jl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
# This file is a part of Julia. License is MIT: http:https://julialang.org/license
abstract AbstractChannel
const DEF_CHANNEL_SZ=32
type Channel{T} <: AbstractChannel
cond_take::Condition # waiting for data to become available
cond_put::Condition # waiting for a writeable slot
state::Symbol
data::Array{T,1}
sz_max::Int # maximum size of channel
function Channel(sz)
sz_max = sz == typemax(Int) ? typemax(Int) - 1 : sz
new(Condition(), Condition(), :open, Array{T}(0), sz_max)
end
end
Channel(sz::Int = DEF_CHANNEL_SZ) = Channel{Any}(sz)
closed_exception() = InvalidStateException("Channel is closed.", :closed)
"""
close(c::Channel)
Closes a channel. An exception is thrown by:
* `put!` on a closed channel.
* `take!` and `fetch` on an empty, closed channel.
"""
function close(c::Channel)
c.state = :closed
notify_error(c::Channel, closed_exception())
nothing
end
isopen(c::Channel) = (c.state == :open)
type InvalidStateException <: Exception
msg::AbstractString
state::Symbol
end
"""
put!(c::Channel, v)
Appends an item `v` to the channel `c`. Blocks if the channel is full.
"""
function put!(c::Channel, v)
!isopen(c) && throw(closed_exception())
while length(c.data) == c.sz_max
wait(c.cond_put)
end
push!(c.data, v)
notify(c.cond_take, nothing, true, false) # notify all, since some of the waiters may be on a "fetch" call.
v
end
push!(c::Channel, v) = put!(c, v)
function fetch(c::Channel)
wait(c)
c.data[1]
end
"""
take!(c::Channel)
Removes and returns a value from a `Channel`. Blocks till data is available.
"""
function take!(c::Channel)
wait(c)
v = shift!(c.data)
notify(c.cond_put, nothing, false, false) # notify only one, since only one slot has become available for a put!.
v
end
shift!(c::Channel) = take!(c)
"""
isready(c::Channel)
Determine whether a `Channel` has a value stored to it.
`isready` on `Channel`s is non-blocking.
"""
isready(c::Channel) = n_avail(c) > 0
function wait(c::Channel)
while !isready(c)
!isopen(c) && throw(closed_exception())
wait(c.cond_take)
end
nothing
end
function notify_error(c::Channel, err)
notify_error(c.cond_take, err)
notify_error(c.cond_put, err)
end
eltype{T}(::Type{Channel{T}}) = T
n_avail(c::Channel) = length(c.data)
show(io::IO, c::Channel) = print(io, "$(typeof(c))(sz_max:$(c.sz_max),sz_curr:$(n_avail(c)))")
start{T}(c::Channel{T}) = Ref{Nullable{T}}()
function done(c::Channel, state::Ref)
try
# we are waiting either for more data or channel to be closed
state[] = take!(c)
return false
catch e
if isa(e, InvalidStateException) && e.state==:closed
return true
else
rethrow(e)
end
end
end
next{T}(c::Channel{T}, state) = (v=get(state[]); state[]=nothing; (v, state))
iteratorsize{C<:Channel}(::Type{C}) = SizeUnknown()