diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index 8467611025..111ef7cf9a 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -20,9 +20,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow @FlowPreview public fun Flow.delayFlow(timeMillis: Long): Flow = flow { delay(timeMillis) - collect { value -> - emit(value) - } + collect(this@flow) } /** @@ -38,26 +36,30 @@ public fun Flow.delayEach(timeMillis: Long): Flow = flow { /** * Returns a flow that mirrors the original flow, but filters out values - * that are followed by the newer values within the given [timeout][timeoutMs]. + * that are followed by the newer values within the given [timeout][timeoutMillis]. * The latest value is always emitted. + * * Example: * ``` * flow { - * emit(1) - * delay(99) - * emit(2) - * delay(99) - * emit(3) - * delay(1001) - * emit(4) - * delay(1001) - * emit(5) + * emit(1) + * delay(99) + * emit(2) + * delay(99) + * emit(3) + * delay(1001) + * emit(4) + * delay(1001) + * emit(5) * }.debounce(1000) * ``` - * will produce `3, 4, 5`. + * produces `3, 4, 5`. + * + * Note that the resulting flow does not emit anything as long as the original flow emits + * items faster than every [timeoutMillis] milliseconds. */ -public fun Flow.debounce(timeoutMs: Long): Flow { - require(timeoutMs > 0) { "Debounce timeout should be positive" } +public fun Flow.debounce(timeoutMillis: Long): Flow { + require(timeoutMillis > 0) { "Debounce timeout should be positive" } return flow { coroutineScope { val values = Channel(Channel.CONFLATED) // Actually Any, KT-30796 @@ -79,10 +81,11 @@ public fun Flow.debounce(timeoutMs: Long): Flow { lastValue = it } - onTimeout(timeoutMs) { - val value = lastValue ?: return@onTimeout - lastValue = null // Consume the value - emit(NullSurrogate.unbox(value)) + lastValue?.let { value -> // set timeout when lastValue != null + onTimeout(timeoutMillis) { + lastValue = null // Consume the value + emit(NullSurrogate.unbox(value)) + } } // Close with value 'idiom' @@ -97,21 +100,23 @@ public fun Flow.debounce(timeoutMs: Long): Flow { } /** - * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs]. + * Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis]. + * * Example: * ``` * flow { - * repeat(10) { - * emit(it) - * delay(50) - * } + * repeat(10) { + * emit(it) + * delay(50) + * } * }.sample(100) * ``` - * will produce `1, 3, 5, 7, 9`. + * produces `1, 3, 5, 7, 9`. + * * Note that the latest element is not emitted if it does not fit into the sampling window. */ -public fun Flow.sample(periodMs: Long): Flow { - require(periodMs > 0) { "Sample period should be positive" } +public fun Flow.sample(periodMillis: Long): Flow { + require(periodMillis > 0) { "Sample period should be positive" } return flow { coroutineScope { val values = produce(capacity = Channel.CONFLATED) { // Actually Any, KT-30796 @@ -120,7 +125,7 @@ public fun Flow.sample(periodMs: Long): Flow { var isDone = false var lastValue: Any? = null - val ticker = fixedPeriodTicker(periodMs, periodMs) + val ticker = fixedPeriodTicker(periodMillis) while (!isDone) { select { values.onReceiveOrNull { @@ -132,6 +137,7 @@ public fun Flow.sample(periodMs: Long): Flow { } } + // todo: shall be start sampling only when an element arrives or sample aways as here? ticker.onReceive { val value = lastValue ?: return@onReceive lastValue = null // Consume the value diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt index 6f7e9bc32e..0b6625b8bb 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Merge.kt @@ -90,8 +90,8 @@ public fun Flow>.flattenConcat(): Flow = flow { public fun Flow>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow = flatMapMerge(concurrency, bufferSize) { it } /** - * Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value. - * When switch on the newer flow is performed, the previous one is cancelled. + * Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value. + * When switch on the a flow is performed, the previous one is cancelled. * * For example, the following flow: * ``` @@ -107,7 +107,7 @@ public fun Flow>.flattenMerge(concurrency: Int = 16, bufferSize: Int * } * } * ``` - * will produce `aa bb b_last` + * produces `aa bb b_last` */ @FlowPreview public fun Flow.switchMap(transform: suspend (value: T) -> Flow): Flow = flow { diff --git a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt index 201daaf498..9b257d933e 100644 --- a/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt +++ b/kotlinx-coroutines-core/common/test/flow/VirtualTime.kt @@ -10,7 +10,7 @@ import kotlin.jvm.* private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay { private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher - private val heap = ArrayList() // TODO use MPP heap/ordered set implementation + private val heap = ArrayList() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap) private var currentTime = 0L init { @@ -21,10 +21,10 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD */ enclosingScope.launch { while (true) { - val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() + val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent() ?: error("Event loop is missing, virtual time source works only as part of event loop") - if (secret <= 0) continue - if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret") + if (delayNanos <= 0) continue + if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos") val nextTask = heap.minBy { it.deadline } ?: return@launch heap.remove(nextTask) currentTime = nextTask.deadline diff --git a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt index 5954e211ad..25bb75be57 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/CombineLatestTest.kt @@ -16,7 +16,7 @@ class CombineLatestTest : TestBase() { fun testCombineLatest() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2) { i, j -> i + j }.toList() assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list) } @@ -24,7 +24,7 @@ class CombineLatestTest : TestBase() { fun testNulls() = runTest { val flow = flowOf("a", null, null) val flow2 = flowOf(1, 2, 3) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list) } @@ -32,13 +32,13 @@ class CombineLatestTest : TestBase() { fun testNullsOther() = runTest { val flow = flowOf("a", "b", "c") val flow2 = flowOf(null, 2, null) - val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() + val list = flow.combineLatest(flow2, { i, j -> i + j }).toList() assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list) } @Test fun testEmptyFlow() = runTest { - val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) + val flow = emptyFlow().combineLatest(emptyFlow(), { i, j -> i + j }) assertNull(flow.singleOrNull()) } @@ -46,14 +46,14 @@ class CombineLatestTest : TestBase() { fun testFirstIsEmpty() = runTest { val f1 = emptyFlow() val f2 = flowOf(1) - assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList()) + assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList()) } @Test fun testSecondIsEmpty() = runTest { val f1 = flowOf("a") val f2 = emptyFlow() - assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList()) + assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList()) } @Test @@ -80,7 +80,7 @@ class CombineLatestTest : TestBase() { emit(3) } - val result = f1.combineLatest(f2, { i, j -> i + j }).toList() + val result = f1.combineLatest(f2) { i, j -> i + j }.toList() assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result) finish(8) } @@ -137,7 +137,7 @@ class CombineLatestTest : TestBase() { } } - val value = withContext(NamedDispatchers("main")) { + val value = withContext(NamedDispatchers("main")) { f1.combineLatest(f2) { i, j -> assertEquals("main", NamedDispatchers.name()) expect(5) @@ -170,8 +170,8 @@ class CombineLatestTest : TestBase() { expect(1) i + j }.flowOn(NamedDispatchers("combine")).onEach { - throw TestException() - } + throw TestException() + } assertFailsWith(flow) finish(4) diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt index c9093a0103..814aec669e 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt @@ -129,6 +129,25 @@ class SampleTest : TestBase() { assertNull(flow.singleOrNull()) } + @Test + // note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits + fun testLongWait() = withVirtualTime { + expect(1) + val flow = flow { + expect(2) + emit("A") + delay(3500) // long delay -- multiple sampling intervals + emit("B") + delay(900) // crosses time = 4000 barrier + emit("C") + delay(3000) // long wait again + + } + val result = flow.sample(1000).toList() + assertEquals(listOf("A", "B", "C"), result) + finish(3) + } + @Test fun testPace() = withVirtualTime { val flow = flow { diff --git a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt index 7e29cdb8e6..933bb1628e 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/SwitchMapTest.kt @@ -42,11 +42,11 @@ class SwitchMapTest : TestBase() { emit(2) emit(4) }.flowOn(NamedDispatchers("source")).switchMap { value -> - flow { - assertEquals("switch$value", NamedDispatchers.name()) - emit(value) - expect(value) - }.flowOn(NamedDispatchers("switch$value")) + flow { + assertEquals("switch$value", NamedDispatchers.name()) + emit(value) + expect(value) + }.flowOn(NamedDispatchers("switch$value")) }.onEach { expect(it + 1) assertEquals("main", NamedDispatchers.nameOr("main"))