Skip to content

Commit

Permalink
Flow firstOrNull support (Kotlin#1869)
Browse files Browse the repository at this point in the history
* Flow firstOrNull support

Co-authored-by: Bradyn Poulsen <[email protected]>
  • Loading branch information
qwwdfsad and bradynpoulsen authored Mar 17, 2020
1 parent 1eaa309 commit 6802f7b
Show file tree
Hide file tree
Showing 9 changed files with 128 additions and 26 deletions.
2 changes: 2 additions & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -907,6 +907,8 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun filterNotNull (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun first (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun firstOrNull (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun flatMap (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapConcat (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun flatMapLatest (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
Expand Down
37 changes: 36 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/terminal/Reduce.kt
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ public suspend fun <T: Any> Flow<T>.singleOrNull(): T? {
if (result != null) error("Expected only one element")
result = value
}

return result
}

Expand Down Expand Up @@ -120,3 +119,39 @@ public suspend fun <T> Flow<T>.first(predicate: suspend (T) -> Boolean): T {
if (result === NULL) throw NoSuchElementException("Expected at least one element matching the predicate $predicate")
return result as T
}

/**
* The terminal operator that returns the first element emitted by the flow and then cancels flow's collection.
* Returns `null` if the flow was empty.
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(): T? {
var result: T? = null
try {
collect { value ->
result = value
throw AbortFlowException(NopCollector)
}
} catch (e: AbortFlowException) {
// Do nothing
}
return result
}

/**
* The terminal operator that returns the first element emitted by the flow matching the given [predicate] and then cancels flow's collection.
* Returns `null` if the flow did not contain an element matching the [predicate].
*/
public suspend fun <T : Any> Flow<T>.firstOrNull(predicate: suspend (T) -> Boolean): T? {
var result: T? = null
try {
collect { value ->
if (predicate(value)) {
result = value
throw AbortFlowException(NopCollector)
}
}
} catch (e: AbortFlowException) {
// Do nothing
}
return result
}
6 changes: 0 additions & 6 deletions kotlinx-coroutines-core/common/test/AsyncTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -210,12 +210,6 @@ class AsyncTest : TestBase() {
finish(13)
}

class BadClass {
override fun equals(other: Any?): Boolean = error("equals")
override fun hashCode(): Int = error("hashCode")
override fun toString(): String = error("toString")
}

@Test
fun testDeferBadClass() = runTest {
val bad = BadClass()
Expand Down
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/common/test/TestBase.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,8 @@ public fun wrapperDispatcher(context: CoroutineContext): CoroutineContext {

public suspend fun wrapperDispatcher(): CoroutineContext = wrapperDispatcher(coroutineContext)

class BadClass {
override fun equals(other: Any?): Boolean = error("equals")
override fun hashCode(): Int = error("hashCode")
override fun toString(): String = error("toString")
}
6 changes: 0 additions & 6 deletions kotlinx-coroutines-core/common/test/WithTimeoutOrNullTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,6 @@ class WithTimeoutOrNullTest : TestBase() {
assertSame(bad, result)
}

class BadClass {
override fun equals(other: Any?): Boolean = error("Should not be called")
override fun hashCode(): Int = error("Should not be called")
override fun toString(): String = error("Should not be called")
}

@Test
fun testNullOnTimeout() = runTest {
expect(1)
Expand Down
6 changes: 0 additions & 6 deletions kotlinx-coroutines-core/common/test/WithTimeoutTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,6 @@ class WithTimeoutTest : TestBase() {
assertSame(bad, result)
}

class BadClass {
override fun equals(other: Any?): Boolean = error("Should not be called")
override fun hashCode(): Int = error("Should not be called")
override fun toString(): String = error("Should not be called")
}

@Test
fun testExceptionOnTimeout() = runTest {
expect(1)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,12 +241,6 @@ class RendezvousChannelTest : TestBase() {
finish(12)
}

class BadClass {
override fun equals(other: Any?): Boolean = error("equals")
override fun hashCode(): Int = error("hashCode")
override fun toString(): String = error("toString")
}

@Test
fun testProduceBadClass() = runTest {
val bad = BadClass()
Expand Down
77 changes: 77 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/terminal/FirstTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -83,4 +83,81 @@ class FirstTest : TestBase() {
assertEquals(1, flow.first())
finish(2)
}

@Test
fun testFirstOrNull() = runTest {
val flow = flowOf(1, 2, 3)
assertEquals(1, flow.firstOrNull())
}

@Test
fun testFirstOrNullWithPredicate() = runTest {
val flow = flowOf(1, 2, 3)
assertEquals(1, flow.firstOrNull { it > 0 })
assertEquals(2, flow.firstOrNull { it > 1 })
assertNull(flow.firstOrNull { it > 3 })
}

@Test
fun testFirstOrNullCancellation() = runTest {
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(1) }
}
emit(1)
emit(2)
}
}


val result = flow.firstOrNull {
latch.receive()
true
}
assertEquals(1, result)
finish(2)
}

@Test
fun testFirstOrNullWithEmptyFlow() = runTest {
assertNull(emptyFlow<Int>().firstOrNull())
assertNull(emptyFlow<Int>().firstOrNull { true })
}

@Test
fun testFirstOrNullWhenErrorCancelsUpstream() = runTest {
val latch = Channel<Unit>()
val flow = flow {
coroutineScope {
launch {
latch.send(Unit)
hang { expect(1) }
}
emit(1)
}
}

assertFailsWith<TestException> {
flow.firstOrNull {
latch.receive()
throw TestException()
}
}

assertEquals(1, flow.firstOrNull())
finish(2)
}

@Test
fun testBadClass() = runTest {
val instance = BadClass()
val flow = flowOf(instance)
assertSame(instance, flow.first())
assertSame(instance, flow.firstOrNull())
assertSame(instance, flow.first { true })
assertSame(instance, flow.firstOrNull { true })
}
}