Skip to content

Latest commit

 

History

History
1088 lines (877 loc) · 40 KB

coroutines-guide-reactive.md

File metadata and controls

1088 lines (877 loc) · 40 KB

响应式流与协程指南

这篇教程介绍了 Kotlin 协程与响应式流的不同点并展示了如何将它们更好的一起使用。在此之前熟悉包含在协程指南中的基础协程概念不是必须的, 但如果熟悉它将会是个很大的加分。如果你熟悉响应式流,你可能发现本指南会更好地介绍协程的世界。

kotlinx.coroutines 项目中有一系列和响应式流相关的模块:

本指南主要基于 Reactive Streams 的规范并使用 Publisher 接口和一些基于 RxJava 2.x 的示例, 该示例实现了响应式流的规范。

欢迎你在 Github 上 clone kotlinx.coroutines 项目 到你的工作站中,这是为了可以运行所有在本指南中展示的示例。它们被包含在项目的 reactive/kotlinx-coroutines-rx2/test/guide 路径中。

目录

响应式流与通道的区别

本节主要包含响应式流与以协程为基础的通道的不同点。

迭代的基础

Channel 与如下所示的响应式流类有类似的概念:

它们都描述了一个异步的有限或无限的元素流(在 Rx 中又名 items), 并且都支持背压。

然而,使用 Rx 的术语的话,Channel 总是表示了一个条目的 流。元素被生产者协程发送到通道并被消费者协程所接收。 在通道中每调用一次 receive 就消费一个元素。 让我们用以下的例子来说明:

fun main() = runBlocking<Unit> {
    // 创建一个通道,该通道每200毫秒生产一个数字,从 1 到 3
    val source = produce<Int> {
        println("Begin") // 在输出中标记协程开始运行
        for (x in 1..3) {
            delay(200) // 等待 200 毫秒
            send(x) // 将数字 x 发送到通道中
        }
    }
    // 从 source 中打印元素
    println("Elements:")
    source.consumeEach { // 在 source 中消费元素
        println(it)
    }
    // 再次从 source 中打印元素
    println("Again:")
    source.consumeEach { // 从 source 中消费元素
        println(it)
    }
}

可以在这里获取完整代码。

这段代码产生了如下输出:

Elements:
Begin
1
2
3
Again:

注意,“Begin” 只被打印了一次,因为 produce 协程构建器 被执行的时候, 只创建了一个协程来生产元素流。所有被生产的元素都被 ReceiveChannel.consumeEach 扩展函数消费。没有办法从这个通道重复接收元素。当生产者协程结束时该通道被关闭, 再次尝试接收元素将不会接收到任何东西。

让我们使用 kotlinx-coroutines-reactive 模块中的 publish 协程构建器 代替 kotlinx-coroutines-core 模块中的 produce 来重写这段代码。代码保持相似, 但是在 source 接收 ReceiveChannel 类型的地方,现在它接收响应式流的 Publisher 类型,where consumeEach was used to consume elements from the channel, now collect is used to collect elements from the publisher.

fun main() = runBlocking<Unit> {
    // 创建一个 publisher,每 200 毫秒生产一个数字,从 1 到 3
    val source = publish<Int> {
    //           ^^^^^^^  <--- 这里与先前的示例不同
        println("Begin") // 在输出中标记协程开始运行
        for (x in 1..3) {
            delay(200) // 等待 200 毫秒
            send(x) // 将数字 x 发送到通道中
        }
    }
    // 从 source 中打印元素
    println("Elements:")
    source.collect { // collect elements from it
        println(it)
    }
    // 再次从 source 中打印元素
    println("Again:")
    source.collect { // collect elements from it
        println(it)
    }
}

可以在这里获取完整代码。

现在这段代码的输出变为:

Elements:
Begin
1
2
3
Again:
Begin
1
2
3

This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order functional concept. While the channel is a stream of elements, the reactive stream defines a recipe on how the stream of elements is produced. It becomes the actual stream of elements when collected. Each collector may receive the same or a different stream of elements, depending on how the corresponding implementation of Publisher works.

The publish coroutine builder, that is used in the above example, does not launch a coroutine, but every collect invocation launches a coroutine. We have two of them in this code and that is why we see "Begin" printed twice.

In Rx lingo this is called a cold publisher. Many standard Rx operators produce cold streams, too. We can collect them from a coroutine, and every collector gets the same stream of elements.

警告:它计划在未来的一秒钟内在通道上调用 consumeEach 方法来准备好消费元素可以快速的失败,这会立即抛出一个 IllegalStateException。 查看这个提案的细节。

注意,我们可以使用 Rx 中的 publish 操作符与 connect 方法来替换我们在通道中所看到的类似的行为。

订阅与取消

An example in the previous section uses source.collect { ... } to collect all elements. Instead of collecting elements, we can open a channel using openSubscription and iterate over it, so that we have more finer-grained control on our iteration, for example using break, as shown below:

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // 五个数字的区间
        .doOnSubscribe { println("OnSubscribe") } // 提供了一些可被观察的点
        .doOnComplete { println("OnComplete") }   // ...
        .doFinally { println("Finally") }         // ... 在正在执行的代码中
    var cnt = 0 
    source.openSubscription().consume { // 在源中打开通道
        for (x in this) { // 迭代通道以从中接收元素
            println(x)
            if (++cnt >= 3) break // 当三个元素被打印出来的时候,执行 break
        }
        // 注意:当这段代码执行完成并阻塞的时候 `consume` 取消了该通道
    }
}

可以在这里获取完整代码。

它将产生如下输出:

OnSubscribe
1
2
3
Finally

With an explicit openSubscription we should cancel the corresponding subscription to unsubscribe from the source, but there is no need to call cancel explicitly -- under the hood consume does that for us. The installed doFinally 监听器并打印“Finally”来确认订阅确实被取消了。注意“OnComplete” 永远不会被打印因为我们没有消费所有的元素。

We do not need to use an explicit cancel either if we collect all the elements:

fun main() = runBlocking<Unit> {
    val source = Flowable.range(1, 5) // 五个数字的区间
        .doOnSubscribe { println("OnSubscribe") } // 提供了一些可被观察的点
        .doOnComplete { println("OnComplete") }   // ……
        .doFinally { println("Finally") }         // …… 在正在执行的代码中
    // collect the source fully
    source.collect { println(it) }
}

可以在这里获取完整代码。

我们得到如下输出:

OnSubscribe
1
2
3
4
OnComplete
Finally
5

注意,如何使“OnComplete”与“Finally”在最后一个元素“5”之前被打印。在这个示例中它将发生在我们的 main 函数在协程中执行时,使用 runBlocking 协程构建器来启动它。 我们的主协程在 flowable 中使用 source.collect { …… } 扩展函数来接收通道。 当它等待源发射元素的时候该主协程是 挂起的 , 当最后一个元素被 Flowable.range(1, 5) 发射时它 恢复 了主协程,它被分派到主线程上打印出来 最后一个元素在稍后的时间点打印,而 source 执行完成并打印“Finally”。

背压

背压是响应式流中最有趣最复杂的概念之一。协程可以挂起 并且它提供了一个自然的解决方式来处理背压。

在 Rx Java 2.x 中一个支持背压的类被称为 Flowable。 在下面的示例中我们可以使用 kotlinx-coroutines-rx2 模块中的协程构建器 rxFlowable 来定义一个 发送从 1 到 3 三个整数的 flowable。 在调用挂起的 send 函数之前, 它在输出中打印了一条消息,所以我们可以来研究它是如何操作的。

这些整数在主线程的上下文中被产生, 但是在使用 Rx 的 observeOn 操作符后缓冲区大小为 1 的订阅被转移到了另一个线程。 为了模拟订阅者很慢,它使用了 Thread.sleep 来模拟消耗 500 毫秒来处理每个元素。

fun main() = runBlocking<Unit> { 
    // 协程 —— 在主线程的上下文中快速生成元素
    val source = rxFlowable {
        for (x in 1..3) {
            send(x) // 这是一个挂起函数
            println("Sent $x") // 在成功发送元素后打印
        }
    }
    // 使用 Rx 让一个处理速度很慢的订阅者在另一个线程订阅
    source
        .observeOn(Schedulers.io(), false, 1) // 指定缓冲区大小为 1 个元素
        .doOnComplete { println("Complete") }
        .subscribe { x ->
            Thread.sleep(500) // 处理每个元素消耗 500 毫秒
            println("Processed $x")
        }
    delay(2000) // 挂起主线程几秒钟
}

可以在这里获取完整代码。

这段代码的输出更好地说明了背压是如何在协程中工作的:

Sent 1
Processed 1
Sent 2
Processed 2
Sent 3
Processed 3
Complete

当尝试发送另一个元素的时候,我们看到这里的处理者协程是如何将第一个元素放入缓冲区并挂起的。 只有当消费者处理了第一个元素,处理者才会发送第二个元素并恢复,等等。

Rx 主题 vs 广播通道

RxJava 有一个 主题(Subject) 的概念:一个对象可以有效地向所有订阅者广播元素。与此相匹配的概念在协程的世界中被称为 BroadcastChannel。在 Rx 中有一种主题—— BehaviorSubject 被用来管理状态:

fun main() {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two") // 更新 BehaviorSubject 的状态,“one”变量被丢弃
    // 现在订阅这个主题并打印所有信息
    subject.subscribe(System.out::println)
    subject.onNext("three")
    subject.onNext("four")
}

可以在这里获取完整代码。

这段代码打印订阅时主题的当前状态及其所有后续更新:

two
three
four

您可以像使用任何其他响应式流一样从协程订阅主题:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // 现在启动一个协程来打印所有东西
    GlobalScope.launch(Dispatchers.Unconfined) { // 在不受限的上下文中启动协程
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
}

可以在这里获取完整代码。

结果是相同的:

two
three
four

这里我们使用 Dispatchers.Unconfined 协程上下文以与 Rx 中的订阅相同的行为启动消费协程。 它基本上意味着启动的协程将立即在同一个线程中执行并发射元素。上下文的更多细节被包含在单独的小节

协程的优点是很容易获得单线程 UI 更新的混合行为。 一个典型的 UI 应用程序不需要响应每一个状态改变。只有最近的状态需要被响应。 应用程序状态的一系列背靠背更新只需在UI中反映一次, 尽可能保证 UI 线程是空闲的。在以下的示例中我们将通过模拟在主线程上下文中启动消费者协程并使用 yield 函数来模拟中断更新序列并释放主线程:

fun main() = runBlocking<Unit> {
    val subject = BehaviorSubject.create<String>()
    subject.onNext("one")
    subject.onNext("two")
    // 现在启动一个协程来打印最近的更新
    launch { // 为协程使用主线程的上下文
        subject.collect { println(it) }
    }
    subject.onNext("three")
    subject.onNext("four")
    yield() // 使主线程让步来启动协程 <--- 这里
    subject.onComplete() // 现在也结束主题的序列来取消消费者
}

可以在这里获取完整代码。

现在协程只处理(打印)最近的更新:

four

纯协程世界中的相应行为由 ConflatedBroadcastChannel 实现在协程通道上直接提供相同的逻辑, 没有桥接到响应式流:

fun main() = runBlocking<Unit> {
    val broadcast = ConflatedBroadcastChannel<String>()
    broadcast.offer("one")
    broadcast.offer("two")
    // 现在启动一个协程来打印最近的更新
    launch { // 为协程使用主线程的上下文
        broadcast.consumeEach { println(it) }
    }
    broadcast.offer("three")
    broadcast.offer("four")
    yield() // 使主线程让步来启动协程
    broadcast.close() // 现在也结束主题的序列来取消消费者
}

可以在这里获取完整代码。

它与基于 BehaviorSubject 的先前的示例产生了相同的输出:

four

另一个 BroadcastChannel 的实现是 ArrayBroadcastChannel ——一个使用数组的缓冲区来规定 capacity。它可以使用 BroadcastChannel(capacity) 来启动。 它为每一个订阅者自相应订阅开放之时起提供每一个事件。它对应于 Rx 中的 PublishSubjectArrayBroadcastChannel 构造函数中的缓冲区的 capacity 参数控制发送者在挂起等待接收者接收这些元素之前的元素的数量。

操作符

全功能的响应式流库,比如 Rx,都伴随着非常大量的操作符用于创建、变换、合并以及反转来处理相关的流。创建你自己的并且支持背压的操作符是非常臭名昭著以及困难的。

协程与通道则被设计为提供完全相反的体验。这里没有内建的操作符, 但是处理元素流是非常简单并且自动支持背压的, 即使是在你没有明确思考这一点的情况下。

本节将展示以协程为基础而实现的一系列响应式流操作符。

Range

让我们推出自己的为响应式流 Publisher 接口实现的 range 操作符。为响应式流提供的本操作符从零开始的异步实现被包含在这篇博客中。 它需要很多代码。 以下是与协同程序相对应的代码:

fun CoroutineScope.range(context: CoroutineContext, start: Int, count: Int) = publish<Int>(context) {
    for (x in start until start + count) send(x)
}

在这段代码中 CoroutineScopecontext 被用来替代一个 Executor 并且所有的背压方面都被小心的用于协程机制。注意,此实现仅依赖于那些定义了 Publisher 接口和它的朋友们的小型响应式流库。

它可以直接在协程中被使用:

fun main() = runBlocking<Unit> {
    // Range 从 runBlocking 中承袭了父 job,但是使用 Dispatchers.Default 来覆盖调度器
    range(Dispatchers.Default, 1, 5).collect { println(it) }
}

可以在这里获取完整代码。

这段代码的结果非常值得我们期待:

1
2
3
4
5

熔合 filter 与 map 操作符

响应式操作符比如: filter 以及 map 使用协程实现是非常琐碎的。对于一些挑战和展示,让我们将它们合并到单个的 fusedFilterMap 操作符中:

fun <T, R> Publisher<T>.fusedFilterMap(
    context: CoroutineContext,   // 协程执行的上下文
    predicate: (T) -> Boolean,   // 过滤器 predicate
    mapper: (T) -> R             // mapper 函数
) = GlobalScope.publish<R>(context) {
    collect {                    // collect the source stream
        if (predicate(it))       // filter part
            send(mapper(it))     // map part
    }        
}

使用先前 range 中的示例我们可以测试我们的 fusedFilterMap 来过滤偶数以及将它们映射到字符串:

fun main() = runBlocking<Unit> {
   range(1, 5)
       .fusedFilterMap(coroutineContext, { it % 2 == 0}, { "$it is even" })
       .collect { println(it) } // 打印所有的字符串结果
}

可以在这里获取完整代码。

不难看出,结果将是:

2 is even
4 is even

Take until

让我们为 takeUntil 操作符实现自己的版本。它是非常难于去实现的,因为需要跟踪和管理两个流的订阅。 我们需要以来源流中的所有元素直到另一个流也执行完成或发射了任何东西。然而,我们有 select 表达式可以在协程的实现中拯救我们:

fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>) = GlobalScope.publish<T>(context) {
    this@takeUntil.openSubscription().consume { // 显式地打开 Publisher<T> 的通道
        val current = this
        other.openSubscription().consume { // 显式地打开 Publisher<U> 的通道
            val other = this
            whileSelect {
                other.onReceive { false }            // 释放任何从 `other` 接收到的元素
                current.onReceive { send(it); true } // 在这个通道上重新发送元素并继续
            }
        }
    }
}

这段代码使用 whileSelect 作为比 while(select{...}) {} 循环更好的快捷方式,并且 Kotlin 的 use 表达式会在退出时关闭通道,并取消订阅相应的发布者。

在下面手写的 rangeinterval 的组合被用来测试。它在编码中使用 publish 协程构建器 (在下一小节中它将是纯 Rx 实现的):

fun CoroutineScope.rangeWithInterval(time: Long, start: Int, count: Int) = publish<Int> {
    for (x in start until start + count) { 
        delay(time) // 在每次发送数字之前等待
        send(x)
    }
}

下面的代码展示了 takeUntil 是如何工作的:

fun main() = runBlocking<Unit> {
    val slowNums = rangeWithInterval(200, 1, 10)         // 数字之间有 200 毫秒的间隔
    val stop = rangeWithInterval(500, 1, 10)             // 第一个在 500 毫秒之后
    slowNums.takeUntil(coroutineContext, stop).collect { println(it) } // 让我们测试它
}

可以在这里获取完整代码。

执行

1
2

Merge

使用协程处理多个数据流总是至少有两种方法。一种方法是调用 select,这被展示在先前的示例中。另一种方法是只是启动过个协程。让我们使用 merge 操作符来使用第二种的方法:

fun <T> Publisher<Publisher<T>>.merge(context: CoroutineContext) = GlobalScope.publish<T>(context) {
  collect { pub -> // for each publisher collected
      launch {  // launch a child coroutine
          pub.collect { send(it) } // resend all element from this publisher
      }
  }
}

注意, coroutineContext 在调用 launch 协程构建器中的用途。它被用来指定 publish 协程的上下文。这种方法,所有被启动的协程在这里都是 publish 协程的子协程并且当 publish 协程被取消或以其它的方式执行完毕时将会被取消。 此外,父协程会等待至所有子协程执行完毕没,这个实现会完全合并所有的响应式流。

对于测试,让我们在先前的示例中使用 rangeWithInterval 函数来启动并编写一个生产者在一段时间的延时后发送结果两次:

fun CoroutineScope.testPub() = publish<Publisher<Int>> {
    send(rangeWithInterval(250, 1, 4)) // 数字 1 在 250 毫秒发射,2 在 500 毫秒,3 在 750 毫秒,4 在 1000 毫秒
    delay(100) // 等待 100 毫秒
    send(rangeWithInterval(500, 11, 3)) // 数字 11 在 600 毫秒,12 在 1100 毫秒,13 在 1600 毫秒
    delay(1100) // 在启动完成后的 1.2 秒之后等待 1.1 秒
}

这段测试代码在 testPub 上使用了 merge 并且展示结果:

fun main() = runBlocking<Unit> {
    testPub().merge(coroutineContext).collect { println(it) } // 打印整个流
}

可以在这里获取完整代码。

并且结果应该是:

1
2
11
3
4
12
13

协程上下文

所有的示例操作符都在先前的示例中显式地设置了 CoroutineContext 参数。在 Rx 的世界中它大概对应于一个 Scheduler

线程与 Rx

下面的示例中展示了基本的在 Rx 中管理线程上下文。 这里的 rangeWithIntervalRxrangeWithInterval 函数使用 Rx 的 ziprange 以及 interval 操作符的一个实现。

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> = 
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在这里获取完整代码。

我们显式地通过 Schedulers.computation() 调度器,并将它用于 rangeWithIntervalRx 操作符,所以它将执行在 Rx 的计算线程池。输出将类似于以下内容:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

线程与协程

在协程的世界中 Schedulers.computation() 大致对应于 Dispatchers.Default, 所以先前的示例将变成下面这样:

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // 在每次数字发射前等待
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在这里获取完整代码。

产生的输出将类似于:

1 on thread ForkJoinPool.commonPool-worker-1
2 on thread ForkJoinPool.commonPool-worker-1
3 on thread ForkJoinPool.commonPool-worker-1

这里我们使用了 Rx 的 subscribe 操作符,没有自己的调度器和操作符并且运行在同一个线程上,而发布者在本示例中运行在共享的线程池上。

Rx observeOn

在 Rx 中你操作使用了特别的操作符来为调用链修改线程上下文。 如果你不熟悉它的话, 你可以从这篇很棒的教程中获得指导。

举例来说,这里使用了 observeOn 操作符。让我们修改先前的示例并观察使用 Schedulers.computation() 的效果:

fun rangeWithInterval(context: CoroutineContext, time: Long, start: Int, count: Int) = GlobalScope.publish<Int>(context) {
    for (x in start until start + count) { 
        delay(time) // 在每次数字发射前等待
        send(x)
    }
}

fun main() {
    Flowable.fromPublisher(rangeWithInterval(Dispatchers.Default, 100, 1, 3))
        .observeOn(Schedulers.computation())                           // <-- 添加了这一行
        .subscribe { println("$it on thread ${Thread.currentThread().name}") }
    Thread.sleep(1000)
}

可以在这里获取完整代码。

这里的输出有所不同了,提示了“RxComputationThreadPool”:

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

使用协程上下文来管理它们

一个协程总是运行在一些上下文中。举例来说,让我们使用 runBlocking 在主线程中启动一个协程,并且使用 Rx 版本的 rangeWithIntervalRx 操作符, 替代使用 Rx 的 subscribe 操作符遍历结果:

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
        .collect { println("$it on thread ${Thread.currentThread().name}") }
}

可以在这里获取完整代码。

结果信息将会被打印在主线程中:

1 on thread main
2 on thread main
3 on thread main

不受限的上下文

大多数 Rx 操作符都没有特别地指定相关线程(调度器)并且正在运行在它们恰好被调用的任何线程中。我们在线程与 Rx 这一小节中看到了 subscribe 运算符的示例。

在协程的世界中,Dispatchers.Unconfined 则承担了类似的任务。让我们修改先前的示例, 但是将源 Flowable 的遍历替换到 runBlocking 协程中后,它被限制在了主线程中,我们在 Dispatchers.Unconfined 上下文中启动了一个新协程,当主协程只是等待的时候使用 Job.join

fun rangeWithIntervalRx(scheduler: Scheduler, time: Long, start: Int, count: Int): Flowable<Int> =
    Flowable.zip(
        Flowable.range(start, count),
        Flowable.interval(time, TimeUnit.MILLISECONDS, scheduler),
        BiFunction { x, _ -> x })

fun main() = runBlocking<Unit> {
    val job = launch(Dispatchers.Unconfined) { // 在不受限的山下文中启动一个新协程(没有它自己的线程池)
        rangeWithIntervalRx(Schedulers.computation(), 100, 1, 3)
            .collect { println("$it on thread ${Thread.currentThread().name}") }
    }
    job.join() // 等待我们的协程结束
}

可以在这里获取完整代码。

现在,这段代码中协程执行在了 Rx 的计算线程池并输出,只是就像我们初始的示例中使用了 Rx 的 subscribe 操作符。

1 on thread RxComputationThreadPool-1
2 on thread RxComputationThreadPool-1
3 on thread RxComputationThreadPool-1

注意,该 Dispatchers.Unconfined 上下文应该被谨慎使用。由于降低了局部的堆栈操作以及开销调度的减少, 它也许会在某些测试中提高总体性能,但它也会产生更深的堆栈并且更难以推断使用它的代码的异步性。

如果一个协程将一个元素发送到一个通道,那么调用的线程 send 可能会开始使用 Dispatchers.Unconfined 调度程序执行协程的代码。 原本的生产者协程调用 send 后会暂停直至不受限的消费者协程运行至下一个挂起点。这与缺乏 Rx 世界中的锁步单线程 onNext 执行线程切换操作符是非常类似的。这在 Rx 中是默认正常的,因为操作符经常做一些非常小块的工作并且你必须做一些复杂处理来合并大量的操作符。然而,这对于协程来说是不常见的, 你可以在一个协程中进行任意复杂的处理。通常,你只需要在多个工作协程之间使用扇入和扇出来为复杂的流水线链接流处理协程。