[FLINK-19773][runtime] Implement ExponentialDelayRestartBackoffTimeStrategy

Apply spotless formatting

This closes apache#14425.
Marek Sabo authored and tillrohrmann committed Jan 5, 2021
1 parent d2e9aeb commit 13b7b3e
Showing 10 changed files with 879 additions and 1 deletion.
@@ -0,0 +1,42 @@
<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 20%">Key</th>
<th class="text-left" style="width: 15%">Default</th>
<th class="text-left" style="width: 10%">Type</th>
<th class="text-left" style="width: 55%">Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><h5>restart-strategy.exponential-delay.backoff-multiplier</h5></td>
<td style="word-wrap: break-word;">2.0</td>
<td>Double</td>
<td>Backoff value is multiplied by this value after every failure, until max backoff is reached, if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>.</td>
</tr>
<tr>
<td><h5>restart-strategy.exponential-delay.initial-backoff</h5></td>
<td style="word-wrap: break-word;">1 s</td>
<td>Duration</td>
<td>Starting duration between restarts if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It can be specified using notation: "1 min", "20 s"</td>
</tr>
<tr>
<td><h5>restart-strategy.exponential-delay.jitter-factor</h5></td>
<td style="word-wrap: break-word;">0.1</td>
<td>Double</td>
<td>Jitter specified as a portion of the backoff if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It represents how large a random value will be added to or subtracted from the backoff. Useful when you want to avoid restarting multiple jobs at the same time.</td>
</tr>
<tr>
<td><h5>restart-strategy.exponential-delay.max-backoff</h5></td>
<td style="word-wrap: break-word;">5 min</td>
<td>Duration</td>
<td>The highest possible duration between restarts if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It can be specified using notation: "1 min", "20 s"</td>
</tr>
<tr>
<td><h5>restart-strategy.exponential-delay.reset-backoff-threshold</h5></td>
<td style="word-wrap: break-word;">1 h</td>
<td>Duration</td>
<td>Threshold when the backoff is reset to its initial value if <span markdown="span">`restart-strategy`</span> has been set to <span markdown="span">`exponential-delay`</span>. It specifies how long the job must be running without failure to reset the exponentially increasing backoff to its initial value. It can be specified using notation: "1 min", "20 s"</td>
</tr>
</tbody>
</table>
@@ -12,7 +12,7 @@
<td><h5>restart-strategy</h5></td>
<td style="word-wrap: break-word;">(none)</td>
<td>String</td>
<td>Defines the restart strategy to use in case of job failures.<br />Accepted values are:<ul><li><span markdown="span">`none`</span>, <span markdown="span">`off`</span>, <span markdown="span">`disable`</span>: No restart strategy.</li><li><span markdown="span">`fixeddelay`</span>, <span markdown="span">`fixed-delay`</span>: Fixed delay restart strategy. More details can be found <a href="../dev/task_failure_recovery.html#fixed-delay-restart-strategy">here</a>.</li><li><span markdown="span">`failurerate`</span>, <span markdown="span">`failure-rate`</span>: Failure rate restart strategy. More details can be found <a href="../dev/task_failure_recovery.html#failure-rate-restart-strategy">here</a>.</li></ul>If checkpointing is disabled, the default value is <span markdown="span">`none`</span>. If checkpointing is enabled, the default value is <span markdown="span">`fixed-delay`</span> with <span markdown="span">`Integer.MAX_VALUE`</span> restart attempts and '<span markdown="span">`1 s`</span>' delay.</td>
<td>Defines the restart strategy to use in case of job failures.<br />Accepted values are:<ul><li><span markdown="span">`none`</span>, <span markdown="span">`off`</span>, <span markdown="span">`disable`</span>: No restart strategy.</li><li><span markdown="span">`fixeddelay`</span>, <span markdown="span">`fixed-delay`</span>: Fixed delay restart strategy. More details can be found <a href="../dev/task_failure_recovery.html#fixed-delay-restart-strategy">here</a>.</li><li><span markdown="span">`failurerate`</span>, <span markdown="span">`failure-rate`</span>: Failure rate restart strategy. More details can be found <a href="../dev/task_failure_recovery.html#failure-rate-restart-strategy">here</a>.</li><li><span markdown="span">`exponentialdelay`</span>, <span markdown="span">`exponential-delay`</span>: Exponential delay restart strategy. More details can be found <a href="../dev/task_failure_recovery.html#exponential-delay-restart-strategy">here</a>.</li></ul>If checkpointing is disabled, the default value is <span markdown="span">`none`</span>. If checkpointing is enabled, the default value is <span markdown="span">`fixed-delay`</span> with <span markdown="span">`Integer.MAX_VALUE`</span> restart attempts and '<span markdown="span">`1 s`</span>' delay.</td>
</tr>
</tbody>
</table>
54 changes: 54 additions & 0 deletions docs/dev/task_failure_recovery.md
@@ -125,6 +125,60 @@ env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
</div>


### Exponential Delay Restart Strategy

The exponential delay restart strategy attempts to restart the job indefinitely, with an increasing delay up to the maximum delay.
The job never fails.
Between two consecutive restart attempts, the delay keeps increasing exponentially until the maximum delay is reached.
After that, the delay stays at the maximum.

When the job executes correctly, the exponential delay value resets after some time; this threshold is configurable.
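
The behaviour described above can be sketched in a few lines of Java. This is a minimal illustration, not Flink's implementation: the class `ExponentialDelaySketch`, its method `onFailure`, and the choice to measure the reset threshold from the timestamp of the previous failure are assumptions made for this example, and jitter is left out.

```java
/** Hypothetical sketch of an exponential backoff with a reset threshold (no jitter). */
final class ExponentialDelaySketch {
    private final long initialBackoffMs;
    private final long maxBackoffMs;
    private final double backoffMultiplier;
    private final long resetBackoffThresholdMs;

    private long currentBackoffMs;
    private long lastFailureMs;
    private boolean hasFailedBefore = false;

    ExponentialDelaySketch(
            long initialBackoffMs,
            long maxBackoffMs,
            double backoffMultiplier,
            long resetBackoffThresholdMs) {
        this.initialBackoffMs = initialBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
        this.backoffMultiplier = backoffMultiplier;
        this.resetBackoffThresholdMs = resetBackoffThresholdMs;
        this.currentBackoffMs = initialBackoffMs;
    }

    /** Records a failure at the given time and returns the delay before the next restart. */
    long onFailure(long nowMs) {
        boolean ranLongEnough =
                hasFailedBefore && nowMs - lastFailureMs >= resetBackoffThresholdMs;
        if (!hasFailedBefore || ranLongEnough) {
            // First failure, or the job ran without failure for the whole threshold:
            // start again from the initial backoff.
            currentBackoffMs = initialBackoffMs;
        } else {
            // Consecutive failure: grow the delay, but never beyond the maximum.
            currentBackoffMs =
                    Math.min((long) (currentBackoffMs * backoffMultiplier), maxBackoffMs);
        }
        hasFailedBefore = true;
        lastFailureMs = nowMs;
        return currentBackoffMs;
    }
}
```

With an initial backoff of 1 s, a maximum of 5 s, a multiplier of 2.0 and a threshold of 1 h, failures that follow each other closely would see delays of 1 s, 2 s, 4 s, 5 s, 5 s; a failure arriving at least one hour after the previous one would start again at 1 s.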

{% highlight yaml %}
restart-strategy: exponential-delay
{% endhighlight %}

{% include generated/exponential_delay_restart_strategy_configuration.html %}

For example:

{% highlight yaml %}
restart-strategy.exponential-delay.initial-backoff: 10 s
restart-strategy.exponential-delay.max-backoff: 2 min
restart-strategy.exponential-delay.backoff-multiplier: 2.0
restart-strategy.exponential-delay.reset-backoff-threshold: 10 min
restart-strategy.exponential-delay.jitter-factor: 0.1
{% endhighlight %}
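
To make these example values concrete, the following sketch computes the delays they would produce before jitter, and the range a jittered delay could fall into. The class and method names are hypothetical, and treating jitter as a symmetric offset of `jitter-factor` times the current backoff is an assumption based on the option description, not on the actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that works through the example configuration. */
final class ExampleBackoffSchedule {

    /** Returns the un-jittered delays (ms) for the first n consecutive failures. */
    static List<Long> baseDelaysMs(long initialMs, long maxMs, double multiplier, int n) {
        List<Long> delays = new ArrayList<>();
        long current = initialMs;
        for (int i = 0; i < n; i++) {
            delays.add(current);
            // Grow the delay for the next failure, capped at the maximum.
            current = Math.min((long) (current * multiplier), maxMs);
        }
        return delays;
    }

    /** Returns {lowest, highest} delay (ms) if up to jitterFactor * base is added or subtracted. */
    static long[] jitterRangeMs(long baseMs, double jitterFactor) {
        long offset = (long) (baseMs * jitterFactor);
        return new long[] {baseMs - offset, baseMs + offset};
    }
}
```

Under these assumptions, `baseDelaysMs(10_000, 120_000, 2.0, 6)` gives 10 s, 20 s, 40 s, 80 s, 120 s, 120 s, and `jitterRangeMs(10_000, 0.1)` gives a range of 9 s to 11 s for the first restart.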

The exponential delay restart strategy can also be set programmatically:

<div class="codetabs" markdown="1">
<div data-lang="java" markdown="1">
{% highlight java %}
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.milliseconds(1), // initial delay between restarts
  Time.milliseconds(1000), // maximum delay between restarts
  1.1, // exponential multiplier
  Time.milliseconds(2000), // threshold duration to reset delay to its initial value
  0.1 // jitter
));
{% endhighlight %}
</div>
<div data-lang="scala" markdown="1">
{% highlight scala %}
val env = ExecutionEnvironment.getExecutionEnvironment
env.setRestartStrategy(RestartStrategies.exponentialDelayRestart(
  Time.of(1, TimeUnit.MILLISECONDS), // initial delay between restarts
  Time.of(1000, TimeUnit.MILLISECONDS), // maximum delay between restarts
  1.1, // exponential multiplier
  Time.of(2, TimeUnit.SECONDS), // threshold duration to reset delay to its initial value
  0.1 // jitter
))
{% endhighlight %}
</div>
</div>

### Failure Rate Restart Strategy

The failure rate restart strategy restarts the job after a failure, but once the `failure rate` (failures per time interval) is exceeded, the job eventually fails.
@@ -92,6 +92,25 @@ public static FailureRateRestartStrategyConfiguration failureRateRestart(
                failureRate, failureInterval, delayInterval);
    }

    /**
     * Generates an ExponentialDelayRestartStrategyConfiguration.
     *
     * @param initialBackoff Starting duration between restarts
     * @param maxBackoff The highest possible duration between restarts
     * @param backoffMultiplier Factor by which the delay is multiplied after each failure
     * @param resetBackoffThreshold How long the job must run without failure before the delay is
     *     reset to its initial value
     * @param jitterFactor How much the delay may randomly differ, as a fraction of the backoff
     */
    public static ExponentialDelayRestartStrategyConfiguration exponentialDelayRestart(
            Time initialBackoff,
            Time maxBackoff,
            double backoffMultiplier,
            Time resetBackoffThreshold,
            double jitterFactor) {
        return new ExponentialDelayRestartStrategyConfiguration(
                initialBackoff, maxBackoff, backoffMultiplier, resetBackoffThreshold, jitterFactor);
    }

    /** Abstract configuration for restart strategies. */
    public abstract static class RestartStrategyConfiguration implements Serializable {
        private static final long serialVersionUID = 6285853591578313960L;
@@ -188,6 +207,90 @@ public String getDescription() {
        }
    }

    /** Configuration representing an exponential delay restart strategy. */
    public static final class ExponentialDelayRestartStrategyConfiguration
            extends RestartStrategyConfiguration {
        private static final long serialVersionUID = 1467941615941965194L;

        private final Time initialBackoff;
        private final Time maxBackoff;
        private final double backoffMultiplier;
        private final Time resetBackoffThreshold;
        private final double jitterFactor;

        public ExponentialDelayRestartStrategyConfiguration(
                Time initialBackoff,
                Time maxBackoff,
                double backoffMultiplier,
                Time resetBackoffThreshold,
                double jitterFactor) {
            this.initialBackoff = initialBackoff;
            this.maxBackoff = maxBackoff;
            this.backoffMultiplier = backoffMultiplier;
            this.resetBackoffThreshold = resetBackoffThreshold;
            this.jitterFactor = jitterFactor;
        }

        public Time getInitialBackoff() {
            return initialBackoff;
        }

        public Time getMaxBackoff() {
            return maxBackoff;
        }

        public double getBackoffMultiplier() {
            return backoffMultiplier;
        }

        public Time getResetBackoffThreshold() {
            return resetBackoffThreshold;
        }

        public double getJitterFactor() {
            return jitterFactor;
        }

        @Override
        public String getDescription() {
            return String.format(
                    "Restart with exponential delay: starting at %s, increasing by %f, with maximum %s. "
                            + "Delay resets after %s with jitter %f",
                    initialBackoff,
                    backoffMultiplier,
                    maxBackoff,
                    resetBackoffThreshold,
                    jitterFactor);
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) {
                return true;
            }
            if (o == null || getClass() != o.getClass()) {
                return false;
            }
            ExponentialDelayRestartStrategyConfiguration that =
                    (ExponentialDelayRestartStrategyConfiguration) o;
            return Double.compare(that.backoffMultiplier, backoffMultiplier) == 0
                    && Double.compare(that.jitterFactor, jitterFactor) == 0
                    && Objects.equals(initialBackoff, that.initialBackoff)
                    && Objects.equals(maxBackoff, that.maxBackoff)
                    && Objects.equals(resetBackoffThreshold, that.resetBackoffThreshold);
        }

        @Override
        public int hashCode() {
            int result = initialBackoff.hashCode();
            result = 31 * result + maxBackoff.hashCode();
            result = 31 * result + (int) backoffMultiplier;
            result = 31 * result + resetBackoffThreshold.hashCode();
            result = 31 * result + (int) jitterFactor;
            return result;
        }
    }

    /** Configuration representing a failure rate restart strategy. */
    public static final class FailureRateRestartStrategyConfiguration
            extends RestartStrategyConfiguration {
@@ -304,6 +407,34 @@ private static RestartStrategyConfiguration parseConfiguration(
                        configuration.get(
                                RestartStrategyOptions.RESTART_STRATEGY_FIXED_DELAY_DELAY);
                return fixedDelayRestart(attempts, delay.toMillis());
            case "exponentialdelay":
            case "exponential-delay":
                Duration initialBackoff =
                        configuration.get(
                                RestartStrategyOptions
                                        .RESTART_STRATEGY_EXPONENTIAL_DELAY_INITIAL_BACKOFF);
                Duration maxBackoff =
                        configuration.get(
                                RestartStrategyOptions
                                        .RESTART_STRATEGY_EXPONENTIAL_DELAY_MAX_BACKOFF);
                double backoffMultiplier =
                        configuration.get(
                                RestartStrategyOptions
                                        .RESTART_STRATEGY_EXPONENTIAL_DELAY_BACKOFF_MULTIPLIER);
                Duration resetBackoffThreshold =
                        configuration.get(
                                RestartStrategyOptions
                                        .RESTART_STRATEGY_EXPONENTIAL_DELAY_RESET_BACKOFF_THRESHOLD);
                double jitter =
                        configuration.get(
                                RestartStrategyOptions
                                        .RESTART_STRATEGY_EXPONENTIAL_DELAY_JITTER_FACTOR);
                return exponentialDelayRestart(
                        Time.milliseconds(initialBackoff.toMillis()),
                        Time.milliseconds(maxBackoff.toMillis()),
                        backoffMultiplier,
                        Time.milliseconds(resetBackoffThreshold.toMillis()),
                        jitter);
            case "failurerate":
            case "failure-rate":
                int maxFailures =