Skip to content

Commit

Permalink
[FLINK-21136] Adjust timeouts for reactive mode
Browse files Browse the repository at this point in the history
This closes apache#15159
  • Loading branch information
rmetzger committed Mar 23, 2021
1 parent 049f0e9 commit 9635730
Show file tree
Hide file tree
Showing 15 changed files with 610 additions and 67 deletions.
6 changes: 6 additions & 0 deletions docs/content/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,12 @@ If you manually set a parallelism in your job for individual operators or the en

Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.

When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.

With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.

#### Recommendations

- **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to a negative value to disable the resource timeout.</td>
</tr>
<tr>
<td><h5>jobmanager.archive.fs.dir</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to a negative value to disable the resource timeout.</td>
</tr>
<tr>
<td><h5>scheduler-mode</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,17 @@
<td>Integer</td>
<td>Configure the minimum increase in parallelism for a job to scale up.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).</td>
<td>The maximum time the JobManager will wait to acquire all required resources after a job submission or restart. Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.<br />Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).<br />Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.<br />If <span markdown="span">`scheduler-mode`</span> is configured to <span markdown="span">`REACTIVE`</span>, this configuration value will default to a negative value to disable the resource timeout.</td>
</tr>
<tr>
<td><h5>jobmanager.archive.fs.dir</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
package org.apache.flink.api.common.time;

import org.apache.flink.annotation.Internal;
import org.apache.flink.util.clock.Clock;
import org.apache.flink.util.clock.SystemClock;

import java.time.Duration;
import java.util.concurrent.TimeoutException;
Expand All @@ -30,20 +32,24 @@ public class Deadline {
/** The deadline, relative to {@link System#nanoTime()}. */
private final long timeNanos;

private Deadline(long deadline) {
/** Clock providing the time for this deadline. */
private final Clock clock;

private Deadline(long deadline, Clock clock) {
this.timeNanos = deadline;
this.clock = clock;
}

public Deadline plus(Duration other) {
return new Deadline(Math.addExact(timeNanos, other.toNanos()));
return new Deadline(Math.addExact(timeNanos, other.toNanos()), this.clock);
}

/**
* Returns the time left between the deadline and now. The result is negative if the deadline
* has passed.
*/
public Duration timeLeft() {
return Duration.ofNanos(Math.subtractExact(timeNanos, System.nanoTime()));
return Duration.ofNanos(Math.subtractExact(timeNanos, clock.relativeTimeNanos()));
}

/**
Expand All @@ -53,7 +59,7 @@ public Duration timeLeft() {
* @throws TimeoutException if no time is left
*/
public Duration timeLeftIfAny() throws TimeoutException {
long nanos = Math.subtractExact(timeNanos, System.nanoTime());
long nanos = Math.subtractExact(timeNanos, clock.relativeTimeNanos());
if (nanos <= 0) {
throw new TimeoutException();
}
Expand All @@ -67,7 +73,7 @@ public boolean hasTimeLeft() {

/** Determines whether the deadline is in the past, i.e. whether the time left is negative. */
public boolean isOverdue() {
return timeNanos < System.nanoTime();
return timeNanos < clock.relativeTimeNanos();
}

// ------------------------------------------------------------------------
Expand All @@ -79,11 +85,23 @@ public boolean isOverdue() {
* {@link #plus(Duration)} to specify a deadline in the future.
*/
public static Deadline now() {
return new Deadline(System.nanoTime());
return new Deadline(System.nanoTime(), SystemClock.getInstance());
}

/** Constructs a Deadline that is a given duration after now. */
public static Deadline fromNow(Duration duration) {
return new Deadline(Math.addExact(System.nanoTime(), duration.toNanos()));
return new Deadline(
Math.addExact(System.nanoTime(), duration.toNanos()), SystemClock.getInstance());
}

/**
* Constructs a Deadline that is a given duration after now, where now and all other times from
* this deadline are defined by the given {@link Clock}.
*
* @param duration Duration for this deadline.
* @param clock Time provider for this deadline.
*/
public static Deadline fromNowWithClock(Duration duration, Clock clock) {
return new Deadline(Math.addExact(clock.relativeTimeNanos(), duration.toNanos()), clock);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -418,8 +418,35 @@ public enum SchedulerType {
+ "Once elapsed it will try to run the job with a lower parallelism, or fail if the minimum amount of resources could not be acquired.")
.linebreak()
.text(
"Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted), "
+ "while decreasing this value reduces downtime of a job (provided that enough slots are available to still run the job).")
"Increasing this value will make the cluster more resilient against temporary resources shortages (e.g., there is more time for a failed TaskManager to be restarted).")
.linebreak()
.text(
"Setting a negative duration will disable the resource timeout: The JobManager will wait indefinitely for resources to appear.")
.linebreak()
.text(
"If %s is configured to %s, this configuration value will default to a negative value to disable the resource timeout.",
code(SCHEDULER_MODE.key()),
code(SchedulerExecutionMode.REACTIVE.name()))
.build());

@Documentation.Section({
Documentation.Sections.EXPERT_SCHEDULING,
Documentation.Sections.ALL_JOB_MANAGER
})
public static final ConfigOption<Duration> RESOURCE_STABILIZATION_TIMEOUT =
key("jobmanager.adaptive-scheduler.resource-stabilization-timeout")
.durationType()
.defaultValue(RESOURCE_WAIT_TIMEOUT.defaultValue())
.withDescription(
Description.builder()
.text(
"The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. "
+ "Once this timeout has passed, the job will start executing with the available resources.")
.linebreak()
.text(
"If %s is configured to %s, this configuration value will default to 0, so that jobs are starting immediately with the available resources.",
code(SCHEDULER_MODE.key()),
code(SchedulerExecutionMode.REACTIVE.name()))
.build());

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.flink.configuration.ClusterOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.SchedulerExecutionMode;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
Expand All @@ -49,6 +50,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.concurrent.ScheduledExecutorService;

/** Default {@link SlotPoolServiceSchedulerFactory} implementation. */
Expand Down Expand Up @@ -167,7 +169,8 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
batchSlotTimeout);
break;
case Adaptive:
schedulerNGFactory = new AdaptiveSchedulerFactory();
schedulerNGFactory =
getAdaptiveSchedulerFactoryFromConfiguration(configuration);
slotPoolServiceFactory =
new DeclarativeSlotPoolServiceFactory(
SystemClock.getInstance(), slotIdleTimeout, rpcTimeout);
Expand All @@ -193,4 +196,30 @@ public static DefaultSlotPoolServiceSchedulerFactory fromConfiguration(
return new DefaultSlotPoolServiceSchedulerFactory(
slotPoolServiceFactory, schedulerNGFactory);
}

private static AdaptiveSchedulerFactory getAdaptiveSchedulerFactoryFromConfiguration(
Configuration configuration) {
Duration allocationTimeoutDefault = JobManagerOptions.RESOURCE_WAIT_TIMEOUT.defaultValue();
Duration stabilizationTimeoutDefault =
JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT.defaultValue();

if (configuration.get(JobManagerOptions.SCHEDULER_MODE)
== SchedulerExecutionMode.REACTIVE) {
allocationTimeoutDefault = Duration.ofMillis(-1);
stabilizationTimeoutDefault = Duration.ZERO;
}

final Duration initialResourceAllocationTimeout =
configuration
.getOptional(JobManagerOptions.RESOURCE_WAIT_TIMEOUT)
.orElse(allocationTimeoutDefault);

final Duration resourceStabilizationTimeout =
configuration
.getOptional(JobManagerOptions.RESOURCE_STABILIZATION_TIMEOUT)
.orElse(stabilizationTimeoutDefault);

return new AdaptiveSchedulerFactory(
initialResourceAllocationTimeout, resourceStabilizationTimeout);
}
}
Loading

0 comments on commit 9635730

Please sign in to comment.