Skip to content

Commit

Permalink
Provide CoroutineScope into flowViaChannel block, but make it non-sus…
Browse files Browse the repository at this point in the history
…pending.

    * It allows using flowViaChannel for integration with Java callbacks
    * CoroutineScope is provided to provide a lifecycle object (that is not otherwise accessible)
    * Suspending use-cases are covered with flow builder

Fixes Kotlin#1081
  • Loading branch information
qwwdfsad authored and elizarov committed Apr 24, 2019
1 parent be467e3 commit bb7b3c2
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 9 deletions.
10 changes: 5 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -185,18 +185,17 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
/**
* Creates an instance of the cold [Flow] with elements that are sent to a [SendChannel]
* that is provided to the builder's [block] of code. It allows elements to be
* produced by the code that is running in a different context,
* e.g. from a callback-based API.
* produced by the code that is running in a different context, e.g. from a callback-based API.
*
* The resulting flow is _cold_, which means that [block] is called on each call of a terminal operator
* on the resulting flow.
* on the resulting flow. The [block] is not suspending deliberately, if you need suspending scope, [flow] builder
* should be used instead.
*
* To control backpressure, [bufferSize] is used and matches directly the `capacity` parameter of [Channel] factory.
* The provided channel can later be used by any external service to communicate with flow and its buffer determines
* backpressure buffer size or its behaviour (e.g. in case when [Channel.CONFLATED] was used).
*
* Example of usage:
*
* ```
* fun flowFrom(api: CallbackBasedApi): Flow<T> = flowViaChannel { channel ->
* val callback = object : Callback { // implementation of some callback interface
Expand All @@ -206,6 +205,7 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
* override fun onApiError(cause: Throwable) {
* channel.cancel("API Error", CancellationException(cause))
* }
* override fun onCompleted() = channel.close()
* }
* api.register(callback)
* channel.invokeOnClose {
Expand All @@ -217,7 +217,7 @@ public fun LongRange.asFlow(): Flow<Long> = flow {
@FlowPreview
public fun <T> flowViaChannel(
bufferSize: Int = 16,
@BuilderInference block: suspend (SendChannel<T>) -> Unit
@BuilderInference block: CoroutineScope.(channel: SendChannel<T>) -> Unit
): Flow<T> {
return flow {
coroutineScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,20 @@ class FlowViaChannelTest : TestBase() {
@Test
fun testRegular() = runTest {
val flow = flowViaChannel<Int> {
it.send(1)
it.send(2)
assertTrue(it.offer(1))
assertTrue(it.offer(2))
assertTrue(it.offer(3))
it.close()
}
assertEquals(listOf(1, 2, 3), flow.toList())
}

@Test
fun testBuffer() = runTest {
val flow = flowViaChannel<Int>(bufferSize = 1) {
assertTrue(it.offer(1))
assertTrue(it.offer(2))
assertFalse(it.offer(3))
it.close()
}
assertEquals(listOf(1, 2), flow.toList())
Expand All @@ -22,10 +34,51 @@ class FlowViaChannelTest : TestBase() {
@Test
fun testConflated() = runTest {
val flow = flowViaChannel<Int>(bufferSize = Channel.CONFLATED) {
it.send(1)
it.send(2)
assertTrue(it.offer(1))
assertTrue(it.offer(2))
it.close()
}
assertEquals(listOf(1), flow.toList())
}

@Test
fun testFailureCancelsChannel() = runTest {
val flow = flowViaChannel<Int> {
it.offer(1)
it.invokeOnClose {
expect(2)
}
}.onEach { throw TestException() }

expect(1)
assertFailsWith<TestException>(flow)
finish(3)
}

@Test
fun testFailureInSourceCancelsConsumer() = runTest {
val flow = flowViaChannel<Int> {
expect(2)
throw TestException()
}.onEach { expectUnreached() }

expect(1)
assertFailsWith<TestException>(flow)
finish(3)
}

@Test
fun testScopedCancellation() = runTest {
val flow = flowViaChannel<Int> {
expect(2)
launch(start = CoroutineStart.ATOMIC) {
hang { expect(3) }
}
throw TestException()
}.onEach { expectUnreached() }

expect(1)
assertFailsWith<TestException>(flow)
finish(4)
}
}

0 comments on commit bb7b3c2

Please sign in to comment.