Skip to content

Commit

Permalink
[FLINK-29381][Connector/Pulsar] Add a document on how to use Key_Shar…
Browse files Browse the repository at this point in the history
…ed subscription.
  • Loading branch information
syhily authored and tisonkun committed Sep 30, 2022
1 parent 414e6ad commit 8027b3c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 5 deletions.
16 changes: 14 additions & 2 deletions docs/content.zh/docs/connectors/datastream/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,21 @@ PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_ty
{{< /tab >}}
{{< /tabs >}}

如果想在 Pulsar Source 里面使用 `key 共享` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。
#### Key_Shared 订阅

Pulsar Source 也提供了一个名为 `UniformRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。
当时用 Key_Shared 订阅时,Pulsar 将会基于 Message 的 key 去计算对应的 Hash 值,Hash 取值范围为(0~65535)。我们首先会使用 `Message.getOrderingKey()` 计算 Hash,如果没有则会依次使用 `Message.getKey()``Message.getKeyBytes()`。对于上述 key 都找不到的消息,我们会使用字符串 `"NO_KEY"` 来计算消息的 Hash 值。

在 Flink Connector 中针对 Key_Shared 订阅提供了两种消费模式,分别是 `KeySharedMode.SPLIT``KeySharedMode.JOIN`,它们的实际消费行为并不相同。`KeySharedMode.JOIN` 会把所有的给定的 Hash 范围放于一个 Reader 中进行消费,而 `KeySharedMode.SPLIT` 会打散给定的 Hash 范围于不同的 Reader 中消费。

之所以这么设计的主要原因是因为,在 Key_Shared 的订阅模式中,如果一条消息找不到对应的消费者,所有的消息都不会继续往下发送。所以我们提供了 `KeySharedMode.JOIN` 模式,允许用户只消费部分 Hash 范围的消息。

##### 定义 RangeGenerator

如果想在 Pulsar Source 里面使用 `Key_Shared` 订阅,需要提供 `RangeGenerator` 实例。`RangeGenerator` 会生成一组消息 key 的 hash 范围,Pulsar Source 会基于给定的范围来消费数据。

Pulsar Source 也提供了一个名为 `SplitRangeGenerator` 的默认实现,它会基于 flink 数据源的并行度将 hash 范围均分。

由于 Pulsar 并未提供 Key 的 Hash 计算方法,所以我们在 Flink 中提供了名为 `FixedKeysRangeGenerator` 的实现,你可以在 builder 中依次提供需要消费的 Key 内容即可。但需要注意的是,Pulsar 的 Key Hash 值并不对应唯一的一个 Key,所以如果你只想消费某几个 Key 的消息,还需要在后面的代码中使用 `DataStream.filter()` 方法来过滤出对应的消息。

### 起始消费位置

Expand Down
39 changes: 36 additions & 3 deletions docs/content/docs/connectors/datastream/pulsar.md
Original file line number Diff line number Diff line change
Expand Up @@ -301,12 +301,45 @@ PulsarSource.builder().set_subscription_name("my-exclusive").set_subscription_ty
{{< /tab >}}
{{< /tabs >}}

Ensure that you provide a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches messages where the hash of the message key is contained in the specified range.
#### Key_Shared subscriptions

The Pulsar connector uses `UniformRangeGenerator` that divides the range by the Flink source
All the Pulsar's messages will be calculated with a key hash in Key_Shared subscription.
The hash range must be 0 to 65535. We try to compute the key hash in the order of `Message.getOrderingKey()`,
`Message.getKey()` or `Message.getKeyBytes()`. We will use `"NO_KEY"` str as the message key if none of these keys has been provided.

Pulsar's Key_Shared subscription comes in two forms in Connector, the `KeySharedMode.SPLIT` and `KeySharedMode.JOIN`.
Different `KeySharedMode` means different split assignment behaviors. If you only consume a subset of Pulsar's key hash range,
remember to use the `KeySharedMode.JOIN` which will subscribe all the range in only one reader.
Otherwise, when the ranges can join into a full Pulsar key hash range (0~65535) you should use `KeySharedMode.SPLIT`
mode for sharing the splits among all the backend readers.

In the `KeySharedMode.SPLIT` mode. The topic will be subscribed by multiple readers.
But Pulsar has one limit in this situation. That is if a Message can't find the corresponding reader by the key hash range.
No messages will be delivered to the current readers, until there is a reader which can subscribe to such messages.

##### Define a RangeGenerator

Ensure that you have provided a `RangeGenerator` implementation if you want to use the `Key_Shared` subscription type on the Pulsar connector.
The `RangeGenerator` generates a set of key hash ranges so that a respective reader subtask only dispatches
messages where the hash of the message key is contained in the specified range.

The Pulsar connector uses `SplitRangeGenerator` that divides the range by the Flink source
parallelism if no `RangeGenerator` is provided in the `Key_Shared` subscription type.

Since the Pulsar didn't expose the key hash range method. We have to provide an `FixedKeysRangeGenerator` for end-user.
You can add the keys you want to consume, no need to calculate any hash ranges.
The key's hash isn't specified to only one key, so the consuming results may contain the messages with
different keys comparing the keys you have defined in this range generator.
Remember to use flink's `DataStream.filter()` method after the Pulsar source.

```java
FixedKeysRangeGenerator.builder()
.supportNullKey()
.key("someKey")
.keys(Arrays.asList("key1", "key2"))
.build()
```

### Starting Position

The Pulsar source is able to consume messages starting from different positions by setting the `setStartCursor(StartCursor)` option.
Expand Down

0 comments on commit 8027b3c

Please sign in to comment.