Skip to content

Commit

Permalink
[FLINK-28139][docs] Add documentation for speculative execution
Browse files Browse the repository at this point in the history
This closes apache#20507.
  • Loading branch information
zhuzhurk committed Aug 23, 2022
1 parent 65046d5 commit 70d9f6c
Show file tree
Hide file tree
Showing 4 changed files with 251 additions and 0 deletions.
92 changes: 92 additions & 0 deletions docs/content.zh/docs/deployment/speculative_execution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
---
title: Speculative Execution
weight: 5
type: docs

---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http:https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# 预测执行
这个文档描述了预测执行的背景,使用方法,以及如何验证其有效性。

## 背景
预测执行是一种用于缓解异常机器节点导致作业执行缓慢的机制。机器节点异常包括硬件异常,偶发的输入输出繁忙,高 CPU 负载等问题。
这些问题会导致运行在其上的任务比起在其他节点上运行的任务慢很多,从而影响到整个作业的执行时长。

在这种情况下,预测执行会为这些慢任务创建一些新的执行实例并部署在正常的机器节点上。这些新的执行实例和其对应的老执行实例(慢任务)
会消费相同的数据,并产出相同的结果。而那些老执行实例也会被保留继续执行。这些执行实例(包括新实例和老实例)中首先成功结束的执行
实例会被认可,其产出的结果会对下游任务可见,其他实例则会被取消掉。

为了实现这个机制,Flink 会通过一个慢任务检测器来检测慢任务。检测到的慢任务位于的机器节点会被识别为异常机器节点,并被加入机器
节点黑名单中。调度器则会为这些慢节点创建新的执行实例,并将其部署到未被加黑的机器节点上。

## 使用方法
本章节描述了如何使用预测执行,包含如何启用,调优,以及开发/改进自定义 source 来支持预测执行。

{{< hint warning >}}
注意: Flink 尚不支持 sink 的预测执行。这个能力会在后续版本中得到完善。
{{< /hint >}}

{{< hint warning >}}
注意:Flink 不支持 DataSet 作业的预测执行,因为 DataSet API 在不久的将来会被废弃。现在推荐使用 DataStream API 来开发 Flink 批处理作业。
{{< /hint >}}

### 启用预测执行
要启用预测执行,你需要设置以下配置项:
- `jobmanager.scheduler: AdaptiveBatch`
- 因为当前只有 [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) 支持预测执行.
- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true`

### 配置调优
考虑到不同作业的差异,为了让预测执行获得更好的效果,你可以调优下列调度器配置项:
- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e)
- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node)

你还可以调优下列慢任务检测器的配置项:
- [`slow-task-detector.check-interval`]({{< ref "docs/deployment/config" >}}#slow-task-detector-check-interval)
- [`slow-task-detector.execution-time.baseline-lower-bound`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-lower-bound)
- [`slow-task-detector.execution-time.baseline-multiplier`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-multiplier)
- [`slow-task-detector.execution-time.baseline-ratio`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-ratio)

### 让 Source 支持预测执行
如果你的作业有用到自定义 {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="Source" >}},
并且这个 Source 用到了自定义的 {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java" name="SourceEvent" >}},
你需要修改该 Source 的 {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java" name="SplitEnumerator" >}}
实现接口 {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java" name="SupportsHandleExecutionAttemptSourceEvent" >}}。
```java
public interface SupportsHandleExecutionAttemptSourceEvent {
void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
}
```
这意味着 SplitEnumerator 需要知道是哪个执行实例发出了这个事件。否则,JobManager 会在收到 SourceEvent 的时候报错从而导致作业失败。

除此之外的 Source 不需要额外的改动就可以进行预测执行,包括
{{< gh_link file="/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java" name="SourceFunction Source" >}},
{{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java" name="InputFormat Source" >}},
和 {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="新版 Source" >}}.
Apache Flink 官方提供的 Source 都支持预测执行。

## 检查预测执行的效果
在启用预测执行后,当出现慢任务触发预测执行时,Web UI 会在作业页面的节点信息的 `SubTasks` 分页展示预测执行实例。Web UI
还会在 `Overview``Task Managers` 页面展示当前被加黑的 TaskManager。

你还可以通过检查这些 [`指标`]({{< ref "docs/ops/metrics" >}}#预测执行) 来判断预测执行的有效性。

{{< top >}}
28 changes: 28 additions & 0 deletions docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -2161,6 +2161,34 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
</tbody>
</table>

### 预测执行

以下指标可以用来衡量预测执行的有效性。

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 18%">Scope</th>
<th class="text-left" style="width: 26%">Metrics</th>
<th class="text-left" style="width: 48%">Description</th>
<th class="text-left" style="width: 8%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="2"><strong>Job (only available on JobManager)</strong></th>
<td>numSlowExecutionVertices</td>
<td>当前的慢执行节点数量。</td>
<td>Gauge</td>
</tr>
<tr>
<td>numEffectiveSpeculativeExecutions</td>
<td>有效的预测执行数量,即比初始执行实例更早结束的预测执行实例的数量。</td>
<td>Counter</td>
</tr>
</tbody>
</table>

## End-to-End latency tracking

Flink allows to track the latency of records travelling through the system. This feature is disabled by default.
Expand Down
102 changes: 102 additions & 0 deletions docs/content/docs/deployment/speculative_execution.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
---
title: Speculative Execution
weight: 5
type: docs

---
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http:https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing,
software distributed under the License is distributed on an
"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->

# Speculative Execution
This page describes the background of speculative execution, how to use it, and how to check the effectiveness of it.

## Background
Speculative execution is a mechanism to mitigate job slowness which is caused by problematic nodes.
A problematic node may have hardware problems, accident I/O busy, or high CPU load. These problems may
make the hosted tasks run much slower than tasks on other nodes, and affect the overall execution time
of a batch job.

In such cases, speculative execution will start new attempts of the slow task on nodes that are not
detected as problematic. The new attempts process the same input data and produces the same data as the
old one. The old attempt will not be affected and will keep running. The first finished attempt will be
admitted, its output will be seen and consumed by the downstream tasks, and the remaining attempts will be
canceled.

To achieve this, Flink uses the slow task detector to detect slow tasks. The nodes that the slow tasks
locate in will be identified as problematic nodes and get blocked via the blocklist mechanism. The scheduler
will create new attempts for the slow tasks and deploy them on nodes that are not blocked.

## Usage
This section describes how to use speculative execution, including how to enable it, how to tune it, and
how to develop/improve custom sources to work with speculative execution.

{{< hint warning >}}
Note: Flink does not support speculative execution of sinks yet and will support it in follow-up releases.
{{< /hint >}}

{{< hint warning >}}
Note: Flink does not support speculative execution of DataSet jobs because DataSet will be deprecated
in near future. DataStream API is now the recommended low level API to develop Flink batch jobs.
{{< /hint >}}

### Enable Speculative Execution
To enable speculative execution, you need to set the following configuration options:
- `jobmanager.scheduler: AdaptiveBatch`
- Because only [Adaptive Batch Scheduler]({{< ref "docs/deployment/elastic_scaling" >}}#adaptive-batch-scheduler) supports speculative execution.
- `jobmanager.adaptive-batch-scheduler.speculative.enabled: true`

### Tuning Configuration
To make speculative execution work better for different jobs, you can tune below configuration options of the scheduler:
- [`jobmanager.adaptive-batch-scheduler.speculative.max-concurrent-executions`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-max-concurrent-e)
- [`jobmanager.adaptive-batch-scheduler.speculative.block-slow-node-duration`]({{< ref "docs/deployment/config" >}}#jobmanager-adaptive-batch-scheduler-speculative-block-slow-node)

You can also tune below configuration options of the slow task detector:
- [`slow-task-detector.check-interval`]({{< ref "docs/deployment/config" >}}#slow-task-detector-check-interval)
- [`slow-task-detector.execution-time.baseline-lower-bound`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-lower-bound)
- [`slow-task-detector.execution-time.baseline-multiplier`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-multiplier)
- [`slow-task-detector.execution-time.baseline-ratio`]({{< ref "docs/deployment/config" >}}#slow-task-detector-execution-time-baseline-ratio)

### Enable Sources for Speculative Execution
If your job uses a custom {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="Source" >}},
and the source uses custom {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SourceEvent.java" name="SourceEvent" >}},
you need to change the {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SplitEnumerator.java" name="SplitEnumerator" >}}
of that source to implement {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/SupportsHandleExecutionAttemptSourceEvent.java" name="SupportsHandleExecutionAttemptSourceEvent" >}}
interface.
```java
public interface SupportsHandleExecutionAttemptSourceEvent {
void handleSourceEvent(int subtaskId, int attemptNumber, SourceEvent sourceEvent);
}
```
This means the SplitEnumerator should be aware of the attempt which sends the event. Otherwise, exceptions
will happen when the job manager receives a source event from the tasks and lead to job failures.

No extra change is required for other sources to work with speculative execution, including
{{< gh_link file="/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/SourceFunction.java" name="SourceFunction sources" >}},
{{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/common/io/InputFormat.java" name="InputFormat sources" >}},
and {{< gh_link file="/flink-core/src/main/java/org/apache/flink/api/connector/source/Source.java" name="new sources" >}}.
All the source connectors offered by Apache Flink can work with speculative execution.

## Checking the Effectiveness of Speculative Execution
After enabling speculative execution, when there are slow tasks that trigger speculative execution,
the web UI will show the speculative attempts on the `SubTasks` tab of vertices on the job page. The web UI
also shows the blocked taskmanagers on the Flink cluster `Overview` and `Task Managers` pages.

You can also check these [`metrics`]({{< ref "docs/ops/metrics" >}}#speculative-execution) to see the effectiveness of speculative execution.

{{< top >}}
29 changes: 29 additions & 0 deletions docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -2105,6 +2105,35 @@ logged by `SystemResourcesMetricsInitializer` during the startup.
</tbody>
</table>

### Speculative Execution

Metrics below can be used to measure the effectiveness of speculative execution.

<table class="table table-bordered">
<thead>
<tr>
<th class="text-left" style="width: 18%">Scope</th>
<th class="text-left" style="width: 26%">Metrics</th>
<th class="text-left" style="width: 48%">Description</th>
<th class="text-left" style="width: 8%">Type</th>
</tr>
</thead>
<tbody>
<tr>
<th rowspan="2"><strong>Job (only available on JobManager)</strong></th>
<td>numSlowExecutionVertices</td>
<td>Number of slow execution vertices at the moment.</td>
<td>Gauge</td>
</tr>
<tr>
<td>numEffectiveSpeculativeExecutions</td>
<td>Number of effective speculative execution attempts, i.e. speculative execution attempts which
finish earlier than their corresponding original attempts.</td>
<td>Counter</td>
</tr>
</tbody>
</table>

## End-to-End latency tracking

Flink allows to track the latency of records travelling through the system. This feature is disabled by default.
Expand Down

0 comments on commit 70d9f6c

Please sign in to comment.