Skip to content

Commit

Permalink
Flow operators review: docs tweaks and code style fixes in tests
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Apr 24, 2019
1 parent af2f68e commit a407fd7
Show file tree
Hide file tree
Showing 6 changed files with 73 additions and 51 deletions.
64 changes: 35 additions & 29 deletions kotlinx-coroutines-core/common/src/flow/operators/Delay.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,7 @@ import kotlinx.coroutines.flow.unsafeFlow as flow
@FlowPreview
public fun <T> Flow<T>.delayFlow(timeMillis: Long): Flow<T> = flow {
delay(timeMillis)
collect { value ->
emit(value)
}
collect(this@flow)
}

/**
Expand All @@ -38,26 +36,30 @@ public fun <T> Flow<T>.delayEach(timeMillis: Long): Flow<T> = flow {

/**
* Returns a flow that mirrors the original flow, but filters out values
* that are followed by the newer values within the given [timeout][timeoutMs].
* that are followed by the newer values within the given [timeout][timeoutMillis].
* The latest value is always emitted.
*
* Example:
* ```
* flow {
* emit(1)
* delay(99)
* emit(2)
* delay(99)
* emit(3)
* delay(1001)
* emit(4)
* delay(1001)
* emit(5)
* emit(1)
* delay(99)
* emit(2)
* delay(99)
* emit(3)
* delay(1001)
* emit(4)
* delay(1001)
* emit(5)
* }.debounce(1000)
* ```
* will produce `3, 4, 5`.
* produces `3, 4, 5`.
*
* Note that the resulting flow does not emit anything as long as the original flow emits
* items faster than every [timeoutMillis] milliseconds.
*/
public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
require(timeoutMs > 0) { "Debounce timeout should be positive" }
public fun <T> Flow<T>.debounce(timeoutMillis: Long): Flow<T> {
require(timeoutMillis > 0) { "Debounce timeout should be positive" }
return flow {
coroutineScope {
val values = Channel<Any?>(Channel.CONFLATED) // Actually Any, KT-30796
Expand All @@ -79,10 +81,11 @@ public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
lastValue = it
}

onTimeout(timeoutMs) {
val value = lastValue ?: return@onTimeout
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
lastValue?.let { value -> // set timeout when lastValue != null
onTimeout(timeoutMillis) {
lastValue = null // Consume the value
emit(NullSurrogate.unbox(value))
}
}

// Close with value 'idiom'
Expand All @@ -97,21 +100,23 @@ public fun <T> Flow<T>.debounce(timeoutMs: Long): Flow<T> {
}

/**
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMs].
* Returns a flow that emits only the latest value emitted by the original flow during the given sampling [period][periodMillis].
*
* Example:
* ```
* flow {
* repeat(10) {
* emit(it)
* delay(50)
* }
* repeat(10) {
* emit(it)
* delay(50)
* }
* }.sample(100)
* ```
* will produce `1, 3, 5, 7, 9`.
* produces `1, 3, 5, 7, 9`.
*
* Note that the latest element is not emitted if it does not fit into the sampling window.
*/
public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
require(periodMs > 0) { "Sample period should be positive" }
public fun <T> Flow<T>.sample(periodMillis: Long): Flow<T> {
require(periodMillis > 0) { "Sample period should be positive" }
return flow {
coroutineScope {
val values = produce<Any?>(capacity = Channel.CONFLATED) { // Actually Any, KT-30796
Expand All @@ -120,7 +125,7 @@ public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {

var isDone = false
var lastValue: Any? = null
val ticker = fixedPeriodTicker(periodMs, periodMs)
val ticker = fixedPeriodTicker(periodMillis)
while (!isDone) {
select<Unit> {
values.onReceiveOrNull {
Expand All @@ -132,6 +137,7 @@ public fun <T> Flow<T>.sample(periodMs: Long): Flow<T> {
}
}

// todo: shall be start sampling only when an element arrives or sample aways as here?
ticker.onReceive {
val value = lastValue ?: return@onReceive
lastValue = null // Consume the value
Expand Down
6 changes: 3 additions & 3 deletions kotlinx-coroutines-core/common/src/flow/operators/Merge.kt
Original file line number Diff line number Diff line change
Expand Up @@ -90,8 +90,8 @@ public fun <T> Flow<Flow<T>>.flattenConcat(): Flow<T> = flow {
public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int = 16): Flow<T> = flatMapMerge(concurrency, bufferSize) { it }

/**
* Returns a flow that switches to the new flow produced by [transform] function every time the original flow emits a value.
* When switch on the newer flow is performed, the previous one is cancelled.
* Returns a flow that switches to a new flow produced by [transform] function every time the original flow emits a value.
* When switch on the a flow is performed, the previous one is cancelled.
*
* For example, the following flow:
* ```
Expand All @@ -107,7 +107,7 @@ public fun <T> Flow<Flow<T>>.flattenMerge(concurrency: Int = 16, bufferSize: Int
* }
* }
* ```
* will produce `aa bb b_last`
* produces `aa bb b_last`
*/
@FlowPreview
public fun <T, R> Flow<T>.switchMap(transform: suspend (value: T) -> Flow<R>): Flow<R> = flow {
Expand Down
8 changes: 4 additions & 4 deletions kotlinx-coroutines-core/common/test/flow/VirtualTime.kt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import kotlin.jvm.*
private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineDispatcher(), Delay {

private val originalDispatcher = enclosingScope.coroutineContext[ContinuationInterceptor] as CoroutineDispatcher
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation
private val heap = ArrayList<TimedTask>() // TODO use MPP heap/ordered set implementation (commonize ThreadSafeHeap)
private var currentTime = 0L

init {
Expand All @@ -21,10 +21,10 @@ private class VirtualTimeDispatcher(enclosingScope: CoroutineScope) : CoroutineD
*/
enclosingScope.launch {
while (true) {
val secret = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
val delayNanos = ThreadLocalEventLoop.currentOrNull()?.processNextEvent()
?: error("Event loop is missing, virtual time source works only as part of event loop")
if (secret <= 0) continue
if (secret > 0 && secret != Long.MAX_VALUE) error("Unexpected external delay: $secret")
if (delayNanos <= 0) continue
if (delayNanos > 0 && delayNanos != Long.MAX_VALUE) error("Unexpected external delay: $delayNanos")
val nextTask = heap.minBy { it.deadline } ?: return@launch
heap.remove(nextTask)
currentTime = nextTask.deadline
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,44 +16,44 @@ class CombineLatestTest : TestBase() {
fun testCombineLatest() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(1, 2, 3)
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
val list = flow.combineLatest(flow2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b1", "b2", "c2", "c3"), list)
}

@Test
fun testNulls() = runTest {
val flow = flowOf("a", null, null)
val flow2 = flowOf(1, 2, 3)
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("a1", "null1", "null2", "null2", "null3"), list)
}

@Test
fun testNullsOther() = runTest {
val flow = flowOf("a", "b", "c")
val flow2 = flowOf(null, 2, null)
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
val list = flow.combineLatest(flow2, { i, j -> i + j }).toList()
assertEquals(listOf("anull", "bnull", "b2", "c2", "cnull"), list)
}

@Test
fun testEmptyFlow() = runTest {
val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
val flow = emptyFlow<String>().combineLatest(emptyFlow<Int>(), { i, j -> i + j })
assertNull(flow.singleOrNull())
}

@Test
fun testFirstIsEmpty() = runTest {
val f1 = emptyFlow<String>()
val f2 = flowOf(1)
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList())
}

@Test
fun testSecondIsEmpty() = runTest {
val f1 = flowOf("a")
val f2 = emptyFlow<Int>()
assertEquals(emptyList(), f1.combineLatest(f2, { i, j -> i + j }).toList())
assertEquals(emptyList(), f1.combineLatest(f2) { i, j -> i + j }.toList())
}

@Test
Expand All @@ -80,7 +80,7 @@ class CombineLatestTest : TestBase() {
emit(3)
}

val result = f1.combineLatest(f2, { i, j -> i + j }).toList()
val result = f1.combineLatest(f2) { i, j -> i + j }.toList()
assertEquals(listOf("a1", "b1", "c1", "c2", "c3"), result)
finish(8)
}
Expand Down Expand Up @@ -137,7 +137,7 @@ class CombineLatestTest : TestBase() {
}
}

val value = withContext(NamedDispatchers("main")) {
val value = withContext(NamedDispatchers("main")) {
f1.combineLatest(f2) { i, j ->
assertEquals("main", NamedDispatchers.name())
expect(5)
Expand Down Expand Up @@ -170,8 +170,8 @@ class CombineLatestTest : TestBase() {
expect(1)
i + j
}.flowOn(NamedDispatchers("combine")).onEach {
throw TestException()
}
throw TestException()
}

assertFailsWith<TestException>(flow)
finish(4)
Expand Down
16 changes: 16 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/SampleTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,22 @@ class SampleTest : TestBase() {
assertNull(flow.singleOrNull())
}

@Test
// note that this test depends on the sampling strategy -- when sampling time starts on a quiescent flow that suddenly emits
fun testLongWait() = runTest {
val flow = flow {
emit("A")
delay(3500) // long delay -- multiple sampling intervals
emit("B")
delay(900) // crosses time = 4000 barrier
emit("C")
delay(3000) // long wait again

}
val result = flow.sample(1000).toList()
assertEquals(listOf("A", "B", "C"), result)
}

@Test
fun testPace() = withVirtualTime {
val flow = flow {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,11 +42,11 @@ class SwitchMapTest : TestBase() {
emit(2)
emit(4)
}.flowOn(NamedDispatchers("source")).switchMap { value ->
flow {
assertEquals("switch$value", NamedDispatchers.name())
emit(value)
expect(value)
}.flowOn(NamedDispatchers("switch$value"))
flow {
assertEquals("switch$value", NamedDispatchers.name())
emit(value)
expect(value)
}.flowOn(NamedDispatchers("switch$value"))
}.onEach {
expect(it + 1)
assertEquals("main", NamedDispatchers.nameOr("main"))
Expand Down

0 comments on commit a407fd7

Please sign in to comment.