Skip to content

Commit

Permalink
Merge pull request #25261 from JuliaLang/kf/iterate
Browse files Browse the repository at this point in the history
Iteration protocol change
  • Loading branch information
Keno committed May 18, 2018
2 parents cea567d + 8ade016 commit 0f8d4f8
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 66 deletions.
18 changes: 12 additions & 6 deletions src/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,8 @@ function process_batch_errors!(p, f, results, on_error, retry_delays, retry_chec
if length(reprocess) > 0
errors = [x[2] for x in reprocess]
exceptions = [x.ex for x in errors]
state = start(retry_delays)
state = iterate(retry_delays)
state != nothing && (state = state[2])
if (length(retry_delays) > 0) &&
(retry_check==nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
# BatchProcessingError.data is a tuple of original args
Expand Down Expand Up @@ -256,13 +257,18 @@ julia> collect(c)
"""
function head_and_tail(c, n)
head = Vector{eltype(c)}(undef, n)
s = start(c)
i = 0
while i < n && !done(c, s)
n == 0 && return (head, c)
i = 1
y = iterate(c)
y == nothing && return (resize!(head, 0), ())
head[i] = y[1]
while i < n
y = iterate(c, y[2])
y == nothing && return (resize!(head, i), ())
i += 1
head[i], s = next(c, s)
head[i] = y[1]
end
return resize!(head, i), Iterators.rest(c, s)
return head, Iterators.rest(c, s)
end

"""
Expand Down
60 changes: 0 additions & 60 deletions src/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -54,24 +54,6 @@ precompile(Tuple{typeof(Distributed.worker_id_from_socket), Sockets.TCPSocket})
precompile(Tuple{Type{Distributed.ClusterSerializer{Sockets.TCPSocket}}, Sockets.TCPSocket})
precompile(Tuple{typeof(Distributed.send_msg_), Distributed.Worker, Distributed.MsgHeader, Distributed.ResultMsg, Bool})
precompile(Tuple{typeof(Distributed.send_msg_now), Distributed.Worker, Distributed.MsgHeader, Distributed.ResultMsg})
precompile(Tuple{Type{Core.Compiler.Generator{I, F} where F where I}, Type{Core.Compiler.Const}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}}, Type{Core.Compiler.Const}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.convert), Type{Tuple{Int64, typeof(Distributed.rmprocs)}}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler._collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Core.Compiler.HasLength})
precompile(Tuple{typeof(Core.Compiler.length), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.length), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.copyto!), Array{Any, 1}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.start), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.start), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.done), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Int64})
precompile(Tuple{typeof(Core.Compiler.done), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.getindex), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.start), Tuple{typeof(Distributed.rmprocs), Int64}})
precompile(Tuple{typeof(Core.Compiler.indexed_next), Tuple{typeof(Distributed.rmprocs), Int64}, Int64, Int64})
precompile(Tuple{typeof(Core.Compiler.getindex), Tuple{typeof(Distributed.rmprocs), Int64}, Int64})
precompile(Tuple{typeof(Distributed.register_worker_streams), Distributed.Worker})
precompile(Tuple{typeof(Distributed.register_worker_streams), Distributed.LocalProcess})
precompile(Tuple{Type{Distributed.ClusterSerializer{Sockets.TCPSocket}}, Sockets.TCPSocket})
Expand All @@ -81,15 +63,6 @@ precompile(Tuple{typeof(Base.convert), Type{Distributed.ClusterManager}, Distrib
precompile(Tuple{typeof(Base.convert), Type{Distributed.WorkerConfig}, Distributed.WorkerConfig})
precompile(Tuple{typeof(Base.get), Base.Dict{Any, Any}, Distributed.RRID, Bool})
precompile(Tuple{typeof(Base.ht_keyindex), Base.Dict{Any, Any}, Distributed.RRID})
precompile(Tuple{typeof(Core.Compiler.isbits), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{I, F} where F where I}, Type{QuoteNode}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}}, Type{QuoteNode}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler._collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Core.Compiler.HasLength})
precompile(Tuple{typeof(Core.Compiler.length), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.copyto!), Array{Any, 1}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.start), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.done), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Int64})
precompile(Tuple{typeof(Distributed.local_remotecall_thunk), typeof(Distributed.rmprocs), Tuple{Int64}, Array{Any, 1}})
precompile(Tuple{typeof(Distributed.remote_do), typeof(Distributed.rmprocs), Distributed.Worker, Int64})
precompile(Tuple{typeof(Distributed.remote_do), typeof(Distributed.rmprocs), Distributed.LocalProcess, Int64})
Expand Down Expand Up @@ -140,39 +113,6 @@ precompile(Tuple{typeof(Serialization.serialize_cycle_header), Distributed.Clust
precompile(Tuple{typeof(Serialization.serialize_any), Distributed.ClusterSerializer{Sockets.TCPSocket}, Type{Union{Tuple{Any, Int64}, Tuple{Tuple{}, Any, Bool}}}})
precompile(Tuple{typeof(Distributed.send_msg_), Distributed.Worker, Distributed.MsgHeader, Distributed.ResultMsg, Bool})
precompile(Tuple{typeof(Distributed.send_msg_now), Distributed.Worker, Distributed.MsgHeader, Distributed.ResultMsg})
precompile(Tuple{Type{Core.Compiler.Generator{I, F} where F where I}, Type{Core.Compiler.Const}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}}, Type{Core.Compiler.Const}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.convert), Type{Tuple{Int64, typeof(Distributed.rmprocs)}}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler._collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Core.Compiler.HasLength})
precompile(Tuple{typeof(Core.Compiler.length), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.length), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.copyto!), Array{Any, 1}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.start), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}})
precompile(Tuple{typeof(Core.Compiler.start), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler.done), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Int64})
precompile(Tuple{typeof(Core.Compiler.done), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{Core.Compiler.Const}}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.getindex), Tuple{Int64, typeof(Distributed.rmprocs)}, Int64})
precompile(Tuple{typeof(Core.Compiler.start), Tuple{typeof(Distributed.rmprocs), Int64}})
precompile(Tuple{typeof(Core.Compiler.indexed_next), Tuple{typeof(Distributed.rmprocs), Int64}, Int64, Int64})
precompile(Tuple{typeof(Core.Compiler.getindex), Tuple{typeof(Distributed.rmprocs), Int64}, Int64})
precompile(Tuple{typeof(Serialization.deserialize), Distributed.ClusterSerializer{Sockets.TCPSocket}})
precompile(Tuple{typeof(Serialization.deserialize_cycle), Distributed.ClusterSerializer{Sockets.TCPSocket}, Expr})
precompile(Tuple{typeof(Serialization.handle_deserialize), Distributed.ClusterSerializer{Sockets.TCPSocket}, Int32})
precompile(Tuple{typeof(Serialization.deserialize_array), Distributed.ClusterSerializer{Sockets.TCPSocket}})
precompile(Tuple{typeof(Serialization.deserialize_datatype), Distributed.ClusterSerializer{Sockets.TCPSocket}})
precompile(Tuple{typeof(Serialization.deserialize_expr), Distributed.ClusterSerializer{Sockets.TCPSocket}, Int64})
precompile(Tuple{typeof(Core.Compiler.isbits), Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{I, F} where F where I}, Type{QuoteNode}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{Type{Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}}, Type{QuoteNode}, Tuple{Int64, typeof(Distributed.rmprocs)}})
precompile(Tuple{typeof(Core.Compiler._collect), Type{Any}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Core.Compiler.HasLength})
precompile(Tuple{typeof(Core.Compiler.length), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.copyto!), Array{Any, 1}, Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.start), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}})
precompile(Tuple{typeof(Core.Compiler.done), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Int64})
precompile(Tuple{typeof(Core.Compiler.next), Core.Compiler.Generator{Tuple{Int64, typeof(Distributed.rmprocs)}, Type{QuoteNode}}, Int64})
precompile(Tuple{typeof(Distributed.local_remotecall_thunk), typeof(Distributed.rmprocs), Tuple{Int64}, Array{Any, 1}})
precompile(Tuple{typeof(Distributed.remote_do), typeof(Distributed.rmprocs), Distributed.Worker, Int64})
precompile(Tuple{typeof(Distributed.remote_do), typeof(Distributed.rmprocs), Distributed.LocalProcess, Int64})
Expand Down

0 comments on commit 0f8d4f8

Please sign in to comment.