Skip to content

Commit

Permalink
make retry() more flexible (JuliaLang#19331)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjarthur authored and amitmurthy committed Jan 12, 2017
1 parent d357955 commit 31b52b4
Show file tree
Hide file tree
Showing 7 changed files with 98 additions and 65 deletions.
75 changes: 50 additions & 25 deletions base/error.jl
Original file line number Diff line number Diff line change
Expand Up @@ -82,45 +82,70 @@ macro assert(ex, msgs...)
return :($(esc(ex)) ? $(nothing) : throw(Main.Base.AssertionError($msg)))
end

# NOTE: Please keep the constant values specified below in sync with the doc string
const DEFAULT_RETRY_N = 1
const DEFAULT_RETRY_ON = e->true
const DEFAULT_RETRY_MAX_DELAY = 10.0
immutable ExponentialBackOff
n::Int
first_delay::Float64
max_delay::Float64
factor::Float64
jitter::Float64

function ExponentialBackOff(n, first_delay, max_delay, factor, jitter)
all(x->x>=0, (n, first_delay, max_delay, factor, jitter)) || error("all inputs must be non-negative")
new(n, first_delay, max_delay, factor, jitter)
end
end

"""
retry(f, [retry_on]; n=1, max_delay=10.0) -> Function
ExponentialBackOff(; n=1, first_delay=0.05, max_delay=10.0, factor=5.0, jitter=0.1)
Returns a lambda that retries function `f` up to `n` times in the
event of an exception. If `retry_on` is a `Type` then retry only
for exceptions of that type. If `retry_on` is a function
`test_error(::Exception) -> Bool` then retry only if it is true.
A `Float64` iterator of length `n` whose elements exponentially increase at a
rate in the interval `factor` * (1 ± `jitter`). The first element is
`first_delay` and all elements are clamped to `max_delay`.
"""
ExponentialBackOff(; n=1, first_delay=0.05, max_delay=10.0, factor=5.0, jitter=0.1) =
ExponentialBackOff(n, first_delay, max_delay, factor, jitter)
start(ebo::ExponentialBackOff) = (ebo.n, min(ebo.first_delay, ebo.max_delay))
function next(ebo::ExponentialBackOff, state)
next_n = state[1]-1
curr_delay = state[2]
next_delay = min(ebo.max_delay, state[2] * ebo.factor * (1.0 - ebo.jitter + (rand() * 2.0 * ebo.jitter)))
(curr_delay, (next_n, next_delay))
end
done(ebo::ExponentialBackOff, state) = state[1]<1
length(ebo::ExponentialBackOff) = ebo.n

The first retry happens after a gap of 50 milliseconds or `max_delay`,
whichever is lower. Subsequently, the delays between retries are
exponentially increased with a random factor up to `max_delay`.
"""
retry(f::Function; delays=Base.ExponentialBackOff(), check=nothing) -> Function
Returns an anonymous function that calls function `f`. If an exception arises,
`f` is repeatedly called again, each time `check` returns `true`, after waiting the
number of seconds specified in `delays`. `check` should input `delays`'s
current state and the `Exception`.
**Examples**
# Examples
```julia
retry(http_get, e -> e.status == "503")(url)
retry(read, UVError)(io)
retry(f, delays=fill(5.0, 3))
retry(f, delays=rand(5:10, 2))
retry(f, delays=Base.ExponentialBackOff(n=3, first_delay=5, max_delay=1000))
retry(http_get, check=(s,e)->e.status == "503")(url)
retry(read, check=(s,e)->isa(e, UVError))(io)
```
"""
function retry(f::Function, retry_on::Function=DEFAULT_RETRY_ON; n=DEFAULT_RETRY_N, max_delay=DEFAULT_RETRY_MAX_DELAY)
function retry(f::Function; delays=ExponentialBackOff(), check=nothing)
(args...) -> begin
delay = min(0.05, max_delay)
for i = 1:n+1
state = start(delays)
while true
try
return f(args...)
catch e
if i > n || try retry_on(e) end !== true
rethrow(e)
done(delays, state) && rethrow(e)
if check !== nothing
state, retry_or_not = check(state, e)
retry_or_not || rethrow(e)
end
end
delay = min(max_delay, delay)
sleep(delay * (0.8 + (rand() * 0.2)))
delay = delay * 5
(delay, state) = next(delays, state)
sleep(delay)
end
end
end

retry(f::Function, t::Type; kw...) = retry(f, e->isa(e, t); kw...)
1 change: 1 addition & 0 deletions base/exports.jl
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ export
EachLine,
Enum,
Enumerate,
ExponentialBackOff,
Factorization,
FileMonitor,
FloatRange,
Expand Down
48 changes: 23 additions & 25 deletions base/pmap.jl
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ pgenerate(f, c) = pgenerate(default_worker_pool(), f, c)
pgenerate(f, c1, c...) = pgenerate(a->f(a...), zip(c1, c...))

"""
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_n=0, retry_max_delay=DEFAULT_RETRY_MAX_DELAY, retry_on=DEFAULT_RETRY_ON) -> collection
pmap([::AbstractWorkerPool], f, c...; distributed=true, batch_size=1, on_error=nothing, retry_delays=[]), retry_check=nothing) -> collection
Transform collection `c` by applying `f` to each element using available
workers and tasks.
Expand Down Expand Up @@ -59,27 +59,27 @@ you can specify an error handling function via argument `on_error` which takes i
the exception. The function can stop the processing by rethrowing the error, or, to continue, return any value
which is then returned inline with the results to the caller.
Failed computation can also be retried via `retry_on`, `retry_n`, `retry_max_delay`, which are passed through
to `retry` as arguments `retry_on`, `n` and `max_delay` respectively. If batching is specified, and an entire batch fails,
all items in the batch are retried.
Failed computation can also be retried via `retry_delays`, `retry_check`, which
are passed through to `retry` as keyword arguments `delays` and `check`,
respectively. If batching is specified, and an entire batch fails, all items in
the batch are retried.
The following are equivalent:
* `pmap(f, c; distributed=false)` and `asyncmap(f,c)`
* `pmap(f, c; retry_n=1)` and `asyncmap(retry(remote(f)),c)`
* `pmap(f, c; retry_n=1, on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
* `pmap(f, c; retry_delays=Base.ExponentialBackOff())` and `asyncmap(retry(remote(f)),c)`
* `pmap(f, c; retry_delays=Base.ExponentialBackOff(), on_error=e->e)` and `asyncmap(x->try retry(remote(f))(x) catch e; e end, c)`
"""
function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_error=nothing,
retry_n=0,
retry_max_delay=DEFAULT_RETRY_MAX_DELAY,
retry_on=DEFAULT_RETRY_ON,
retry_delays=[],
retry_check=nothing,
# deprecated keyword args:
err_retry=nothing, err_stop=nothing, pids=nothing)
#15409
if err_retry !== nothing
depwarn("err_retry is deprecated, use pmap(retry(f), c...).", :pmap)
if err_retry == true
f = retry(f)
f = retry(f, delays=retry_delays, check=retry_check)
end
end
if pids !== nothing
Expand Down Expand Up @@ -110,8 +110,8 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er
f = remote(p, f)
end

if retry_n > 0
f = wrap_retry(f, retry_on, retry_n, retry_max_delay)
if length(retry_delays) > 0
f = wrap_retry(f, retry_delays, retry_check)
end
if on_error !== nothing
f = wrap_on_error(f, on_error)
Expand All @@ -122,18 +122,18 @@ function pmap(p::AbstractWorkerPool, f, c; distributed=true, batch_size=1, on_er
# During batch processing, We need to ensure that if on_error is set, it is called
# for each element in error, and that we return as many elements as the original list.
# retry, if set, has to be called element wise and we will do a best-effort
# to ensure that we do not call mapped function on the same element more than retry_n.
# to ensure that we do not call mapped function on the same element more than length(retry_delays).
# This guarantee is not possible in case of worker death / network errors, wherein
# we will retry the entire batch on a new worker.
if (on_error !== nothing) || (retry_n > 0)
if (on_error !== nothing) || (length(retry_delays) > 0)
f = wrap_on_error(f, (x,e)->BatchProcessingError(x,e); capture_data=true)
end
f = wrap_batch(f, p, on_error)
results = asyncmap(f, c; ntasks=()->nworkers(p), batch_size=batch_size)

# handle error processing....
if (on_error !== nothing) || (retry_n > 0)
process_batch_errors!(p, f_orig, results, on_error, retry_on, retry_n, retry_max_delay)
if (on_error !== nothing) || (length(retry_delays) > 0)
process_batch_errors!(p, f_orig, results, on_error, retry_delays, retry_check)
end

return results
Expand All @@ -158,7 +158,7 @@ function wrap_on_error(f, on_error; capture_data=false)
end
end

wrap_retry(f, retry_on, n, max_delay) = retry(f, retry_on; n=n, max_delay=max_delay)
wrap_retry(f, retry_delays, retry_check) = retry(f, delays=retry_delays, check=retry_check)

function wrap_batch(f, p, on_error)
f = asyncmap_batch(f)
Expand All @@ -177,9 +177,9 @@ end

asyncmap_batch(f) = batch -> asyncmap(x->f(x...), batch)

function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry_max_delay)
function process_batch_errors!(p, f, results, on_error, retry_delays, retry_check)
# Handle all the ones in error in another pmap, with batch size set to 1
if (on_error !== nothing) || (retry_n > 0)
if (on_error !== nothing) || (length(retry_delays) > 0)
reprocess = []
for (idx, v) in enumerate(results)
if isa(v, BatchProcessingError)
Expand All @@ -190,13 +190,11 @@ function process_batch_errors!(p, f, results, on_error, retry_on, retry_n, retry
if length(reprocess) > 0
errors = [x[2] for x in reprocess]
exceptions = [x.ex for x in errors]
if (retry_n > 0) && all([retry_on(ex) for ex in exceptions])
retry_n = retry_n - 1
state = start(retry_delays)
if (length(retry_delays) > 0) &&
(retry_check==nothing || all([retry_check(state,ex)[2] for ex in exceptions]))
error_processed = pmap(p, f, [x.data for x in errors];
on_error=on_error,
retry_on=retry_on,
retry_n=retry_n,
retry_max_delay=retry_max_delay)
on_error = on_error, retry_delays = collect(retry_delays)[2:end], retry_check = retry_check)
elseif on_error !== nothing
error_processed = map(on_error, exceptions)
else
Expand Down
2 changes: 1 addition & 1 deletion base/workerpool.jl
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ end
"""
remote([::AbstractWorkerPool], f) -> Function
Returns a lambda that executes function `f` on an available worker
Returns an anonymous function that executes function `f` on an available worker
using [`remotecall_fetch`](@ref).
"""
remote(f) = (args...; kwargs...)->remotecall_fetch(f, default_worker_pool(), args...; kwargs...)
Expand Down
1 change: 1 addition & 0 deletions doc/src/stdlib/base.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,7 @@ Core.UndefRefError
Core.UndefVarError
Base.InitError
Base.retry
Base.ExponentialBackOff
```

## Events
Expand Down
25 changes: 17 additions & 8 deletions test/error.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,14 @@
# This file is a part of Julia. License is MIT: http:https://julialang.org/license

@test length(ExponentialBackOff(n=10)) == 10
@test collect(ExponentialBackOff(n=10, first_delay=0.01))[1] == 0.01
@test maximum(ExponentialBackOff(n=10, max_delay=0.06)) == 0.06
ratio(x) = x[2:end]./x[1:end-1]
@test all(x->isapprox(x,10.0), ratio(collect(
ExponentialBackOff(n=10, max_delay=Inf, factor=10, jitter=0.0))))
srand(12345)
@test (mean(ratio(collect(ExponentialBackOff(n=100, max_delay=Inf, factor=1, jitter=0.1)))) - 1.0) < 1e-4

let
function foo_error(c, n)
c[1] += 1
Expand All @@ -16,42 +25,42 @@ let

# Success on second attempt
c = [0]
@test retry(foo_error;n=1)(c,1) == 7
@test retry(foo_error)(c,1) == 7
@test c[1] == 2

# 2 failed retry attempts, so exception is raised
c = [0]
ex = try retry(foo_error;n=2)(c,3) catch e; e end
ex = try retry(foo_error, delays=ExponentialBackOff(n=2))(c,3) catch e; e end
@test ex.msg == "foo"
@test c[1] == 3

c = [0]
ex = try retry(foo_error, ErrorException)(c,2) catch e; e end
ex = try retry(foo_error, check=(s,e)->(s,isa(e, ErrorException)))(c,2) catch e; e end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 2

c = [0]
ex = try retry(foo_error, e->e.msg == "foo")(c,2) catch e; e end
ex = try retry(foo_error, check=(s,e)->(s,e.msg == "foo"))(c,2) catch e; e end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 2

# No retry if condition does not match
c = [0]
ex = try retry(foo_error, e->e.msg == "bar"; n=3)(c,2) catch e; e end
ex = try retry(foo_error, check=(s,e)->(s,e.msg == "bar"))(c,2) catch e; e end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1

c = [0]
ex = try retry(foo_error, e->e.http_status_code == "503")(c,2) catch e; e end
ex = try retry(foo_error, check=(s,e)->(s,try e.http_status_code == "503" end != true))(c,2) catch e; e end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1
@test c[1] == 2

c = [0]
ex = try retry(foo_error, SystemError)(c,2) catch e; e end
ex = try retry(foo_error, check=(s,e)->(s,isa(e,SystemError)))(c,2) catch e; e end
@test typeof(ex) == ErrorException
@test ex.msg == "foo"
@test c[1] == 1
Expand Down
11 changes: 5 additions & 6 deletions test/parallel_exec.jl
Original file line number Diff line number Diff line change
Expand Up @@ -683,9 +683,8 @@ pmap_args = [
(:distributed, [:default, false]),
(:batch_size, [:default,2]),
(:on_error, [:default, e -> unmangle_exception(e).msg == "foobar"]),
(:retry_on, [:default, e -> unmangle_exception(e).msg == "foobar"]),
(:retry_n, [:default, typemax(Int)-1]),
(:retry_max_delay, [0, 0.001])
(:retry_delays, [:default, fill(0.01, 1000)]),
(:retry_check, [:default, (s,e) -> (s,unmangle_exception(e).msg == "foobar")]),
]

kwdict = Dict()
Expand All @@ -702,7 +701,7 @@ function walk_args(i)

testw = kwdict[:distributed] === false ? [1] : workers()

if (kwdict[:on_error] === :default) && (kwdict[:retry_n] === :default)
if (kwdict[:on_error] === :default) && (kwdict[:retry_delays] === :default)
mapf = x -> (x*2, myid())
results_test = pmap_res -> begin
results = [x[1] for x in pmap_res]
Expand All @@ -712,7 +711,7 @@ function walk_args(i)
@test p in pids
end
end
elseif kwdict[:retry_n] !== :default
elseif kwdict[:retry_delays] !== :default
mapf = x -> iseven(myid()) ? error("foobar") : (x*2, myid())
results_test = pmap_res -> begin
results = [x[1] for x in pmap_res]
Expand All @@ -726,7 +725,7 @@ function walk_args(i)
end
end
end
else (kwdict[:on_error] !== :default) && (kwdict[:retry_n] === :default)
else (kwdict[:on_error] !== :default) && (kwdict[:retry_delays] === :default)
mapf = x -> iseven(x) ? error("foobar") : (x*2, myid())
results_test = pmap_res -> begin
w = testw
Expand Down

0 comments on commit 31b52b4

Please sign in to comment.