Skip to content

Commit

Permalink
[FLINK-16075][docs-zh] Translate "The Broadcast State Pattern" page i…
Browse files Browse the repository at this point in the history
…nto Chinese
  • Loading branch information
Zakelly authored and carp84 committed May 16, 2020
1 parent 8bc7442 commit 399519d
Showing 1 changed file with 69 additions and 114 deletions.
183 changes: 69 additions & 114 deletions docs/dev/stream/state/broadcast_state.zh.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,96 +25,78 @@ under the License.
* ToC
{:toc}

[Working with State](state.html) describes operator state which upon restore is either evenly distributed among the
parallel tasks of an operator, or unioned, with the whole state being used to initialize the restored parallel tasks.
你将在本节中了解到如何实际使用 broadcast state。想了解更多有状态流处理的概念,请参考
[Stateful Stream Processing]({% link concepts/stateful-stream-processing.zh.md %})。

A third type of supported *operator state* is the *Broadcast State*. Broadcast state was introduced to support use cases
where some data coming from one stream is required to be broadcasted to all downstream tasks, where it is stored locally
and is used to process all incoming elements on the other stream. As an example where broadcast state can emerge as a
natural fit, one can imagine a low-throughput stream containing a set of rules which we want to evaluate against all
elements coming from another stream. Having the above type of use cases in mind, broadcast state differs from the rest
of operator states in that:
1. it has a map format,
2. it is only available to specific operators that have as inputs a *broadcasted* stream and a *non-broadcasted* one, and
3. such an operator can have *multiple broadcast states* with different names.
## 提供的 API

## Provided APIs
在这里我们使用一个例子来展现 broadcast state 提供的接口。假设存在一个序列,序列中的元素是具有不同颜色与形状的图形,我们希望在序列里相同颜色的图形中寻找满足一定顺序模式的图形对(比如在红色的图形里,有一个长方形跟着一个三角形)。
同时,我们希望寻找的模式也会随着时间而改变。

To show the provided APIs, we will start with an example before presenting their full functionality. As our running
example, we will use the case where we have a stream of objects of different colors and shapes and we want to find pairs
of objects of the same color that follow a certain pattern, *e.g.* a rectangle followed by a triangle. We assume that
the set of interesting patterns evolves over time.
在这个例子中,我们定义两个流,一个流包含`图形(Item)`,具有`颜色``形状`两个属性。另一个流包含特定的`规则(Rule)`,代表希望寻找的模式。

In this example, the first stream will contain elements of type `Item` with a `Color` and a `Shape` property. The other
stream will contain the `Rules`.

Starting from the stream of `Items`, we just need to *key it* by `Color`, as we want pairs of the same color. This will
make sure that elements of the same color end up on the same physical machine.
`图形`流中,我们需要首先使用`颜色`将流进行进行分区(keyBy),这能确保相同颜色的图形会流转到相同的物理机上。

{% highlight java %}
// key the items by color
// 将图形使用颜色进行划分
KeyedStream<Item, Color> colorPartitionedStream = itemStream
.keyBy(new KeySelector<Item, Color>(){...});
{% endhighlight %}

Moving on to the `Rules`, the stream containing them should be broadcasted to all downstream tasks, and these tasks
should store them locally so that they can evaluate them against all incoming `Items`. The snippet below will i) broadcast
the stream of rules and ii) using the provided `MapStateDescriptor`, it will create the broadcast state where the rules
will be stored.
对于`规则`流,它应该被广播到所有的下游 task 中,下游 task 应当存储这些规则并根据它寻找满足规则的图形对。下面这段代码会完成:
i) 将`规则`广播给所有下游 task;
ii) 使用 `MapStateDescriptor` 来描述并创建 broadcast state 在下游的存储结构

{% highlight java %}

// a map descriptor to store the name of the rule (string) and the rule itself.
// 一个 map descriptor,它描述了用于存储规则名称与规则本身的 map 存储结构
MapStateDescriptor<String, Rule> ruleStateDescriptor = new MapStateDescriptor<>(
"RulesBroadcastState",
BasicTypeInfo.STRING_TYPE_INFO,
TypeInformation.of(new TypeHint<Rule>() {}));

// broadcast the rules and create the broadcast state
// 广播流,广播规则并且创建 broadcast state
BroadcastStream<Rule> ruleBroadcastStream = ruleStream
.broadcast(ruleStateDescriptor);
{% endhighlight %}

Finally, in order to evaluate the `Rules` against the incoming elements from the `Item` stream, we need to:
1. connect the two streams, and
2. specify our match detecting logic.
最终,为了使用`规则`来筛选`图形`序列,我们需要:
1. 将两个流关联起来
2. 完成我们的模式识别逻辑

为了关联一个非广播流 (keyed 或者 non-keyed) 与一个广播流 `BroadcastStream`,我们可以调用非广播流的方法 `connect()`,并将 `BroadcastStream` 当做参数传入。
这个方法的返回参数是 `BroadcastConnectedStream`,具有类型方法 `process()`,传入一个特殊的 `CoProcessFunction` 来书写我们的模式识别逻辑。
具体传入 `process()` 的是哪个类型取决于非广播流的类型:
- 如果流是一个 **keyed** 流,那就是 `KeyedBroadcastProcessFunction` 类型;
- 如果流是一个 **non-keyed** 流,那就是 `BroadcastProcessFunction` 类型。

Connecting a stream (keyed or non-keyed) with a `BroadcastStream` can be done by calling `connect()` on the
non-broadcasted stream, with the `BroadcastStream` as an argument. This will return a `BroadcastConnectedStream`, on
which we can call `process()` with a special type of `CoProcessFunction`. The function will contain our matching logic.
The exact type of the function depends on the type of the non-broadcasted stream:
- if that is **keyed**, then the function is a `KeyedBroadcastProcessFunction`.
- if it is **non-keyed**, the function is a `BroadcastProcessFunction`.

Given that our non-broadcasted stream is keyed, the following snippet includes the above calls:
在我们的例子中,`图形`流是一个keyed stream,所以我们书写的代码如下:

<div class="alert alert-info">
<strong>Attention:</strong> The connect should be called on the non-broadcasted stream, with the BroadcastStream
as an argument.
<strong>注意:</strong>`connect()` 方法需要由非广播流来进行调用,`BroadcastStream` 作为参数传入。
</div>

{% highlight java %}
DataStream<String> output = colorPartitionedStream
.connect(ruleBroadcastStream)
.process(

// type arguments in our KeyedBroadcastProcessFunction represent:
// 1. the key of the keyed stream
// 2. the type of elements in the non-broadcast side
// 3. the type of elements in the broadcast side
// 4. the type of the result, here a string
// KeyedBroadcastProcessFunction 中的类型参数表示:
// 1. key stream 中的 key 类型
// 2. 非广播流中的元素类型
// 3. 广播流中的元素类型
// 4. 结果的类型,在这里是 string

new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
// my matching logic
// 模式匹配逻辑
}
);
{% endhighlight %}

### BroadcastProcessFunction and KeyedBroadcastProcessFunction
### BroadcastProcessFunction KeyedBroadcastProcessFunction

As in the case of a `CoProcessFunction`, these functions have two process methods to implement; the `processBroadcastElement()`
which is responsible for processing incoming elements in the broadcasted stream and the `processElement()` which is used
for the non-broadcasted one. The full signatures of the methods are presented below:
在传入的 `BroadcastProcessFunction``KeyedBroadcastProcessFunction` 中,我们需要实现两个方法。`processBroadcastElement()` 方法负责处理广播流中的元素,`processElement()` 负责处理非广播流中的元素。
两个子类型定义如下:

{% highlight java %}
public abstract class BroadcastProcessFunction<IN1, IN2, OUT> extends BaseBroadcastProcessFunction {
Expand All @@ -136,66 +118,50 @@ public abstract class KeyedBroadcastProcessFunction<KS, IN1, IN2, OUT> {
}
{% endhighlight %}

The first thing to notice is that both functions require the implementation of the `processBroadcastElement()` method
for processing elements in the broadcast side and the `processElement()` for elements in the non-broadcasted side.

The two methods differ in the context they are provided. The non-broadcast side has a `ReadOnlyContext`, while the
broadcasted side has a `Context`.
需要注意的是 `processBroadcastElement()` 是处理的广播流的元素,而 `processElement()` 处理的是另一个流的元素。两个方法的第二个参数(Context)不同,均有以下方法:
1. 得到广播流的存储状态:`ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)`
2. 查询元素的时间戳:`ctx.timestamp()`
3. 查询目前的Watermark:`ctx.currentWatermark()`
4. 目前的处理时间(processing time):`ctx.currentProcessingTime()`
5. 产生旁路输出:`ctx.output(OutputTag<X> outputTag, X value)`

Both of these contexts (`ctx` in the following enumeration):
1. give access to the broadcast state: `ctx.getBroadcastState(MapStateDescriptor<K, V> stateDescriptor)`
2. allow to query the timestamp of the element: `ctx.timestamp()`,
3. get the current watermark: `ctx.currentWatermark()`
4. get the current processing time: `ctx.currentProcessingTime()`, and
5. emit elements to side-outputs: `ctx.output(OutputTag<X> outputTag, X value)`.
`getBroadcastState()` 方法中传入的 `stateDescriptor` 应该与调用 `.broadcast(ruleStateDescriptor)` 的参数相同。

The `stateDescriptor` in the `getBroadcastState()` should be identical to the one in the `.broadcast(ruleStateDescriptor)`
above.

The difference lies in the type of access each one gives to the broadcast state. The broadcasted side has
**read-write access** to it, while the non-broadcast side has **read-only access** (thus the names). The reason for this
is that in Flink there is no cross-task communication. So, to guarantee that the contents in the Broadcast State are the
same across all parallel instances of our operator, we give read-write access only to the broadcast side, which sees the
same elements across all tasks, and we require the computation on each incoming element on that side to be identical
across all tasks. Ignoring this rule would break the consistency guarantees of the state, leading to inconsistent and
often difficult to debug results.
这两个方法的区别就是对于 broadcast state 的访问权限不同。在处理广播流元素这端,是**具有读写权限的**,而对于处理非广播流元素这端是**只读**的。
这样做的原因是,Flink 中是不存在跨 task 通讯的。所以为了保证 broadcast state 在所有的并发实例中是一致的,我们在处理广播流元素的时候给予写权限,在所有的 task 中均可以看到这些元素,并且要求对这些元素处理是一致的,
那么最终所有 task 得到的 broadcast state 是一致的。

<div class="alert alert-info">
<strong>Attention:</strong> The logic implemented in `processBroadcast()` must have the same deterministic behavior
across all parallel instances!
<strong>注意:</strong>`processBroadcastElement()` 的实现必须在所有的并发实例中具有确定性的结果。
</div>

Finally, due to the fact that the `KeyedBroadcastProcessFunction` is operating on a keyed stream, it
exposes some functionality which is not available to the `BroadcastProcessFunction`. That is:
1. the `ReadOnlyContext` in the `processElement()` method gives access to Flink's underlying timer service, which allows
to register event and/or processing time timers. When a timer fires, the `onTimer()` (shown above) is invoked with an
`OnTimerContext` which exposes the same functionality as the `ReadOnlyContext` plus
- the ability to ask if the timer that fired was an event or processing time one and
- to query the key associated with the timer.
2. the `Context` in the `processBroadcastElement()` method contains the method
`applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)`. This allows to
register a `KeyedStateFunction` to be **applied to all states of all keys** associated with the provided `stateDescriptor`.
同时,`KeyedBroadcastProcessFunction` 在 Keyed Stream 上工作,所以它提供了一些 `BroadcastProcessFunction` 没有的功能:
1. `processElement()` 的参数 `ReadOnlyContext` 提供了方法能够访问 Flink 的定时器服务,可以注册事件或者处理时间的定时器。当定时器触发时,`onTimer()` 方法会被调用,
提供了 `OnTimerContext`,它具有 `ReadOnlyContext` 的全部功能,并且提供:
- 查询当前触发的是一个事件还是处理时间的定时器
- 查询定时器关联的key
2. `processBroadcastElement()` 方法中的参数 `Context` 会提供方法 `applyToKeyedState(StateDescriptor<S, VS> stateDescriptor, KeyedStateFunction<KS, S> function)`
这个方法使用一个 `KeyedStateFunction` 能够对 `stateDescriptor` 对应的 state 的**所有key所有存储的状态**进行某些操作。

<div class="alert alert-info">
<strong>Attention:</strong> Registering timers is only possible at `processElement()` of the `KeyedBroadcastProcessFunction`
and only there. It is not possible in the `processBroadcastElement()` method, as there is no key associated to the
broadcasted elements.
<strong>注意:</strong>注册一个定时器只能在 `KeyedBroadcastProcessFunction``processElement()` 方法中进行。
`processBroadcastElement()` 方法中不能注册定时器,因为广播的元素中并没有关联的key。
</div>
Coming back to our original example, our `KeyedBroadcastProcessFunction` could look like the following:

回到我们当前的例子中,`KeyedBroadcastProcessFunction` 应该实现如下:

{% highlight java %}
new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {

// store partial matches, i.e. first elements of the pair waiting for their second element
// we keep a list as we may have many first elements waiting
// 存储部分匹配的结果,即匹配了一个元素,正在等待第二个元素
// 我们用一个数组来存储,因为同时可能有很多第一个元素正在等待
private final MapStateDescriptor<String, List<Item>> mapStateDesc =
new MapStateDescriptor<>(
"items",
BasicTypeInfo.STRING_TYPE_INFO,
new ListTypeInfo<>(Item.class));

// identical to our ruleStateDescriptor above
// 与之前的 ruleStateDescriptor 相同
private final MapStateDescriptor<String, Rule> ruleStateDescriptor =
new MapStateDescriptor<>(
"RulesBroadcastState",
Expand Down Expand Up @@ -234,7 +200,7 @@ new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
stored.clear();
}

// there is no else{} to cover if rule.first == rule.second
// 不需要额外的 else{} 段来考虑 rule.first == rule.second 的情况
if (shape.equals(rule.first)) {
stored.add(value);
}
Expand All @@ -249,31 +215,20 @@ new KeyedBroadcastProcessFunction<Color, Item, Rule, String>() {
}
{% endhighlight %}

## Important Considerations
## 重要注意事项

After describing the offered APIs, this section focuses on the important things to keep in mind when using broadcast
state. These are:
这里有一些 broadcast state 的重要注意事项,在使用它时需要时刻清楚:

- **There is no cross-task communication:** As stated earlier, this is the reason why only the broadcast side of a
`(Keyed)-BroadcastProcessFunction` can modify the contents of the broadcast state. In addition, the user has to make
sure that all tasks modify the contents of the broadcast state in the same way for each incoming element. Otherwise,
different tasks might have different contents, leading to inconsistent results.
- **没有跨 task 通讯:**如上所述,这就是为什么在 `(Keyed)-BroadcastProcessFunction` 中处理广播流元素的方法里可以更改 broadcast state 的内容。
同时,用户需要保证所有 task 对于 broadcast state 的处理方式是一致的,否则会造成不同 task 读取 broadcast state 时内容不一致的情况,最终导致结果不一致。

- **Order of events in Broadcast State may differ across tasks:** Although broadcasting the elements of a stream
guarantees that all elements will (eventually) go to all downstream tasks, elements may arrive in a different order
to each task. So the state updates for each incoming element *MUST NOT depend on the ordering* of the incoming
events.
- **broadcast state 在不同的 task 的事件顺序是不同的:**虽然广播流中元素的过程能够保证所有的下游 task 全部能够收到,但在不同 task 中元素的到达顺序可能不同。
所以 broadcast state 的更新*不能依赖于流中元素的到达顺序*

- **All tasks checkpoint their broadcast state:** Although all tasks have the same elements in their broadcast state
when a checkpoint takes place (checkpoint barriers do not overpass elements), all tasks checkpoint their broadcast state,
and not just one of them. This is a design decision to avoid having all tasks read from the same file during a restore
(thus avoiding hotspots), although it comes at the expense of increasing the size of the checkpointed state by a factor
of p (= parallelism). Flink guarantees that upon restoring/rescaling there will be **no duplicates** and **no missing data**.
In case of recovery with the same or smaller parallelism, each task reads its checkpointed state. Upon scaling up, each
task reads its own state, and the remaining tasks (`p_new`-`p_old`) read checkpoints of previous tasks in a round-robin
manner.
- **所有的 task 均会对 broadcast state 进行 checkpoint:**虽然所有 task 中的 broadcast state 是一致的,但当 checkpoint 来临时所有 task 均会对 broadcast state 做 checkpoint。
这个设计是为了防止在作业恢复后读文件造成的文件热点。当然这种方式会造成 checkpoint 一定程度的写放大,放大倍数为 p (=并行度)。Flink 会保证在恢复状态/改变并发的时候数据**没有重复****没有缺失**
在作业恢复时,如果与之前具有相同或更小的并发度,所有的 task 读取之前已经 checkpoint 过的 state。在并发度上升的情况下,task 会读取本身的 state,多出来的并发 (`p_new` - `p_old`) 会使用轮询调度算法读取之前 task 的 state。

- **No RocksDB state backend:** Broadcast state is kept in-memory at runtime and memory provisioning should be done
accordingly. This holds for all operator states.
- **不使用 RocksDB state backend:** broadcast state 在运行时保存在内存中,需要保证内存充足。这一特性同样适用于 Operator State。

{% top %}

0 comments on commit 399519d

Please sign in to comment.