Skip to content

Commit

Permalink
Flow operators:
Browse files Browse the repository at this point in the history
    * switchMap
    * debounce
    * sample
    * Update RxJava version to 2.2.8

Partially fixes #1107
  • Loading branch information
qwwdfsad committed Apr 22, 2019
1 parent 1748ce1 commit 9f2e574
Show file tree
Hide file tree
Showing 14 changed files with 848 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -833,8 +834,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun take (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun takeWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun toCollection (Lkotlinx/coroutines/flow/Flow;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bintray_version=1.8.4-jetbrains-5
byte_buddy_version=1.9.3
reactor_vesion=3.2.5.RELEASE
reactive_streams_version=1.0.2
rxjava2_version=2.2.8
artifactory_plugin_version=4.7.3

# JS
Expand Down
127 changes: 126 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Delays the emission of values from this flow for the given [timeMillis].
Expand All @@ -32,3 +35,125 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
emit(value)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMs].
* The latest value is always emitted.
* Example:
* ```
* flow {
* emit(1)
* delay(99)
* emit(2)
* delay(99)
* emit(3)
* delay(1001)
* emit(4)
* delay(1001)
* emit(5)
* }.debounce(1000)
* ```
* will produce `3, 4, 5`.
*/
public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
require(timeoutMs > 0) { "Debounce timeout should be positive" }
return flow {
coroutineScope {
val values = Channel<Any>(Channel.CONFLATED)
// Channel is not closed deliberately as there is no close with value
val collector = launch {
try {
collect { value -> values.send(value ?: NullSurrogate) }
} catch (e: Throwable) {
values.close(e) // Workaround for #1130
throw e
}
}

var isDone = false
var lastValue: Any? = null
while (!isDone) {
select<Unit> {
values.onReceive {
lastValue = it
}

onTimeout(timeoutMs) {
val value = lastValue ?: return@onTimeout
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
}

// Close with value 'idiom'
collector.onJoin {
if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
isDone = true
}
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs].
* Example:
* ```
* flow {
* repeat(10) {
* emit(it)
* delay(50)
* }
* }.sample(100)
* ```
* will produce `1, 3, 5, 7, 9`.
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
require(periodMs > 0) { "Sample period should be positive" }
return flow {
coroutineScope {
val values = produce<Any>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NullSurrogate) }
}

var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMs, periodMs)
while (!isDone) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel()
isDone = true
} else {
lastValue = it
}
}

ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
}
}
}
}
}
}

/*
* TODO this design (and design of the corresponding operator) depends on #540
*/
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return produce(capacity = 0) {
delay(initialDelayMillis)
while (true) {
channel.send(Unit)
delay(delayMillis)
}
}
}
45 changes: 40 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
return flow {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this, bufferSize)
coroutineScope {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this@flow, bufferSize)
collect { outerValue ->
// TODO real semaphore (#94)
semaphore.send(Unit) // Acquire concurrency permit
Expand Down Expand Up @@ -89,15 +89,50 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }

/**
* Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value.
* When switch on the newer flow is performed, the previous one is cancelled.
*
* For example, the following flow:
* ```
* flow {
* emit("a")
* delay(100)
* emit("b")
* }.switchMap { value ->
* flow {
* emit(value + value)
* delay(200)
* emit(value + "_last")
* }
* }
* ```
* will produce `aa bb b_last`
*/
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
coroutineScope {
var previousFlow: Job? = null
collect { value ->
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
previousFlow?.cancelAndJoin()
// Undispatched to have better user experience in case of synchronous flows
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
transform(value).collect { innerValue ->
emit(innerValue)
}
}
}
}
}

// Effectively serializes access to downstream collector from flatMap
private class SerializingFlatMapCollector<T>(
private val downstream: FlowCollector<T>,
private val bufferSize: Int
private val downstream: FlowCollector<T>, bufferSize: Int
) {

// Let's try to leverage the fact that flatMapMerge is never contended
// TODO 1.2.1 do not allocate channel
// TODO do not allocate channel
private val channel = Channel<Any?>(bufferSize) // Should be any, but KT-30796
private val inProgressLock = atomic(false)

Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public expect open class TestBase constructor() {
public fun expect(index: Int)
public fun expectUnreached()
public fun finish(index: Int)
public fun ensureFinished() // Ensures that 'finish' was invoked
public fun reset() // Resets counter and finish flag. Workaround for parametrized tests absence in common

public fun runTest(
Expand Down
85 changes: 85 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.jvm.*

private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {

private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation
private var currentTime = 0L

init {
/*
* Launch "event-loop-owning" task on start of the virtual time event loop.
* It ensures the progress of the enclosing event-loop and polls the timed queue
* when the enclosing event loop is empty, emulating virtual time.
*/
enclosingScope.launch {
while (true) {
val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
?: error("Event loop is missing, virtual time source works only as part of event loop")
if (secret <= 0) continue
if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret")
val nextTask = heap.minBy { it.deadline } ?: return@launch
heap.remove(nextTask)
currentTime = nextTask.deadline
nextTask.run()
}
}
}

private inner class TimedTask(
private val runnable: Runnable,
@JvmField val deadline: Long
) : DisposableHandle, Runnable by runnable {

override fun dispose() {
heap.remove(this)
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
originalDispatcher.dispatch(context, block)
}

@ExperimentalCoroutinesApi
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val task = TimedTask(block, currentTime + timeMillis)
heap += task
return task
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, currentTime + timeMillis)
heap += task
continuation.invokeOnCancellation { task.dispose() }
}
}

/**
* Runs a test ([TestBase.runTest]) with a virtual time source.
* This runner has the following constraints:
* 1) It works only in the event-loop environment and it is relying on it.
* None of the coroutines should be launched in any dispatcher different from a current
* 2) Regular tasks always dominate delayed ones. It means that
* `launch { while(true) yield() }` will block the progress of the delayed tasks
* 3) [TestBase.finish] should always be invoked.
* Given all the constraints into account, it is easy to mess up with a test and actually
* return from [withVirtualTime] before the test is executed completely.
* To decrease the probability of such error, additional `finish` constraint is added.
*/
public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest {
withContext(Dispatchers.Unconfined) {
// Create a platform-independent event loop
val dispatcher = VirtualTimeDispatcher(this)
withContext(dispatcher) { block() }
ensureFinished()
}
}
Loading

0 comments on commit 9f2e574

Please sign in to comment.