From c91662299993be8b4445df665251abe9669a1cd8 Mon Sep 17 00:00:00 2001 From: Robert Metzger Date: Tue, 23 Mar 2021 21:58:16 +0100 Subject: [PATCH] [FLINK-21076][docs] Add section about Adaptive Scheduler This closes #15355 --- .../docs/deployment/elastic_scaling.md | 45 ++++++++++++++++--- .../docs/deployment/elastic_scaling.md | 39 +++++++++++++--- 2 files changed, 74 insertions(+), 10 deletions(-) diff --git a/docs/content.zh/docs/deployment/elastic_scaling.md b/docs/content.zh/docs/deployment/elastic_scaling.md index 9d0881c821749..0a99be9628c33 100644 --- a/docs/content.zh/docs/deployment/elastic_scaling.md +++ b/docs/content.zh/docs/deployment/elastic_scaling.md @@ -88,10 +88,20 @@ If you manually set a parallelism in your job for individual operators or the en Note that such a high maxParallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink. +When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources. +If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`. + +With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available. +In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job. + #### Recommendations - **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it. +- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck for roughly 50 seconds before redeploying your job with a lower parallelism. + + The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout. + ### Limitations @@ -100,13 +110,38 @@ Since Reactive Mode is a new, experimental feature, not all features supported b - **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications. The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster). -- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used. -- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage. -- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job. -- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job. -- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly. +The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode. + + +## Adaptive Scheduler + +{{< hint danger >}} +Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users. +{{< /hint >}} +The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism. +In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible. +You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations: +- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session. + +One benefit of Adpative Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases. + +### Usage + +The following configuration parameters need to be set: + +- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler +- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default). + +### Limitations + +- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler. +- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Adaptive Scheduler will always need to download the entire state from the checkpoint storage. +- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Adaptive Scheduler will restart the entire job. +- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job. +- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly. +- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused. {{< top >}} diff --git a/docs/content/docs/deployment/elastic_scaling.md b/docs/content/docs/deployment/elastic_scaling.md index a6a78fce1139b..0a99be9628c33 100644 --- a/docs/content/docs/deployment/elastic_scaling.md +++ b/docs/content/docs/deployment/elastic_scaling.md @@ -98,6 +98,10 @@ In scenarios where TaskManagers are not connecting at the same time, but slowly - **Configure periodic checkpointing for stateful jobs**: Reactive mode restores from the latest completed checkpoint on a rescale event. If no periodic checkpointing is enabled, your program will loose its state. Checkpointing also configures a **restart strategy**. Reactive mode will respect the configured restarting strategy: If no restarting strategy is configured, reactive mode will fail your job, instead of scaling it. +- Downscaling in Reactive Mode might cause longer stalls in your processing because Flink waits for the heartbeat between JobManager and the stopped TaskManager(s) to time out. You will see that your Flink job is stuck for roughly 50 seconds before redeploying your job with a lower parallelism. + + The default timeout is configured to 50 seconds. Adjust the [`heartbeat.timeout`]({{< ref "docs/deployment/config">}}#heartbeat-timeout) configuration to a lower value, if your infrastructure permits this. Setting a low heartbeat timeout can lead to failures if a TaskManager fails to respond to a heartbeat, for example due to a network congestion or a long garbage collection pause. Note that the [`heartbeat.interval`]({{< ref "docs/deployment/config">}}#heartbeat-interval) always needs to be lower than the timeout. + ### Limitations @@ -106,13 +110,38 @@ Since Reactive Mode is a new, experimental feature, not all features supported b - **Deployment is only supported as a standalone application deployment**. Active resource providers (such as native Kubernetes, YARN or Mesos) are explicitly not supported. Standalone session clusters are not supported either. The application deployment is limited to single job applications. The only supported deployment options are [Standalone in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/overview" >}}#application-mode) ([described](#getting-started) on this page), [Docker in Application Mode]({{< ref "docs/deployment/resource-providers/standalone/docker" >}}#application-mode-on-docker) and [Standalone Kubernetes Application Cluster]({{< ref "docs/deployment/resource-providers/standalone/kubernetes" >}}#deploy-application-cluster). -- **Streaming jobs only**: The first version of Reactive Mode runs with streaming jobs only. When submitting a batch job, then the default scheduler will be used. -- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Reactive Mode will always need to download the entire state from the checkpoint storage. -- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Reactive Mode will restart the entire job. -- **Limited integration with Flink's Web UI**: Reactive Mode allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job. -- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly. +The [limitations of Adaptive Scheduler](#limitations-1) also apply to Reactive Mode. + + +## Adaptive Scheduler + +{{< hint danger >}} +Using Adaptive Scheduler directly (not through Reactive Mode) is only advised for advanced users. +{{< /hint >}} + +The Adaptive Scheduler can adjust the parallelism of a job based on available slots. It will automatically reduce the parallelism if not enough slots are available to run the job with the originally configured parallelism; be it due to not enough resources being available at the time of submission, or TaskManager outages during the job execution. If new slots become available the job will be scaled up again, up to the configured parallelism. +In Reactive Mode (see above) the configured parallelism is ignored and treated as if it was set to infinity, letting the job always use as many resources as possible. +You can also use Adaptive Scheduler without Reactive Mode, but there are some practical limitations: +- If you are using Adaptive Scheduler on a session cluster, there are no guarantees regarding the distribution of slots between multiple running jobs in the same session. +One benefit of Adpative Scheduler over the default scheduler is that it can handle TaskManager losses gracefully, since it would just scale down in these cases. + +### Usage + +The following configuration parameters need to be set: + +- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler +- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default). + +### Limitations + +- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler. +- **No support for [local recovery]({{< ref "docs/ops/state/large_state_tuning">}}#task-local-recovery)**: Local recovery is a feature that schedules tasks to machines so that the state on that machine gets re-used if possible. The lack of this feature means that Adaptive Scheduler will always need to download the entire state from the checkpoint storage. +- **No support for local failover**: Local failover means that the scheduler is able to restart parts ("regions" in Flink's internals) of a failed job, instead of the entire job. This limitation impacts only recovery time of embarrassingly parallel jobs: Flink's default scheduler can restart failed parts, while Adaptive Scheduler will restart the entire job. +- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job. +- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly. +- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused. {{< top >}}