Skip to content

Commit

Permalink
[FLINK-22244][docs] Adjust Reactive Mode docs
Browse files Browse the repository at this point in the history
This closes apache#15592
  • Loading branch information
rmetzger committed Apr 16, 2021
1 parent 50af465 commit 3e76dac
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 9 deletions.
10 changes: 7 additions & 3 deletions docs/content.zh/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Reactive Mode configures a job so that it always uses all resources available in

Reactive Mode restarts a job on a rescaling event, restoring it from the latest completed checkpoint. This means that there is no overhead of creating a savepoint (which is needed for manually rescaling a job). Also, the amount of data that is reprocessed after rescaling depends on the checkpointing interval, and the restore time depends on the state size.

The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or below a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the [replica factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas) of a Kubernetes deployment, or an [autoscaling](https://docs.aws.amazon.com/autoscaling/ec2/userguide/AutoScalingGroup.html) group. This external service only needs to handle the resource allocation and deallocation. Flink will take care of keeping the job running with the resources available.
The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or below a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the [replica factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas) of a Kubernetes deployment, or an [autoscaling group](https://docs.aws.amazon.com/autoscaling/ec2/userguide/AutoScalingGroup.html) on AWS. This external service only needs to handle the resource allocation and deallocation. Flink will take care of keeping the job running with the resources available.

### Getting started

Expand Down Expand Up @@ -93,11 +93,12 @@ the [best practices for parallelism]({{< ref "docs/ops/production_ready" >}}#set

Note that such a high max parallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.

When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
When enabling Reactive Mode, the [`jobmanager.adaptive-scheduler.resource-wait-timeout`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-resource-wait-timeout) configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.

With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
With Reactive Mode enabled, the [`jobmanager.adaptive-scheduler.resource-stabilization-timeout`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-resource-stabilization-timeout) configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.
Additionally, one can configure [`jobmanager.adaptive-scheduler.min-parallelism-increase`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-min-parallelism-increase): This configuration option specifices the minumum amount of additional, aggregate parallelism increase before triggering a scale-up. For example if you have a job with a source (parallelism=2) and a sink (parallelism=2), the aggregate parallelism is 4. By default, the configuration key is set to 1, so any increase in the aggregate parallelism will trigger a restart.

#### Recommendations

Expand Down Expand Up @@ -139,6 +140,8 @@ The following configuration parameters need to be set:
- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler
- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default).

The behavior of Adaptive Scheduler is configured by [all configuration options containing `adaptive-scheduler`]({{< ref "docs/deployment/config">}}#advanced-scheduling-options) in their name.

### Limitations

- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler.
Expand All @@ -147,6 +150,7 @@ The following configuration parameters need to be set:
- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.
- Scaling events trigger job and task restarts, which will increase the number of Task attempts.


{{< top >}}
10 changes: 7 additions & 3 deletions docs/content/docs/deployment/elastic_scaling.md
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ Reactive Mode configures a job so that it always uses all resources available in

Reactive Mode restarts a job on a rescaling event, restoring it from the latest completed checkpoint. This means that there is no overhead of creating a savepoint (which is needed for manually rescaling a job). Also, the amount of data that is reprocessed after rescaling depends on the checkpointing interval, and the restore time depends on the state size.

The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or below a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the [replica factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas) of a Kubernetes deployment, or an [autoscaling](https://docs.aws.amazon.com/autoscaling/ec2/userguide/AutoScalingGroup.html) group. This external service only needs to handle the resource allocation and deallocation. Flink will take care of keeping the job running with the resources available.
The Reactive Mode allows Flink users to implement a powerful autoscaling mechanism, by having an external service monitor certain metrics, such as consumer lag, aggregate CPU utilization, throughput or latency. As soon as these metrics are above or below a certain threshold, additional TaskManagers can be added or removed from the Flink cluster. This could be implemented through changing the [replica factor](https://kubernetes.io/docs/concepts/workloads/controllers/deployment/#replicas) of a Kubernetes deployment, or an [autoscaling group](https://docs.aws.amazon.com/autoscaling/ec2/userguide/AutoScalingGroup.html) on AWS. This external service only needs to handle the resource allocation and deallocation. Flink will take care of keeping the job running with the resources available.

### Getting started

Expand Down Expand Up @@ -93,11 +93,12 @@ the [best practices for parallelism]({{< ref "docs/ops/production_ready" >}}#set

Note that such a high max parallelism might affect performance of the job, since more internal structures are needed to maintain [some internal structures](https://flink.apache.org/features/2017/07/04/flink-rescalable-state.html) of Flink.

When enabling Reactive Mode, the `jobmanager.adaptive-scheduler.resource-wait-timeout` configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
When enabling Reactive Mode, the [`jobmanager.adaptive-scheduler.resource-wait-timeout`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-resource-wait-timeout) configuration key will default to `-1`. This means that the JobManager will run forever waiting for sufficient resources.
If you want the JobManager to stop after a certain time without enough TaskManagers to run the job, configure `jobmanager.adaptive-scheduler.resource-wait-timeout`.

With Reactive Mode enabled, the `jobmanager.adaptive-scheduler.resource-stabilization-timeout` configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
With Reactive Mode enabled, the [`jobmanager.adaptive-scheduler.resource-stabilization-timeout`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-resource-stabilization-timeout) configuration key will default to `0`: Flink will start runnning the job, as soon as there are sufficient resources available.
In scenarios where TaskManagers are not connecting at the same time, but slowly one after another, this behavior leads to a job restart whenever a TaskManager connects. Increase this configuration value if you want to wait for the resources to stabilize before scheduling the job.
Additionally, one can configure [`jobmanager.adaptive-scheduler.min-parallelism-increase`]({{< ref "docs/deployment/config">}}#jobmanager-adaptive-scheduler-min-parallelism-increase): This configuration option specifices the minumum amount of additional, aggregate parallelism increase before triggering a scale-up. For example if you have a job with a source (parallelism=2) and a sink (parallelism=2), the aggregate parallelism is 4. By default, the configuration key is set to 1, so any increase in the aggregate parallelism will trigger a restart.

#### Recommendations

Expand Down Expand Up @@ -139,6 +140,8 @@ The following configuration parameters need to be set:
- `jobmanager.scheduler: adaptive`: Change from the default scheduler to adaptive scheduler
- `cluster.declarative-resource-management.enabled` Declarative resource management must be enabled (enabled by default).

The behavior of Adaptive Scheduler is configured by [all configuration options containing `adaptive-scheduler`]({{< ref "docs/deployment/config">}}#advanced-scheduling-options) in their name.

### Limitations

- **Streaming jobs only**: The first version of Adaptive Scheduler runs with streaming jobs only. When submitting a batch job, we will automatically fall back to the default scheduler.
Expand All @@ -147,6 +150,7 @@ The following configuration parameters need to be set:
- **Limited integration with Flink's Web UI**: Adaptive Scheduler allows that a job's parallelism can change over its lifetime. The web UI only shows the current parallelism the job.
- **Limited Job metrics**: With the exception of `numRestarts` all [availability]({{< ref "docs/ops/metrics" >}}#availability) and [checkpointing]({{< ref "docs/ops/metrics" >}}#checkpointing) metrics with the `Job` scope are not working correctly.
- **Unused slots**: If the max parallelism for slot sharing groups is not equal, slots offered to Adaptive Scheduler might be unused.
- Scaling events trigger job and task restarts, which will increase the number of Task attempts.


{{< top >}}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
<td><h5>jobmanager.adaptive-scheduler.resource-stabilization-timeout</h5></td>
<td style="word-wrap: break-word;">10 s</td>
<td>Duration</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
<td>The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. The timeout starts once sufficient resources for running the job are available. Once this timeout has passed, the job will start executing with the available resources.<br />If <code class="highlighter-rouge">scheduler-mode</code> is configured to <code class="highlighter-rouge">REACTIVE</code>, this configuration value will default to 0, so that jobs are starting immediately with the available resources.</td>
</tr>
<tr>
<td><h5>jobmanager.adaptive-scheduler.resource-wait-timeout</h5></td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -432,6 +432,7 @@ public enum SchedulerType {
Description.builder()
.text(
"The resource stabilization timeout defines the time the JobManager will wait if fewer than the desired but sufficient resources are available. "
+ "The timeout starts once sufficient resources for running the job are available. "
+ "Once this timeout has passed, the job will start executing with the available resources.")
.linebreak()
.text(
Expand Down

0 comments on commit 3e76dac

Please sign in to comment.