Skip to content

Commit

Permalink
Merge pull request hltj#25 from qiaoyuang/master
Browse files Browse the repository at this point in the history
将《响应式流与协程》中“响应式流与通道的区别”大节翻译完成
  • Loading branch information
hltj committed Jan 14, 2019
2 parents a947641 + 01ca5bb commit 7bc4d73
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 75 deletions.
2 changes: 1 addition & 1 deletion docs/shared-mutable-state-and-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ Kotlin 的[密封类](https://kotlinlang.org/docs/reference/sealed-classes.html)
我们使用 `IncCounter` 消息(用来递增计数器)和 `GetCounter` 消息(用来获取值)来定义 `CounterMsg` 密封类。
后者需要发送回复。[CompletableDeferred] 通信<!--
-->原语表示未来可知(可传达)的单个值,
因该特征它被用于此处
这里被用于此目的

<div class="sample" markdown="1" theme="idea" data-highlight-only>

Expand Down
134 changes: 67 additions & 67 deletions reactive/coroutines-guide-reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ import kotlin.coroutines.*

```kotlin
fun main() = runBlocking<Unit> {
// 创建一个 publisher,每200毫秒生产一个数字,从 1 到 3
// 创建一个 publisher,每 200 毫秒生产一个数字,从 1 到 3
val source = publish<Int> {
// ^^^^^^^ <--- 这里与先前的示例不同
println("Begin") // 在输出中标记协程开始运行
Expand Down Expand Up @@ -316,21 +316,21 @@ _恢复_ 了主协程,它被分派到主线程上打印出来

### 背压

Backpressure is one of the most interesting and complex aspects of reactive streams. Coroutines can
_suspend_ and they provide a natural answer to handling backpressure.
背压是响应式流中最有趣最复杂的概念之一。协程可以<!--
-->_挂起_ 并且它提供了一个自然的解决方式来处理背压。

In Rx Java 2.x a backpressure-capable class is called
[Flowable](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html).
In the following example we use [rxFlowable] coroutine builder from `kotlinx-coroutines-rx2` module to define a
flowable that sends three integers from 1 to 3.
It prints a message to the output before invocation of
suspending [send][SendChannel.send] function, so that we can study how it operates.
Rx Java 2.x 中一个支持背压的类被称为
[Flowable](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html)
在下面的示例中我们可以使用 `kotlinx-coroutines-rx2` 模块中的协程构建器 [rxFlowable] 来定义一个
发送从 1 到 3 三个整数的 flowable。
在调用挂起的 [send][SendChannel.send] 函数之前,
它在输出中打印了一条消息,所以我们可以来研究它是如何操作的。

The integers are generated in the context of the main thread, but subscription is shifted
to another thread using Rx
这些整数在主线程的上下文中被产生,
但是在使用 Rx 的
[observeOn](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#observeOn(io.reactivex.Scheduler,%20boolean,%20int))
operator with a buffer of size 1.
The subscriber is slow. It takes 500 ms to process each item, which is simulated using `Thread.sleep`.
操作符后缓冲区大小为 1 的订阅被转移到了另一个线程。
为了模拟订阅者很慢,它使用了 `Thread.sleep` 来模拟消耗 500 毫秒来处理每个元素。

<!--- INCLUDE
import io.reactivex.schedulers.*
Expand All @@ -341,28 +341,28 @@ import kotlin.coroutines.*

```kotlin
fun main() = runBlocking<Unit> {
// coroutine -- fast producer of elements in the context of the main thread
// 协程 —— 在主线程的上下文中快速生成元素
val source = rxFlowable {
for (x in 1..3) {
send(x) // this is a suspending function
println("Sent $x") // print after successfully sent item
send(x) // 这是一个挂起函数
println("Sent $x") // 在成功发送元素后打印
}
}
// subscribe on another thread with a slow subscriber using Rx
// 使用 Rx 让一个处理速度很慢的订阅者在另一个线程订阅
source
.observeOn(Schedulers.io(), false, 1) // specify buffer size of 1 item
.observeOn(Schedulers.io(), false, 1) // 指定缓冲区大小为 1 个元素
.doOnComplete { println("Complete") }
.subscribe { x ->
Thread.sleep(500) // 500ms to process each item
Thread.sleep(500) // 处理每个元素消耗 500 毫秒
println("Processed $x")
}
delay(2000) // suspend the main thread for a few seconds
delay(2000) // 挂起主线程几秒钟
}
```

> 你可以从[这里](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-05.kt)获得完整代码
The output of this code nicely illustrates how backpressure works with coroutines:
这段代码的输出更好地说明了背压是如何在协程中工作的:

```text
Sent 1
Expand All @@ -376,17 +376,17 @@ Complete

<!--- TEST -->

We see here how producer coroutine puts the first element in the buffer and is suspended while trying to send another
one. Only after consumer processes the first item, producer sends the second one and resumes, etc.
当尝试发送另一个元素的时候,我们看到这里的处理者协程是如何将第一个元素放入缓冲区并挂起的。
只有当消费者处理了第一个元素,处理者才会发送第二个元素并恢复,等等。


### Rx 主题 vs 广播通道

RxJava has a concept of [Subject](https://github.com/ReactiveX/RxJava/wiki/Subject) which is an object that
effectively broadcasts elements to all its subscribers. The matching concept in coroutines world is called a
[BroadcastChannel]. There is a variety of subjects in Rx with
[BehaviorSubject](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html) being
the one used to manage state:
RxJava 有一个 [主题(Subject](https://github.com/ReactiveX/RxJava/wiki/Subject) 的概念:一个对象可以有效地向所有<!--
-->订阅者广播元素。与此相匹配的概念在协程的世界中被称为
[BroadcastChannel]。在 Rx 中有一种主题——
[BehaviorSubject](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/BehaviorSubject.html)
被用来管理状态:

<!--- INCLUDE
import io.reactivex.subjects.BehaviorSubject
Expand All @@ -396,17 +396,17 @@ import io.reactivex.subjects.BehaviorSubject
fun main() {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two") // updates the state of BehaviorSubject, "one" value is lost
// now subscribe to this subject and print everything
subject.onNext("two") // 更新 BehaviorSubject 的状态,“one"” 变量被丢弃
// 现在订阅这个主题并打印所有信息
subject.subscribe(System.out::println)
subject.onNext("three")
subject.onNext("four")
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt)
> 你可以从[这里](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-06.kt)获得完整代码
This code prints the current state of the subject on subscription and all its further updates:
这段代码打印订阅时主题的当前状态及其所有后续更新:


```text
Expand All @@ -417,7 +417,7 @@ four

<!--- TEST -->

You can subscribe to subjects from a coroutine just as with any other reactive stream:
您可以像使用任何其他响应式流一样从协程订阅主题:

<!--- INCLUDE
import io.reactivex.subjects.BehaviorSubject
Expand All @@ -431,17 +431,17 @@ fun main() = runBlocking<Unit> {
subject.onNext("one")
subject.onNext("two")
// now launch a coroutine to print everything
GlobalScope.launch(Dispatchers.Unconfined) { // launch coroutine in unconfined context
GlobalScope.launch(Dispatchers.Unconfined) { // 在不受限的上下文中启动协程
subject.consumeEach { println(it) }
}
subject.onNext("three")
subject.onNext("four")
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt)
> 你可以从[这里](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-07.kt)获得完整代码
The result is the same:
结果是相同的:

```text
two
Expand All @@ -451,16 +451,16 @@ four

<!--- TEST -->

Here we use [Dispatchers.Unconfined] coroutine context to launch consuming coroutine with the same behaviour as subscription in Rx.
It basically means that the launched coroutine is going to be immediately executed in the same thread that
is emitting elements. Contexts are covered in more details in a [separate section](#coroutine-context).
这里我们使用 [Dispatchers.Unconfined] 协程上下文以与 Rx 中的订阅相同的行为启动消费协程。
它基本上意味着启动的协程将立即在同一个线程中执行<!--
-->并发射元素。上下文的更多细节被包含在[单独的小节](#coroutine-context)

The advantage of coroutines is that it is easy to get conflation behavior for single-threaded UI updates.
A typical UI application does not need to react to every state change. Only the most recent state is relevant.
A sequence of back-to-back updates to the application state needs to get reflected in UI only once,
as soon as the UI thread is free. For the following example we are going to simulate this by launching
consuming coroutine in the context of the main thread and use [yield] function to simulate a break in the
sequence of updates and to release the main thread:
协程的优点是很容易获得单线程 UI 更新的混合行为。
一个典型的 UI 应用程序不需要响应每一个状态改变。只有最近的状态需要被响应。
应用程序状态的一系列背靠背更新只需在UI中反映一次,
尽可能保证 UI 线程是空闲的。在以下的示例中我们将通过模拟在主线程上下文中<!--
-->启动消费者协程并使用 [yield] 函数来模拟中断更新序列<!--
-->并释放主线程:

<!--- INCLUDE
import io.reactivex.subjects.*
Expand All @@ -474,30 +474,30 @@ fun main() = runBlocking<Unit> {
val subject = BehaviorSubject.create<String>()
subject.onNext("one")
subject.onNext("two")
// now launch a coroutine to print the most recent update
launch { // use the context of the main thread for a coroutine
// 现在启动一个协程来打印最近的更新
launch { // 为协程使用主线程的上下文
subject.consumeEach { println(it) }
}
subject.onNext("three")
subject.onNext("four")
yield() // yield the main thread to the launched coroutine <--- HERE
subject.onComplete() // now complete subject's sequence to cancel consumer, too
yield() // 使主线程让步来启动协程 <--- 这里
subject.onComplete() // 现在也结束主题的序列来取消消费者
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt)
> 你可以从[这里](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-08.kt)获得完整代码
Now coroutine process (prints) only the most recent update:
现在协程只处理(打印)最近的更新:

```text
four
```

<!--- TEST -->

The corresponding behavior in a pure coroutines world is implemented by [ConflatedBroadcastChannel]
that provides the same logic on top of coroutine channels directly,
without going through the bridge to the reactive streams:
纯协程世界中的相应行为由 [ConflatedBroadcastChannel] 实现<!--
-->在协程通道上直接提供相同的逻辑,
没有桥接到响应式流:

<!--- INCLUDE
import kotlinx.coroutines.channels.*
Expand All @@ -510,34 +510,34 @@ fun main() = runBlocking<Unit> {
val broadcast = ConflatedBroadcastChannel<String>()
broadcast.offer("one")
broadcast.offer("two")
// now launch a coroutine to print the most recent update
launch { // use the context of the main thread for a coroutine
// 现在启动一个协程来打印最近的更新
launch { // 为协程使用主线程的上下文
broadcast.consumeEach { println(it) }
}
broadcast.offer("three")
broadcast.offer("four")
yield() // yield the main thread to the launched coroutine
broadcast.close() // now close broadcast channel to cancel consumer, too
yield() // 使主线程让步来启动协程
broadcast.close() // 现在也结束主题的序列来取消消费者
}
```

> You can get full code [here](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt)
> 你可以从[这里](kotlinx-coroutines-rx2/test/guide/example-reactive-basic-09.kt)获得完整代码
It produces the same output as the previous example based on `BehaviorSubject`:
它与基于 `BehaviorSubject` 的先前的示例产生了相同的输出:

```text
four
```

<!--- TEST -->

Another implementation of [BroadcastChannel] is `ArrayBroadcastChannel` with an array-based buffer of
a specified `capacity`. It can be created with `BroadcastChannel(capacity)`.
It delivers every event to every
subscriber since the moment the corresponding subscription is open. It corresponds to
[PublishSubject](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html) in Rx.
The capacity of the buffer in the constructor of `ArrayBroadcastChannel` controls the numbers of elements
that can be sent before the sender is suspended waiting for receiver to receive those elements.
另一个 [BroadcastChannel] 的实现是 `ArrayBroadcastChannel` ——一个使用数组的缓冲区<!--
-->来规定 `capacity`。它可以使用 `BroadcastChannel(capacity)` 来启动。
它为每一个订阅者<!--
-->自相应订阅开放之时起提供每一个事件。它对应于 Rx 中的
[PublishSubject](http:https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/subjects/PublishSubject.html)
`ArrayBroadcastChannel` 构造函数中的缓冲区的 capacity 参数控制<!--
-->发送者在挂起等待接收者接收这些元素之前的元素的数量。

## Operators

Expand Down
14 changes: 7 additions & 7 deletions ui/coroutines-guide-ui.md
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,7 @@ fun View.onClick(action: suspend (View) -> Unit) {

```kotlin
fun Node.onClick(action: suspend (MouseEvent) -> Unit) {
// launch one actor to handle all events on this node
// 在这个 node 上启动一个 actor 来处理所有的事件
val eventActor = GlobalScope.actor<MouseEvent>(Dispatchers.Main, capacity = Channel.CONFLATED) { // <--- 修改这里
for (event in channel) action(event) // 将事件传递给 action
}
Expand Down Expand Up @@ -404,7 +404,7 @@ fun Node.onClick(action: suspend (MouseEvent) -> Unit) {

如果所有 API 都被编写为永不阻塞执行线程的挂起函数,
那就太好了。然而,通常情况并非如此。有时你需要做一些消耗 CPU 的运算<!--
-->或者只是需要调用第三部分的 API 来进行网络访问,比如说,那将阻塞调用它的线程。
-->或者只是需要调用第三方的 API 来进行网络访问,比如说,那将阻塞调用它的线程。
你不能在 UI 主线程中那样做,也不能直接在 UI 限定的协程中直接调用,因为那将<!--
-->阻塞 UI 主线程并冻结 UI。

Expand Down Expand Up @@ -437,7 +437,7 @@ fun fib(x: Int): Int =
```kotlin
fun setup(hello: Text, fab: Circle) {
var result = "none" // 最后一个结果
// counting animation
// 计数动画
GlobalScope.launch(Dispatchers.Main) {
var counter = 0
while (true) {
Expand Down Expand Up @@ -472,8 +472,8 @@ fun setup(hello: Text, fab: Circle) {
UI 对象的 job 对象。但是通过关联每一个协程构建器的 job 对象是容易出错的,
它是非常容易被忘记的。对于这个目的,UI 的所有者可以实现 [CoroutineScope] 接口,那么每一个<!--
-->协程构建器被定义为了 [CoroutineScope] 上的扩展并承袭了没有显示声明的 UI job。
For the sake of simplicity, [MainScope()] factory can be used. It automatically provides `Dispatchers.Main` and parent
job.
为了简单起见,可以使用 [MainScope()] 工厂函数。它将会自动提供 `Dispatchers.Main` 以及父级
任务。

举例来说,在 Android 应用程序中一个 `Activity` 最初被 _created_ 以及被当它不再被<!--
-->需要时 _destroyed_ 并且当内存必须被释放时。一个自然的解决方式是绑定一个
Expand Down Expand Up @@ -548,8 +548,8 @@ suspend fun CoroutineScope.launchInIO() = launch(Dispatchers.IO) {

Job 之间的父子关系形成层次结构。代表执行某些后台工作的协程<!--
-->视图及其上下文可以创建更多的子协程。当父任务被取消时,
整个协程树都会被取消。请参见协程指南中
[“子协程”](../docs/coroutine-context-and-dispatchers.md#children-of-a-coroutine)这一小节的示例。
整个协程树都会被取消。请参见协程指南中<!--
-->[“子协程”](../docs/coroutine-context-and-dispatchers.md#children-of-a-coroutine)这一小节的示例。
<!--- CLEAR -->

### 阻塞操作
Expand Down

0 comments on commit 7bc4d73

Please sign in to comment.