Skip to content

Commit

Permalink
Fix asyncmap
Browse files Browse the repository at this point in the history
  • Loading branch information
Keno committed Dec 31, 2017
1 parent 9bef78f commit 75eeebb
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 24 deletions.
39 changes: 20 additions & 19 deletions base/asyncmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -331,17 +331,12 @@ function iterate(itr::AsyncCollector)
return iterate(itr, AsyncCollectorState(chnl, worker_tasks))
end

function check_done(itr::AsyncCollector, state::AsyncCollectorState)
if !isopen(state.chnl) || done(itr.enumerator, state.enum_state)
close(state.chnl)

# wait for all tasks to finish
foreach(x->(v=wait(x); isa(v, Exception) && throw(v)), state.worker_tasks)
empty!(state.worker_tasks)
return true
else
return false
end
function wait_done(itr::AsyncCollector, state::AsyncCollectorState)
close(state.chnl)

# wait for all tasks to finish
foreach(x->(v=wait(x); isa(v, Exception) && throw(v)), state.worker_tasks)
empty!(state.worker_tasks)
end

function iterate(itr::AsyncCollector, state::AsyncCollectorState)
Expand All @@ -353,7 +348,10 @@ function iterate(itr::AsyncCollector, state::AsyncCollectorState)
y = isdefined(state, :enum_state) ?
iterate(itr.enumerator, state.enum_state) :
iterate(itr.enumerator)
y == nothing && return nothing
if y == nothing
wait_done(itr, state)
return nothing
end
(i, args), state.enum_state = y
put!(state.chnl, (i, args))

Expand Down Expand Up @@ -383,25 +381,28 @@ end

mutable struct AsyncGeneratorState
i::Int
collector_done::Bool
collector_state::AsyncCollectorState
AsyncGeneratorState(i::Int) = new(0)
AsyncGeneratorState(i::Int) = new(i, false)
end

function iterate(itr::AsyncGenerator, state::AsyncGeneratorState=AsyncGeneratorState(0))
state.i += 1

results_dict = itr.collector.results
while !haskey(results_dict, state.i)
if check_done(itr.collector, state.collector_state)
while !state.collector_done && !haskey(results_dict, state.i)
y = isdefined(state, :collector_state) ?
iterate(itr.collector, state.collector_state) :
iterate(itr.collector)
if y == nothing
# `check_done` waits for async tasks to finish. if we do not have the index
# we are looking for, it is an error.
!haskey(results_dict, state.i) && error("Error processing index ", i)
state.collector_done = true
break;
end
_, state.collector_state = isdefined(state, :collector_state) ?
iterate(itr.collector, state.collector_state) :
iterate(itr.collector)
_, state.collector_state = y
end
state.collector_done && isempty(results_dict) && return nothing
r = results_dict[state.i]
delete!(results_dict, state.i)

Expand Down
15 changes: 10 additions & 5 deletions stdlib/Distributed/src/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,18 @@ julia> collect(c)
"""
function head_and_tail(c, n)
head = Vector{eltype(c)}(uninitialized, n)
s = start(c)
i = 0
while i < n && !done(c, s)
n == 0 && return (head, c)
i = 1
y = iterate(c)
y == nothing && return (resize!(head, 0), ())
head[i] = y[1]
while i < n
y = iterate(c, y[2])
y == nothing && return (resize!(head, i), ())
i += 1
head[i], s = next(c, s)
head[i] = y[1]
end
return resize!(head, i), Iterators.rest(c, s)
return head, Iterators.rest(c, s)
end

"""
Expand Down

0 comments on commit 75eeebb

Please sign in to comment.