Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Version 1.3.0-RC #1357

Merged
merged 31 commits into from
Jul 19, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
cd5ac0b
Replace hand-rolled arraycopy with stdlib copyInto
LouisCAD Jun 28, 2019
e174298
Fix _size not incremented in CopyOnWriteList add (#1308)
LouisCAD Jul 1, 2019
583d39d
Rewrite assertions so that they are checked only in debug mode
elizarov Jul 2, 2019
b5a8493
Replace unneeded type parameter with projection
LouisCAD Jul 3, 2019
fa9104e
Update versions in Android example project to 3.4.1 tools
elizarov Jul 3, 2019
96a5c8e
Optimize virtual time source so that DefaultTimeSource is not needed
elizarov Jul 2, 2019
db0ef0c
Update copyright year to 2019 in all the knit-generated files
elizarov Jul 3, 2019
9077b01
Cancellation in `Semaphore` should resume the next waiting acquirer i…
ndkoval Jul 5, 2019
ae225bd
Sleep in RunningThreadStackMergeTest that relies on time
qwwdfsad Jul 15, 2019
ace5899
Add distinctUntilChanged operator that uses a comparator function ins…
zach-klippenstein Jun 20, 2019
d100a3f
Reactive scopeless (#1341)
qwwdfsad Jul 16, 2019
f22604b
Recover stacktraces for no-dispatched continuations, so recovery work…
qwwdfsad Jul 16, 2019
63b6e27
Add Flow.onStart, support emit in onCompletion (#1348)
elizarov Jul 17, 2019
693142c
Context passing between coroutines and Reactor Mono/Flux (#1138)
SokolovaMaria Jul 17, 2019
a8904e2
Channel.receiveOrNull becomes extension, internal receiveOrClosed added
qwwdfsad Jul 18, 2019
f8b43e1
Introduce ReceiveChannel.consumeAsFlow and FlowCollector.emitAll(chan)
elizarov Jul 17, 2019
f6387a7
Fuse consumeAsFlow with channel-using flow operators
elizarov Jul 17, 2019
91cc13a
withIndex and collectIndexed operators
qwwdfsad Jul 17, 2019
fe41869
Cancel extensions for CoroutineScope and Job
qwwdfsad Jul 17, 2019
cb7f37b
Treat Duration.ZERO as 0L in jdk8 extensions
qwwdfsad Jul 17, 2019
98a9705
Move event loop infrastructure to common code
elizarov Jul 2, 2019
4809393
Protect event loop data structures from nonmonothonic nanoTime
elizarov Jul 6, 2019
fe4e05c
Module with kotlinx-coroutines BOM for better dependency management
qwwdfsad Jul 19, 2019
2f50363
Properly specify transitive dependencies for kotlinx-coroutines-debug
qwwdfsad Jul 19, 2019
233e74c
Consistently handle fatal exceptions in PublisherCoroutine
qwwdfsad Jun 28, 2019
39f5cf8
Properly handle fatal exceptions in Rx coroutines, get rid of deadloc…
qwwdfsad Jun 28, 2019
97863c3
Report fatal errors to both onError (to comply the spec fully) and to…
qwwdfsad Jul 17, 2019
dc4a474
Stabilize core flow (#1352)
qwwdfsad Jul 19, 2019
023ea30
Merge branch 'master' into develop
qwwdfsad Jul 19, 2019
3d2bf79
Update Kotlin version to 1.3.41
qwwdfsad Jul 19, 2019
70fbd41
Version 1.3.0-RC
qwwdfsad Jul 19, 2019
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
withIndex and collectIndexed operators
Fixes #1247
  • Loading branch information
qwwdfsad committed Jul 18, 2019
commit 91cc13a349ee1fb9ea7aa0bc1c501a7ba0b9cb1c
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun channelFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function2;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun collectIndexed (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function4;)Lkotlinx/coroutines/flow/Flow;
public static final fun combineLatest (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function5;)Lkotlinx/coroutines/flow/Flow;
Expand Down Expand Up @@ -944,6 +945,7 @@ public final class kotlinx/coroutines/flow/FlowKt {
public static final fun unsafeFlow (Lkotlin/jvm/functions/Function2;)Lkotlinx/coroutines/flow/Flow;
public static final fun unsafeTransform (Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
public static final fun withContext (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;Lkotlin/jvm/functions/Function1;)V
public static final fun withIndex (Lkotlinx/coroutines/flow/Flow;)Lkotlinx/coroutines/flow/Flow;
public static final fun zip (Lkotlinx/coroutines/flow/Flow;Lkotlinx/coroutines/flow/Flow;Lkotlin/jvm/functions/Function3;)Lkotlinx/coroutines/flow/Flow;
}

Expand All @@ -962,6 +964,10 @@ public abstract class kotlinx/coroutines/flow/internal/ChannelFlow : kotlinx/cor
public static synthetic fun update$default (Lkotlinx/coroutines/flow/internal/ChannelFlow;Lkotlin/coroutines/CoroutineContext;IILjava/lang/Object;)Lkotlinx/coroutines/flow/internal/ChannelFlow;
}

public final class kotlinx/coroutines/flow/internal/FlowExceptions_commonKt {
public static final fun checkIndexOverflow (I)I
}

public final class kotlinx/coroutines/flow/internal/SafeCollector : kotlinx/coroutines/flow/FlowCollector {
public fun <init> (Lkotlinx/coroutines/flow/FlowCollector;Lkotlin/coroutines/CoroutineContext;)V
public fun emit (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,3 +16,12 @@ internal expect class AbortFlowException() : CancellationException
* Exception used to cancel child of [scopedFlow] without cancelling the whole scope.
*/
internal expect class ChildCancelledException() : CancellationException

@Suppress("NOTHING_TO_INLINE")
@PublishedApi
internal inline fun checkIndexOverflow(index: Int): Int {
if (index < 0) {
throw ArithmeticException("Index overflow has happened")
}
return index
}
1 change: 0 additions & 1 deletion kotlinx-coroutines-core/common/src/flow/operators/Limit.kt
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/


@file:JvmMultifileClass
@file:JvmName("FlowKt")

Expand Down
11 changes: 11 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/operators/Transform.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,17 @@ public inline fun <T, R: Any> Flow<T>.mapNotNull(crossinline transform: suspend
return@transform emit(transformed)
}

/**
* Returns a flow that wraps each element into [IndexedValue], containing value and its index (starting from zero).
*/
@ExperimentalCoroutinesApi
public fun <T> Flow<T>.withIndex(): Flow<IndexedValue<T>> = flow {
var index = 0
collect { value ->
emit(IndexedValue(checkIndexOverflow(index++), value))
}
}

/**
* Returns a flow which performs the given [action] on each value of the original flow.
*/
Expand Down
13 changes: 13 additions & 0 deletions kotlinx-coroutines-core/common/src/flow/terminal/Collect.kt
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,19 @@ public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value
override suspend fun emit(value: T) = action(value)
})

/**
* Terminal flow operator that collects the given flow with a provided [action] that takes the index of an element (zero-based) and the element.
* If any exception occurs during collect or in the provided flow, this exception is rethrown from this method.
*
* See also [collect] and [withIndex].
*/
@ExperimentalCoroutinesApi
public suspend inline fun <T> Flow<T>.collectIndexed(crossinline action: suspend (index: Int, value: T) -> Unit): Unit =
collect(object : FlowCollector<T> {
private var index = 0
override suspend fun emit(value: T) = action(checkIndexOverflow(index++), value)
})

/**
* Collects all the values from the given [flow] and emits them to the collector.
*
Expand Down
45 changes: 45 additions & 0 deletions kotlinx-coroutines-core/common/test/flow/operators/IndexedTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.flow

import kotlinx.coroutines.*
import kotlin.test.*

class IndexedTest : TestBase() {

@Test
fun testWithIndex() = runTest {
val flow = flowOf(3, 2, 1).withIndex()
assertEquals(listOf(IndexedValue(0, 3), IndexedValue(1, 2), IndexedValue(2, 1)), flow.toList())
}

@Test
fun testWithIndexEmpty() = runTest {
val flow = emptyFlow<Int>().withIndex()
assertEquals(emptyList(), flow.toList())
}

@Test
fun testCollectIndexed() = runTest {
val result = ArrayList<IndexedValue<Long>>()
flowOf(3L, 2L, 1L).collectIndexed { index, value ->
result.add(IndexedValue(index, value))
}
assertEquals(listOf(IndexedValue(0, 3L), IndexedValue(1, 2L), IndexedValue(2, 1L)), result)
}

@Test
fun testCollectIndexedEmptyFlow() = runTest {
val flow = flow<Int> {
expect(1)
}

flow.collectIndexed { _, _ ->
expectUnreached()
}

finish(2)
}
}