Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flow operators: #1132

Merged
merged 3 commits into from
Apr 24, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -796,6 +796,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun count (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun debounce (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayEach (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun delayFlow (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun distinctUntilChanged (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -833,8 +834,10 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun reduce (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun retry (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;)Lkotlinx/coroutines/flow/Flow;
public static synthetic fun retry$default (Lkotlinx/coroutines/flow/Flow;ILkotlin/jvm/functions/Function1;ILjava/lang/Object;)Lkotlinx/coroutines/flow/Flow;
public static final fun sample (Lkotlinx/coroutines/flow/Flow;J)Lkotlinx/coroutines/flow/Flow;
public static final fun single (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun singleOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun switchMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun take (Lkotlinx/coroutines/flow/Flow;I)Lkotlinx/coroutines/flow/Flow;
public static final fun takeWhile (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun toCollection (Lkotlinx/coroutines/flow/Flow;Ljava/util/Collection;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
1 change: 1 addition & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ bintray_version=1.8.4-jetbrains-5
byte_buddy_version=1.9.3
reactor_vesion=3.2.5.RELEASE
reactive_streams_version=1.0.2
rxjava2_version=2.2.8
artifactory_plugin_version=4.7.3

# JS
Expand Down
127 changes: 126 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@
package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.unsafeFlow as flow
import kotlinx.coroutines.channels.*
import kotlinx.coroutines.flow.internal.*
import kotlinx.coroutines.selects.*
import kotlin.jvm.*
import kotlinx.coroutines.flow.unsafeFlow as flow

/**
* Delays the emission of values from this flow for the given [timeMillis].
Expand All @@ -32,3 +35,125 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {
emit(value)
}
}

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMs].
* The latest value is always emitted.
* Example:
* ```
* flow {
* emit(1)
elizarov marked this conversation as resolved.
Show resolved Hide resolved
* delay(99)
* emit(2)
* delay(99)
* emit(3)
* delay(1001)
* emit(4)
* delay(1001)
* emit(5)
* }.debounce(1000)
* ```
* will produce `3, 4, 5`.
*/
public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
require(timeoutMs > 0) { "Debounce timeout should be positive" }
return flow {
coroutineScope {
val values = Channel<Any>(Channel.CONFLATED)
// Channel is not closed deliberately as there is no close with value
val collector = launch {
try {
collect { value -> values.send(value ?: NullSurrogate) }
} catch (e: Throwable) {
values.close(e) // Workaround for #1130
throw e
}
}

var isDone = false
var lastValue: Any? = null
while (!isDone) {
select<Unit> {
values.onReceive {
lastValue = it
}

onTimeout(timeoutMs) {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
val value = lastValue ?: return@onTimeout
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
}

// Close with value 'idiom'
collector.onJoin {
if (lastValue != null) emit(NullSurrogate.unbox(lastValue))
isDone = true
}
}
}
}
}
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs].
* Example:
* ```
* flow {
* repeat(10) {
elizarov marked this conversation as resolved.
Show resolved Hide resolved
* emit(it)
* delay(50)
* }
* }.sample(100)
* ```
* will produce `1, 3, 5, 7, 9`.
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
require(periodMs > 0) { "Sample period should be positive" }
return flow {
coroutineScope {
val values = produce<Any>(capacity = Channel.CONFLATED) {
collect { value -> send(value ?: NullSurrogate) }
}

var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMs, periodMs)
while (!isDone) {
select<Unit> {
values.onReceiveOrNull {
if (it == null) {
ticker.cancel()
isDone = true
} else {
lastValue = it
}
}

ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
}
}
}
}
}
}

/*
* TODO this design (and design of the corresponding operator) depends on #540
*/
internal fun CoroutineScope.fixedPeriodTicker(delayMillis: Long, initialDelayMillis: Long = delayMillis): ReceiveChannel<Unit> {
require(delayMillis >= 0) { "Expected non-negative delay, but has $delayMillis ms" }
require(initialDelayMillis >= 0) { "Expected non-negative initial delay, but has $initialDelayMillis ms" }
return produce(capacity = 0) {
delay(initialDelayMillis)
while (true) {
channel.send(Unit)
delay(delayMillis)
}
}
}
45 changes: 40 additions & 5 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public fun <T, R> Flow<T>.flatMapMerge(concurrency: Int = 16, bufferSize: Int =
require(bufferSize >= 0) { "Expected non-negative buffer size, but had $bufferSize" }
require(concurrency >= 0) { "Expected non-negative concurrency level, but had $concurrency" }
return flow {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this, bufferSize)
coroutineScope {
val semaphore = Channel<Unit>(concurrency)
val flatMap = SerializingFlatMapCollector(this@flow, bufferSize)
collect { outerValue ->
// TODO real semaphore (#94)
semaphore.send(Unit) // Acquire concurrency permit
Expand Down Expand Up @@ -89,15 +89,50 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
@FlowPreview
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }

/**
* Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value.
* When switch on the newer flow is performed, the previous one is cancelled.
*
* For example, the following flow:
* ```
* flow {
* emit("a")
* delay(100)
* emit("b")
* }.switchMap { value ->
* flow {
* emit(value + value)
* delay(200)
* emit(value + "_last")
* }
* }
* ```
* will produce `aa bb b_last`
*/
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
coroutineScope {
var previousFlow: Job? = null
collect { value ->
// Linearize calls to emit as alternative to the channel. Bonus points for never-overlapping channels.
previousFlow?.cancelAndJoin()
elizarov marked this conversation as resolved.
Show resolved Hide resolved
// Undispatched to have better user experience in case of synchronous flows
previousFlow = launch(start = CoroutineStart.UNDISPATCHED) {
transform(value).collect { innerValue ->
emit(innerValue)
}
}
}
}
}

// Effectively serializes access to downstream collector from flatMap
private class SerializingFlatMapCollector<T>(
private val downstream: FlowCollector<T>,
private val bufferSize: Int
private val downstream: FlowCollector<T>, bufferSize: Int
) {

// Let's try to leverage the fact that flatMapMerge is never contended
// TODO 1.2.1 do not allocate channel
// TODO do not allocate channel
private val channel = Channel<Any?>(bufferSize) // Should be any, but KT-30796
private val inProgressLock = atomic(false)

Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ public expect open class TestBase constructor() {
public fun expect(index: Int)
public fun expectUnreached()
public fun finish(index: Int)
public fun ensureFinished() // Ensures that 'finish' was invoked
public fun reset() // Resets counter and finish flag. Workaround for parametrized tests absence in common

public fun runTest(
Expand Down
85 changes: 85 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.jvm.*

private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {
qwwdfsad marked this conversation as resolved.
Show resolved Hide resolved

private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation
private var currentTime = 0L

init {
/*
* Launch "event-loop-owning" task on start of the virtual time event loop.
* It ensures the progress of the enclosing event-loop and polls the timed queue
* when the enclosing event loop is empty, emulating virtual time.
*/
enclosingScope.launch {
while (true) {
val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
?: error("Event loop is missing, virtual time source works only as part of event loop")
if (secret <= 0) continue
if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret")
val nextTask = heap.minBy { it.deadline } ?: return@launch
heap.remove(nextTask)
currentTime = nextTask.deadline
nextTask.run()
}
}
}

private inner class TimedTask(
private val runnable: Runnable,
@JvmField val deadline: Long
) : DisposableHandle, Runnable by runnable {

override fun dispose() {
heap.remove(this)
}
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
originalDispatcher.dispatch(context, block)
}

@ExperimentalCoroutinesApi
override fun isDispatchNeeded(context: CoroutineContext): Boolean = originalDispatcher.isDispatchNeeded(context)

override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
val task = TimedTask(block, currentTime + timeMillis)
heap += task
return task
}

override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val task = TimedTask(Runnable { with(continuation) { resumeUndispatched(Unit) } }, currentTime + timeMillis)
heap += task
continuation.invokeOnCancellation { task.dispose() }
}
}

/**
* Runs a test ([TestBase.runTest]) with a virtual time source.
* This runner has the following constraints:
* 1) It works only in the event-loop environment and it is relying on it.
* None of the coroutines should be launched in any dispatcher different from a current
* 2) Regular tasks always dominate delayed ones. It means that
* `launch { while(true) yield() }` will block the progress of the delayed tasks
* 3) [TestBase.finish] should always be invoked.
* Given all the constraints into account, it is easy to mess up with a test and actually
* return from [withVirtualTime] before the test is executed completely.
* To decrease the probability of such error, additional `finish` constraint is added.
*/
public fun TestBase.withVirtualTime(block: suspend CoroutineScope.() -> Unit) = runTest {
withContext(Dispatchers.Unconfined) {
// Create a platform-independent event loop
val dispatcher = VirtualTimeDispatcher(this)
withContext(dispatcher) { block() }
ensureFinished()
}
}
Loading