Skip to content

Commit

Permalink
Tweaked prose in coroutines-guide-reactive a bit (and reknit)
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Apr 24, 2019
1 parent 0685dc4 commit c961fb6
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 22 deletions.
38 changes: 20 additions & 18 deletions reactive/coroutines-guide-reactive.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,8 @@ Let us rewrite this code using [publish] coroutine builder from `kotlinx-corouti
instead of [produce] from `kotlinx-coroutines-core` module. The code stays the same,
but where `source` used to have [ReceiveChannel] type, it now has reactive streams
[Publisher](https://www.reactive-streams.org/reactive-streams-1.0.0-javadoc/org/reactivestreams/Publisher.html)
type.
type, where [consumeEach] was used to _consume_ elements from the channel,
now [collect][org.reactivestreams.Publisher.collect] is used to _collect_ elements from the publisher.

<!--- INCLUDE
import kotlinx.coroutines.*
Expand Down Expand Up @@ -194,15 +195,15 @@ Begin

This example highlights the key difference between a reactive stream and a channel. A reactive stream is a higher-order
functional concept. While the channel _is_ a stream of elements, the reactive stream defines a recipe on how the stream of
elements is produced. It becomes the actual stream of elements on _subscription_. Each subscriber may receive the same or
elements is produced. It becomes the actual stream of elements when _collected_. Each collector may receive the same or
a different stream of elements, depending on how the corresponding implementation of `Publisher` works.

The [publish] coroutine builder, that is used in the above example, launches a fresh coroutine on each subscription.
Every [Publisher.collect][org.reactivestreams.Publisher.collect] invocation creates a fresh subscription.
The [publish] coroutine builder, that is used in the above example, does not launch a coroutine,
but every [collect][org.reactivestreams.Publisher.collect] invocation launches a coroutine.
We have two of them in this code and that is why we see "Begin" printed twice.

In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can iterate
over them from a coroutine, and every subscription produces the same stream of elements.
In Rx lingo this is called a _cold_ publisher. Many standard Rx operators produce cold streams, too. We can collect
them from a coroutine, and every collector gets the same stream of elements.

**WARNING**: It is planned that in the future a second invocation of `consumeEach` method
on an channel that is already being consumed is going to fail fast, that is
Expand All @@ -217,10 +218,10 @@ method with it.

### Subscription and cancellation

An example in the previous section uses `source.collect { ... }` snippet to open a subscription
and receive all the elements from it. If we need more control on how what to do with
the elements that are being received from the channel, we can use [Publisher.collect][org.reactivestreams.Publisher.collect]
as shown in the following example:
An example in the previous section uses `source.collect { ... }` to collect all elements.
Instead of collecting elements, we can open a channel using [openSubscription][org.reactivestreams.Publisher.openSubscription]
and iterate over it, so that we have more finer-grained control on our iteration,
for example using `break`, as shown below:

<!--- INCLUDE
import io.reactivex.*
Expand Down Expand Up @@ -259,17 +260,16 @@ Finally
```

<!--- TEST -->

With an explicit `openSubscription` we should [cancel][ReceiveChannel.cancel] the corresponding
subscription to unsubscribe from the source. There is no need to invoke `cancel` explicitly -- under the hood
`consume` does that for us.
subscription to unsubscribe from the source, but there is no need to call `cancel` explicitly -- under the hood
[consume] does that for us.
The installed
[doFinally](https://reactivex.io/RxJava/2.x/javadoc/io/reactivex/Flowable.html#doFinally(io.reactivex.functions.Action))
listener prints "Finally" to confirm that the subscription is actually being closed. Note that "OnComplete"
is never printed because we did not consume all of the elements.

We do not need to use an explicit `cancel` either if iteration is performed over all the items that are emitted
by the publisher, because it is being cancelled automatically by `collect`:
We do not need to use an explicit `cancel` either if we `collect` all the elements:

<!--- INCLUDE
import io.reactivex.*
Expand All @@ -284,7 +284,7 @@ fun main() = runBlocking<Unit> {
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... into what's going on
// iterate over the source fully
// collect the source fully
source.collect { println(it) }
}
```
Expand Down Expand Up @@ -684,8 +684,8 @@ fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>
other.openSubscription().consume { // explicitly open channel to Publisher<U>
val other = this
whileSelect {
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
}
}
}
Expand Down Expand Up @@ -1068,6 +1068,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
[consumeEach]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume-each.html
[ReceiveChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/index.html
[ReceiveChannel.cancel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-receive-channel/cancel.html
[consume]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/consume.html
[SendChannel.send]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-send-channel/send.html
[BroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-broadcast-channel/index.html
[ConflatedBroadcastChannel]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.channels/-conflated-broadcast-channel/index.html
Expand All @@ -1078,6 +1079,7 @@ coroutines for complex pipelines with fan-in and fan-out between multiple worker
<!--- INDEX kotlinx.coroutines.reactive -->
[publish]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/kotlinx.coroutines.-coroutine-scope/publish.html
[org.reactivestreams.Publisher.collect]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/collect.html
[org.reactivestreams.Publisher.openSubscription]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-reactive/kotlinx.coroutines.reactive/org.reactivestreams.-publisher/open-subscription.html
<!--- MODULE kotlinx-coroutines-rx2 -->
<!--- INDEX kotlinx.coroutines.rx2 -->
[rxFlowable]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-rx2/kotlinx.coroutines.rx2/kotlinx.coroutines.-coroutine-scope/rx-flowable.html
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,6 @@ fun main() = runBlocking<Unit> {
.doOnSubscribe { println("OnSubscribe") } // provide some insight
.doOnComplete { println("OnComplete") } // ...
.doFinally { println("Finally") } // ... into what's going on
// iterate over the source fully
// collect the source fully
source.collect { println(it) }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ fun <T, R> Publisher<T>.fusedFilterMap(
predicate: (T) -> Boolean, // the filter predicate
mapper: (T) -> R // the mapper function
) = GlobalScope.publish<R>(context) {
collect { // consume the source stream
collect { // collect the source stream
if (predicate(it)) // filter part
send(mapper(it)) // map part
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ fun <T, U> Publisher<T>.takeUntil(context: CoroutineContext, other: Publisher<U>
other.openSubscription().consume { // explicitly open channel to Publisher<U>
val other = this
whileSelect {
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
other.onReceive { false } // bail out on any received element from `other`
current.onReceive { send(it); true } // resend element from this channel and continue
}
}
}
Expand Down

0 comments on commit c961fb6

Please sign in to comment.