Skip to content

Commit

Permalink
Support context in Flow.asPublisher and similar methods (Kotlin#2156)
Browse files Browse the repository at this point in the history
Fixes Kotlin#2155

Co-authored-by: Vsevolod Tolstopyatov <[email protected]>
  • Loading branch information
elizarov and qwwdfsad authored Aug 10, 2020
1 parent 3cc9b94 commit 1a6beba
Show file tree
Hide file tree
Showing 16 changed files with 562 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ public final class kotlinx/coroutines/jdk9/PublishKt {
public final class kotlinx/coroutines/jdk9/ReactiveFlowKt {
public static final fun asFlow (Ljava/util/concurrent/Flow$Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Ljava/util/concurrent/Flow$Publisher;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Ljava/util/concurrent/Flow$Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Ljava/util/concurrent/Flow$Publisher;
public static final fun collect (Ljava/util/concurrent/Flow$Publisher;Lkotlin/jvm/functions/Function1;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
}

14 changes: 11 additions & 3 deletions reactive/kotlinx-coroutines-jdk9/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@

package kotlinx.coroutines.jdk9

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.asFlow
import kotlinx.coroutines.reactive.asPublisher
import kotlinx.coroutines.reactive.collect
import org.reactivestreams.*
import kotlin.coroutines.*
import java.util.concurrent.Flow as JFlow
import org.reactivestreams.FlowAdapters

/**
* Transforms the given reactive [Publisher] into [Flow].
Expand All @@ -25,9 +27,15 @@ public fun <T : Any> JFlow.Publisher<T>.asFlow(): Flow<T> =

/**
* Transforms the given flow to a reactive specification compliant [Publisher].
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
public fun <T : Any> Flow<T>.asPublisher(): JFlow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>()
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): JFlow.Publisher<T> {
val reactivePublisher : org.reactivestreams.Publisher<T> = this.asPublisher<T>(context)
return FlowAdapters.toFlowPublisher(reactivePublisher)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public final class kotlinx/coroutines/reactive/FlowKt {
public final class kotlinx/coroutines/reactive/FlowSubscription : kotlinx/coroutines/AbstractCoroutine, org/reactivestreams/Subscription {
public final field flow Lkotlinx/coroutines/flow/Flow;
public final field subscriber Lorg/reactivestreams/Subscriber;
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;)V
public fun <init> (Lkotlinx/coroutines/flow/Flow;Lorg/reactivestreams/Subscriber;Lkotlin/coroutines/CoroutineContext;)V
public fun cancel ()V
public fun request (J)V
}
Expand Down Expand Up @@ -65,5 +65,7 @@ public final class kotlinx/coroutines/reactive/PublisherCoroutine : kotlinx/coro
public final class kotlinx/coroutines/reactive/ReactiveFlowKt {
public static final fun asFlow (Lorg/reactivestreams/Publisher;)Lkotlinx/coroutines/flow/Flow;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;)Lorg/reactivestreams/Publisher;
public static final fun asPublisher (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lorg/reactivestreams/Publisher;
public static synthetic fun asPublisher$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lorg/reactivestreams/Publisher;
}

23 changes: 17 additions & 6 deletions reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,15 @@ public fun <T : Any> Publisher<T>.asFlow(): Flow<T> =
*
* This function is integrated with `ReactorContext` from `kotlinx-coroutines-reactor` module,
* see its documentation for additional details.
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
public fun <T : Any> Flow<T>.asPublisher(): Publisher<T> = FlowAsPublisher(this)
@JvmOverloads // binary compatibility
public fun <T : Any> Flow<T>.asPublisher(context: CoroutineContext = EmptyCoroutineContext): Publisher<T> =
FlowAsPublisher(this, Dispatchers.Unconfined + context)

private class PublisherAsFlow<T : Any>(
private val publisher: Publisher<T>,
Expand Down Expand Up @@ -153,20 +160,24 @@ internal fun <T> Publisher<T>.injectCoroutineContext(coroutineContext: Coroutine
* Adapter that transforms [Flow] into TCK-complaint [Publisher].
* [cancel] invocation cancels the original flow.
*/
@Suppress("PublisherImplementation")
private class FlowAsPublisher<T : Any>(private val flow: Flow<T>) : Publisher<T> {
@Suppress("ReactiveStreamsPublisherImplementation")
private class FlowAsPublisher<T : Any>(
private val flow: Flow<T>,
private val context: CoroutineContext
) : Publisher<T> {
override fun subscribe(subscriber: Subscriber<in T>?) {
if (subscriber == null) throw NullPointerException()
subscriber.onSubscribe(FlowSubscription(flow, subscriber))
subscriber.onSubscribe(FlowSubscription(flow, subscriber, context))
}
}

/** @suppress */
@InternalCoroutinesApi
public class FlowSubscription<T>(
@JvmField public val flow: Flow<T>,
@JvmField public val subscriber: Subscriber<in T>
) : Subscription, AbstractCoroutine<Unit>(Dispatchers.Unconfined, true) {
@JvmField public val subscriber: Subscriber<in T>,
context: CoroutineContext
) : Subscription, AbstractCoroutine<Unit>(context, true) {
private val requested = atomic(0L)
private val producer = atomic<Continuation<Unit>?>(createInitialContinuation())

Expand Down
76 changes: 75 additions & 1 deletion reactive/kotlinx-coroutines-reactive/test/FlowAsPublisherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import org.junit.Test
import org.reactivestreams.*
import java.util.concurrent.*
import kotlin.test.*

class FlowAsPublisherTest : TestBase() {

@Test
fun testErrorOnCancellationIsReported() {
expect(1)
Expand Down Expand Up @@ -75,4 +75,78 @@ class FlowAsPublisherTest : TestBase() {
})
finish(4)
}

@Test
fun testUnconfinedDefaultContext() {
expect(1)
val thread = Thread.currentThread()
fun checkThread() {
assertSame(thread, Thread.currentThread())
}
flowOf(42).asPublisher().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onSubscribe(s: Subscription) {
expect(2)
subscription = s
subscription.request(2)
}

override fun onNext(t: Int) {
checkThread()
expect(3)
assertEquals(42, t)
}

override fun onComplete() {
checkThread()
expect(4)
}

override fun onError(t: Throwable?) {
expectUnreached()
}
})
finish(5)
}

@Test
fun testConfinedContext() {
expect(1)
val threadName = "FlowAsPublisherTest.testConfinedContext"
fun checkThread() {
val currentThread = Thread.currentThread()
assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
}
val completed = CountDownLatch(1)
newSingleThreadContext(threadName).use { dispatcher ->
flowOf(42).asPublisher(dispatcher).subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onSubscribe(s: Subscription) {
expect(2)
subscription = s
subscription.request(2)
}

override fun onNext(t: Int) {
checkThread()
expect(3)
assertEquals(42, t)
}

override fun onComplete() {
checkThread()
expect(4)
completed.countDown()
}

override fun onError(t: Throwable?) {
expectUnreached()
}
})
completed.await()
}
finish(5)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public final class kotlinx/coroutines/reactor/ReactorContextKt {

public final class kotlinx/coroutines/reactor/ReactorFlowKt {
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;)Lreactor/core/publisher/Flux;
public static final fun asFlux (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lreactor/core/publisher/Flux;
public static synthetic fun asFlux$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lreactor/core/publisher/Flux;
}

public final class kotlinx/coroutines/reactor/SchedulerCoroutineDispatcher : kotlinx/coroutines/CoroutineDispatcher, kotlinx/coroutines/Delay {
Expand Down
19 changes: 16 additions & 3 deletions reactive/kotlinx-coroutines-reactor/src/ReactorFlow.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,25 +4,38 @@

package kotlinx.coroutines.reactor

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.reactive.FlowSubscription
import org.reactivestreams.*
import reactor.core.CoreSubscriber
import reactor.core.publisher.Flux
import kotlin.coroutines.*

/**
* Converts the given flow to a cold flux.
* The original flow is cancelled when the flux subscriber is disposed.
*
* This function is integrated with [ReactorContext], see its documentation for additional details.
*
* An optional [context] can be specified to control the execution context of calls to [Subscriber] methods.
* You can set a [CoroutineDispatcher] to confine them to a specific thread and/or various [ThreadContextElement] to
* inject additional context into the caller thread. By default, the [Unconfined][Dispatchers.Unconfined] dispatcher
* is used, so calls are performed from an arbitrary thread.
*/
public fun <T: Any> Flow<T>.asFlux(): Flux<T> = FlowAsFlux(this)
@JvmOverloads // binary compatibility
public fun <T: Any> Flow<T>.asFlux(context: CoroutineContext = EmptyCoroutineContext): Flux<T> =
FlowAsFlux(this, Dispatchers.Unconfined + context)

private class FlowAsFlux<T : Any>(private val flow: Flow<T>) : Flux<T>() {
private class FlowAsFlux<T : Any>(
private val flow: Flow<T>,
private val context: CoroutineContext
) : Flux<T>() {
override fun subscribe(subscriber: CoreSubscriber<in T>?) {
if (subscriber == null) throw NullPointerException()
val hasContext = !subscriber.currentContext().isEmpty
val source = if (hasContext) flow.flowOn(subscriber.currentContext().asCoroutineContext()) else flow
subscriber.onSubscribe(FlowSubscription(source, subscriber))
subscriber.onSubscribe(FlowSubscription(source, subscriber, context))
}
}
79 changes: 78 additions & 1 deletion reactive/kotlinx-coroutines-reactor/test/FlowAsFluxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,13 @@ import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.reactive.*
import org.junit.Test
import org.reactivestreams.*
import reactor.core.publisher.*
import reactor.util.context.Context
import java.util.concurrent.*
import kotlin.test.*

@Suppress("ReactiveStreamsSubscriberImplementation")
class FlowAsFluxTest : TestBase() {
@Test
fun testFlowAsFluxContextPropagation() {
Expand Down Expand Up @@ -68,4 +71,78 @@ class FlowAsFluxTest : TestBase() {
}
finish(4)
}
}

@Test
fun testUnconfinedDefaultContext() {
expect(1)
val thread = Thread.currentThread()
fun checkThread() {
assertSame(thread, Thread.currentThread())
}
flowOf(42).asFlux().subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onSubscribe(s: Subscription) {
expect(2)
subscription = s
subscription.request(2)
}

override fun onNext(t: Int) {
checkThread()
expect(3)
assertEquals(42, t)
}

override fun onComplete() {
checkThread()
expect(4)
}

override fun onError(t: Throwable?) {
expectUnreached()
}
})
finish(5)
}

@Test
fun testConfinedContext() {
expect(1)
val threadName = "FlowAsFluxTest.testConfinedContext"
fun checkThread() {
val currentThread = Thread.currentThread()
assertTrue(currentThread.name.startsWith(threadName), "Unexpected thread $currentThread")
}
val completed = CountDownLatch(1)
newSingleThreadContext(threadName).use { dispatcher ->
flowOf(42).asFlux(dispatcher).subscribe(object : Subscriber<Int> {
private lateinit var subscription: Subscription

override fun onSubscribe(s: Subscription) {
expect(2)
subscription = s
subscription.request(2)
}

override fun onNext(t: Int) {
checkThread()
expect(3)
assertEquals(42, t)
}

override fun onComplete() {
checkThread()
expect(4)
completed.countDown()
}

override fun onError(t: Throwable?) {
expectUnreached()
}
})
completed.await()
}
finish(5)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ public final class kotlinx/coroutines/rx2/RxConvertKt {
public static final fun asSingle (Lkotlinx/coroutines/Deferred;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Single;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;)Lio/reactivex/Observable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Flowable;
public static final fun from (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;)Lio/reactivex/Observable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Flowable;
public static synthetic fun from$default (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/CoroutineContext;ILjava/lang/Object;)Lio/reactivex/Observable;
}

public final class kotlinx/coroutines/rx2/RxFlowableKt {
Expand Down
Loading

0 comments on commit 1a6beba

Please sign in to comment.