Skip to content

Commit

Permalink
[FLINK-21076][docs] Add section about Adaptive Scheduler
Browse files Browse the repository at this point in the history
This closes apache#15355
  • Loading branch information
rmetzger committed Mar 25, 2021
1 parent 18a32e8 commit c916622
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 10 deletions.
45 changes: 40 additions & 5 deletions docs/content.zh/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,10 +88,20 @@ If you manually set a parallelism in your job for individual operators or the en

Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.

When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.

With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.

#### Recommendations

- **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it.

- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck for roughly 50 seconds before redeploying your job with a lower parallelism.

The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout.


### Limitations

Expand All @@ -100,13 +110,38 @@ Since Reactive Mode is a new, experimental feature, not all features supported b
- **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.

The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster).
- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used.
- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage.
- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job.
- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.

The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode.


## Adaptive Scheduler

{{< hint danger >}}
Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users.
{{< /hint >}}

The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism.
In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible.
You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.

One benefit of Adpative Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases.

### Usage

The following configuration parameters need to be set:

- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler
- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default).

### Limitations

- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler.
- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Adaptive Scheduler will always need to download the entire state from the checkpoint storage.
- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Adaptive Scheduler will restart the entire job.
- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.


{{< top >}}
39 changes: 34 additions & 5 deletions docs/content/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,10 @@ In scenarios where TaskManagers are not connecting at the same time, but slowly

- **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it.

- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck for roughly 50 seconds before redeploying your job with a lower parallelism.

The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout.


### Limitations

Expand All @@ -106,13 +110,38 @@ Since Reactive Mode is a new, experimental feature, not all features supported b
- **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications.

The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster).
- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used.
- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage.
- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job.
- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.

The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode.


## Adaptive Scheduler

{{< hint danger >}}
Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users.
{{< /hint >}}

The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism.
In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible.
You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations:
- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session.

One benefit of Adpative Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases.

### Usage

The following configuration parameters need to be set:

- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler
- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default).

### Limitations

- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler.
- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Adaptive Scheduler will always need to download the entire state from the checkpoint storage.
- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Adaptive Scheduler will restart the entire job.
- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.


{{< top >}}

0 comments on commit c916622

Please sign in to comment.