diff --git a/CHANGES.md b/CHANGES.md index 9eebc608c9..cf1e73e8d7 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,25 @@ # Change log for kotlinx.coroutines +## Version 1.8.1 + +* Remove the `@ExperimentalTime` annotation from usages of `TimeSource` (#4046). Thanks, @hfhbd! +* Introduce a workaround for an Android bug that caused an occasional `NullPointerException` when setting the `StateFlow` value on old Android devices (#3820). +* No longer use `kotlin.random.Random` as part of `Dispatchers.Default` and `Dispatchers.IO` initialization (#4051). +* `Flow.timeout` throws the exception with which the channel was closed (#4071). +* Small tweaks and documentation fixes. + +### Changelog relative to version 1.8.1-Beta + +* `Flow.timeout` throws the exception with which the channel was closed (#4071). +* Small documentation fixes. + +## Version 1.8.1-Beta + +* Remove the `@ExperimentalTime` annotation from usages of `TimeSource` (#4046). Thanks, @hfhbd! +* Attempt a workaround for an Android bug that caused an occasional `NullPointerException` when setting the `StateFlow` value on old Android devices (#3820). +* No longer use `kotlin.random.Random` as part of `Dispatchers.Default` and `Dispatchers.IO` initialization (#4051). +* Small tweaks. + ## Version 1.8.0 * Implement the library for the Web Assembly (Wasm) for JavaScript (#3713). Thanks @igoriakovlev! diff --git a/README.md b/README.md index 373912bff8..0cf3c199a2 100644 --- a/README.md +++ b/README.md @@ -3,7 +3,7 @@ [![Kotlin Stable](https://kotl.in/badges/stable.svg)](https://kotlinlang.org/docs/components-stability.html) [![JetBrains official project](https://jb.gg/badges/official.svg)](https://confluence.jetbrains.com/display/ALL/JetBrains+on+GitHub) [![GitHub license](https://img.shields.io/badge/license-Apache%20License%202.0-blue.svg?style=flat)](https://www.apache.org/licenses/LICENSE-2.0) -[![Download](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.8.0)](https://central.sonatype.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.8.0) +[![Download](https://img.shields.io/maven-central/v/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.8.1)](https://central.sonatype.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/1.8.1) [![Kotlin](https://img.shields.io/badge/kotlin-1.9.21-blue.svg?logo=kotlin)](http://kotlinlang.org) [![Slack channel](https://img.shields.io/badge/chat-slack-green.svg?logo=slack)](https://kotlinlang.slack.com/messages/coroutines/) @@ -85,7 +85,7 @@ Add dependencies (you can also add other modules that you need): org.jetbrains.kotlinx kotlinx-coroutines-core - 1.8.0 + 1.8.1 ``` @@ -103,7 +103,7 @@ Add dependencies (you can also add other modules that you need): ```kotlin dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1") } ``` @@ -133,7 +133,7 @@ Add [`kotlinx-coroutines-android`](ui/kotlinx-coroutines-android) module as a dependency when using `kotlinx.coroutines` on Android: ```kotlin -implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0") +implementation("org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.1") ``` This gives you access to the Android [Dispatchers.Main] @@ -168,7 +168,7 @@ In common code that should get compiled for different platforms, you can add a d ```kotlin commonMain { dependencies { - implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.0") + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1") } } ``` @@ -178,7 +178,7 @@ Platform-specific dependencies are recommended to be used only for non-multiplat #### JS Kotlin/JS version of `kotlinx.coroutines` is published as -[`kotlinx-coroutines-core-js`](https://central.sonatype.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.8.0) +[`kotlinx-coroutines-core-js`](https://central.sonatype.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core-js/1.8.1) (follow the link to get the dependency declaration snippet). #### Native diff --git a/buildSrc/src/main/kotlin/kover-conventions.gradle.kts b/buildSrc/src/main/kotlin/kover-conventions.gradle.kts index 477d60fbb2..d87e132539 100644 --- a/buildSrc/src/main/kotlin/kover-conventions.gradle.kts +++ b/buildSrc/src/main/kotlin/kover-conventions.gradle.kts @@ -37,33 +37,37 @@ subprojects { } } - extensions.configure("koverReport") { - defaults { - html { - setReportDir(conventionProject.layout.buildDirectory.dir("kover/${project.name}/html")) - } + extensions.configure("kover") { + reports { + total { + html { + htmlDir = conventionProject.layout.buildDirectory.dir("kover/${project.name}/html") + } - verify { - rule { - /* - * 85 is our baseline that we aim to raise to 90+. - * Missing coverage is typically due to bugs in the agent - * (e.g. signatures deprecated with an error are counted), - * sometimes it's various diagnostic `toString` or `catch` for OOMs/VerificationErrors, - * but some places are definitely worth visiting. - */ - minBound(expectedCoverage[projectName] ?: 85) // COVERED_LINES_PERCENTAGE + verify { + rule { + /* + * 85 is our baseline that we aim to raise to 90+. + * Missing coverage is typically due to bugs in the agent + * (e.g. signatures deprecated with an error are counted), + * sometimes it's various diagnostic `toString` or `catch` for OOMs/VerificationErrors, + * but some places are definitely worth visiting. + */ + minBound(expectedCoverage[projectName] ?: 85) // COVERED_LINES_PERCENTAGE + } } } } } } -koverReport { - defaults { - verify { - rule { - minBound(85) // COVERED_LINES_PERCENTAGE +kover { + reports { + total { + verify { + rule { + minBound(85) // COVERED_LINES_PERCENTAGE + } } } } diff --git a/docs/topics/channels.md b/docs/topics/channels.md index 3b8b11fbbe..6820f4c93c 100644 --- a/docs/topics/channels.md +++ b/docs/topics/channels.md @@ -19,7 +19,8 @@ fun main() = runBlocking { //sampleStart val channel = Channel() launch { - // this might be heavy CPU-consuming computation or async logic, we'll just send five squares + // this might be heavy CPU-consuming computation or async logic, + // we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: @@ -103,12 +104,12 @@ and an extension function [consumeEach], that replaces a `for` loop on the consu import kotlinx.coroutines.* import kotlinx.coroutines.channels.* +//sampleStart fun CoroutineScope.produceSquares(): ReceiveChannel = produce { for (x in 1..5) send(x * x) } fun main() = runBlocking { -//sampleStart val squares = produceSquares() squares.consumeEach { println(it) } println("Done!") @@ -575,25 +576,25 @@ import kotlinx.coroutines.channels.* //sampleStart fun main() = runBlocking { - val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel + val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // no initial delay - nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay - println("Next element is not ready in 50 ms: $nextElement") + nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay + println("Next element is not ready in 100 ms: $nextElement") - nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } - println("Next element is ready in 100 ms: $nextElement") + nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } + println("Next element is ready in 200 ms: $nextElement") // Emulate large consumption delays - println("Consumer pauses for 150ms") - delay(150) + println("Consumer pauses for 300ms") + delay(300) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster - nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } - println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") + nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } + println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed } @@ -609,11 +610,11 @@ It prints following lines: ```text Initial element is available immediately: kotlin.Unit -Next element is not ready in 50 ms: null -Next element is ready in 100 ms: kotlin.Unit -Consumer pauses for 150ms +Next element is not ready in 100 ms: null +Next element is ready in 200 ms: kotlin.Unit +Consumer pauses for 300ms Next element is available immediately after large consumer delay: kotlin.Unit -Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit +Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit ``` diff --git a/docs/topics/coroutine-context-and-dispatchers.md b/docs/topics/coroutine-context-and-dispatchers.md index 969bd21eb9..ffbb364289 100644 --- a/docs/topics/coroutine-context-and-dispatchers.md +++ b/docs/topics/coroutine-context-and-dispatchers.md @@ -227,7 +227,6 @@ import kotlinx.coroutines.* fun log(msg: String) = println("[${Thread.currentThread().name}] $msg") fun main() { -//sampleStart newSingleThreadContext("Ctx1").use { ctx1 -> newSingleThreadContext("Ctx2").use { ctx2 -> runBlocking(ctx1) { @@ -239,7 +238,6 @@ fun main() { } } } -//sampleEnd } ``` @@ -420,14 +418,14 @@ fun main() = runBlocking(CoroutineName("main")) { val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") - 252 + 6 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") - 6 + 7 } - log("The answer for v1 / v2 = ${v1.await() / v2.await()}") + log("The answer for v1 * v2 = ${v1.await() * v2.await()}") //sampleEnd } ``` @@ -443,7 +441,7 @@ The output it produces with `-Dkotlinx.coroutines.debug` JVM option is similar t [main @main#1] Started main coroutine [main @v1coroutine#2] Computing v1 [main @v2coroutine#3] Computing v2 -[main @main#1] The answer for v1 / v2 = 42 +[main @main#1] The answer for v1 * v2 = 42 ``` @@ -504,7 +502,7 @@ class Activity { // to be continued ... ``` -Now, we can launch coroutines in the scope of this `Activity` using the defined `scope`. +Now, we can launch coroutines in the scope of this `Activity` using the defined `mainScope`. For the demo, we launch ten coroutines that delay for a different time: ```kotlin diff --git a/docs/topics/coroutines-and-channels.md b/docs/topics/coroutines-and-channels.md index b60d8982a0..e3cb7e5a42 100644 --- a/docs/topics/coroutines-and-channels.md +++ b/docs/topics/coroutines-and-channels.md @@ -91,7 +91,10 @@ This API is used by the `loadContributorsBlocking()` function to fetch the list 1. Open `src/tasks/Request1Blocking.kt` to see its implementation: ```kotlin - fun loadContributorsBlocking(service: GitHubService, req: RequestData): List { + fun loadContributorsBlocking( + service: GitHubService, + req: RequestData + ): List { val repos = service .getOrgReposCall(req.org) // #1 .execute() // #2 @@ -328,7 +331,8 @@ fun loadContributorsCallbacks( * The logic for handling the responses is extracted into callbacks: the corresponding lambdas start at lines `#1` and `#2`. However, the provided solution doesn't work. If you run the program and load contributors by choosing the _CALLBACKS_ -option, you'll see that nothing is shown. However, the tests that immediately return the result pass. +option, you'll see that nothing is shown. However, the test from `Request3CallbacksKtTest` immediately returns the result +that it successfully passed. Think about why the given code doesn't work as expected and try to fix it, or see the solutions below. @@ -1206,8 +1210,8 @@ When the channel is full, the next `send` call on it is suspended until more fre

The "Rendezvous" channel is a channel without a buffer, the same as a buffered channel with zero size. One of the functions (send() or receive()) is always suspended until the other is called.

-

If the send() function is called and there's no suspended receive call ready to process the element, then send() -is suspended. Similarly, if the receive function is called and the channel is empty or, in other words, there's no +

If the send() function is called and there's no suspended receive() call ready to process the element, then send() +is suspended. Similarly, if the receive() function is called and the channel is empty or, in other words, there's no suspended send() call ready to send the element, the receive() call is suspended.

The "rendezvous" name ("a meeting at an agreed time and place") refers to the fact that send() and receive() should "meet on time".

diff --git a/docs/topics/exception-handling.md b/docs/topics/exception-handling.md index ab1ec6a49a..f79740c95f 100644 --- a/docs/topics/exception-handling.md +++ b/docs/topics/exception-handling.md @@ -60,7 +60,7 @@ The output of this code is (with [debug](https://github.com/Kotlin/kotlinx.corou ```text Throwing exception from launch -Exception in thread "DefaultDispatcher-worker-2 @coroutine#2" java.lang.IndexOutOfBoundsException +Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.IndexOutOfBoundsException Joined failed job Throwing exception from async Caught ArithmeticException @@ -73,7 +73,7 @@ Caught ArithmeticException It is possible to customize the default behavior of printing **uncaught** exceptions to the console. [CoroutineExceptionHandler] context element on a _root_ coroutine can be used as a generic `catch` block for this root coroutine and all its children where custom exception handling may take place. -It is similar to [`Thread.uncaughtExceptionHandler`](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setUncaughtExceptionHandler(java.lang.Thread.UncaughtExceptionHandler)). +It is similar to [`Thread.uncaughtExceptionHandler`](https://docs.oracle.com/javase/8/docs/api/java/lang/Thread.html#setUncaughtExceptionHandler-java.lang.Thread.UncaughtExceptionHandler-). You cannot recover from the exception in the `CoroutineExceptionHandler`. The coroutine had already completed with the corresponding exception when the handler is called. Normally, the handler is used to log the exception, show some kind of error message, terminate, and/or restart the application. diff --git a/gradle.properties b/gradle.properties index 0aa26b50db..59063db89b 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,5 +1,5 @@ # Kotlin -version=1.8.0-SNAPSHOT +version=1.8.1-SNAPSHOT group=org.jetbrains.kotlinx kotlin_version=1.9.21 # DO NOT rename this property without adapting kotlinx.train build chain: @@ -11,16 +11,17 @@ junit5_version=5.7.0 knit_version=0.5.0 html_version=0.7.2 lincheck_version=2.18.1 -dokka_version=1.9.10 +dokka_version=1.9.20 byte_buddy_version=1.10.9 reactor_version=3.4.1 +reactor_docs_version=3.4.5 reactive_streams_version=1.0.3 rxjava2_version=2.2.8 rxjava3_version=3.0.2 javafx_version=17.0.2 javafx_plugin_version=0.0.8 binary_compatibility_validator_version=0.13.2 -kover_version=0.7.4 +kover_version=0.8.0-Beta blockhound_version=1.0.8.RELEASE jna_version=5.9.0 diff --git a/integration-testing/gradle.properties b/integration-testing/gradle.properties index 40fb266c59..af5497e453 100644 --- a/integration-testing/gradle.properties +++ b/integration-testing/gradle.properties @@ -1,5 +1,5 @@ kotlin_version=1.9.21 -coroutines_version=1.8.0-SNAPSHOT +coroutines_version=1.8.1-SNAPSHOT asm_version=9.3 kotlin.code.style=official diff --git a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api index 19ee3c54a7..2674d739c7 100644 --- a/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api +++ b/kotlinx-coroutines-core/api/kotlinx-coroutines-core.api @@ -87,7 +87,6 @@ public final class kotlinx/coroutines/CancellableContinuationKt { public final class kotlinx/coroutines/ChildContinuation { public final field child Lkotlinx/coroutines/CancellableContinuationImpl; public fun (Lkotlinx/coroutines/CancellableContinuationImpl;)V - public synthetic fun invoke (Ljava/lang/Object;)Ljava/lang/Object; public fun invoke (Ljava/lang/Throwable;)V } @@ -1332,12 +1331,11 @@ public abstract interface class kotlinx/coroutines/selects/SelectClause1 : kotli public abstract interface class kotlinx/coroutines/selects/SelectClause2 : kotlinx/coroutines/selects/SelectClause { } -public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutines/selects/SelectBuilder, kotlinx/coroutines/selects/SelectInstanceInternal { +public class kotlinx/coroutines/selects/SelectImplementation : kotlinx/coroutines/CancelHandler, kotlinx/coroutines/selects/SelectBuilder, kotlinx/coroutines/selects/SelectInstanceInternal { public fun (Lkotlin/coroutines/CoroutineContext;)V public fun disposeOnCompletion (Lkotlinx/coroutines/DisposableHandle;)V public fun doSelect (Lkotlin/coroutines/Continuation;)Ljava/lang/Object; public fun getContext ()Lkotlin/coroutines/CoroutineContext; - public synthetic fun invoke (Ljava/lang/Object;)Ljava/lang/Object; public fun invoke (Ljava/lang/Throwable;)V public fun invoke (Lkotlinx/coroutines/selects/SelectClause0;Lkotlin/jvm/functions/Function1;)V public fun invoke (Lkotlinx/coroutines/selects/SelectClause1;Lkotlin/jvm/functions/Function2;)V diff --git a/kotlinx-coroutines-core/build.gradle.kts b/kotlinx-coroutines-core/build.gradle.kts index 9efc7847c9..0c6f65cabd 100644 --- a/kotlinx-coroutines-core/build.gradle.kts +++ b/kotlinx-coroutines-core/build.gradle.kts @@ -272,31 +272,31 @@ val check by tasks.getting { } kover { - excludeTests { - // Always disabled, lincheck doesn't really support coverage - tasks("jvmLincheckTest") - } + currentProject { + instrumentation { + // Always disabled, lincheck doesn't really support coverage + disabledForTestTasks.addAll("jvmLincheckTest") - excludeInstrumentation { - // lincheck has NPE error on `ManagedStrategyStateHolder` class - classes("org.jetbrains.kotlinx.lincheck.*") + // lincheck has NPE error on `ManagedStrategyStateHolder` class + excludedClasses.addAll("org.jetbrains.kotlinx.lincheck.*") + } } -} -koverReport { - filters { - excludes { - classes( - "kotlinx.coroutines.debug.*", // Tested by debug module - "kotlinx.coroutines.channels.ChannelsKt__DeprecatedKt*", // Deprecated - "kotlinx.coroutines.scheduling.LimitingDispatcher", // Deprecated - "kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher", // Deprecated - "kotlinx.coroutines.flow.FlowKt__MigrationKt*", // Migrations - "kotlinx.coroutines.flow.LintKt*", // Migrations - "kotlinx.coroutines.internal.WeakMapCtorCache", // Fallback implementation that we never test - "_COROUTINE._CREATION", // For IDE navigation - "_COROUTINE._BOUNDARY", // For IDE navigation - ) + reports { + filters { + excludes { + classes( + "kotlinx.coroutines.debug.*", // Tested by debug module + "kotlinx.coroutines.channels.ChannelsKt__DeprecatedKt*", // Deprecated + "kotlinx.coroutines.scheduling.LimitingDispatcher", // Deprecated + "kotlinx.coroutines.scheduling.ExperimentalCoroutineDispatcher", // Deprecated + "kotlinx.coroutines.flow.FlowKt__MigrationKt*", // Migrations + "kotlinx.coroutines.flow.LintKt*", // Migrations + "kotlinx.coroutines.internal.WeakMapCtorCache", // Fallback implementation that we never test + "_COROUTINE._CREATION", // For IDE navigation + "_COROUTINE._BOUNDARY", // For IDE navigation + ) + } } } } diff --git a/kotlinx-coroutines-core/common/src/Await.kt b/kotlinx-coroutines-core/common/src/Await.kt index 1cc197fc65..b8a76f1b15 100644 --- a/kotlinx-coroutines-core/common/src/Await.kt +++ b/kotlinx-coroutines-core/common/src/Await.kt @@ -67,7 +67,7 @@ private class AwaitAll(private val deferreds: Array>) { val deferred = deferreds[i] deferred.start() // To properly await lazily started deferreds AwaitAllNode(cont).apply { - handle = deferred.invokeOnCompletion(asHandler) + handle = deferred.invokeOnCompletion(handler = this) } } val disposer = DisposeHandlersOnCancel(nodes) @@ -79,11 +79,11 @@ private class AwaitAll(private val deferreds: Array>) { // it is already complete while handlers were being installed -- dispose them all disposer.disposeAll() } else { - cont.invokeOnCancellation(handler = disposer.asHandler) + cont.invokeOnCancellation(handler = disposer) } } - private inner class DisposeHandlersOnCancel(private val nodes: Array) : CancelHandler() { + private inner class DisposeHandlersOnCancel(private val nodes: Array) : CancelHandler { fun disposeAll() { nodes.forEach { it.handle.dispose() } } diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt index e4bca096ab..4e3fc8a7f2 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuation.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuation.kt @@ -122,28 +122,27 @@ public interface CancellableContinuation : Continuation { /** * Registers a [handler] to be **synchronously** invoked on [cancellation][cancel] (regular or exceptional) of this continuation. - * When the continuation is already cancelled, the handler is immediately invoked - * with the cancellation exception. Otherwise, the handler will be invoked as soon as this - * continuation is cancelled. + * When the continuation is already cancelled, the handler is immediately invoked with the cancellation exception. + * Otherwise, the handler will be invoked as soon as this continuation is cancelled. * * The installed [handler] should not throw any exceptions. * If it does, they will get caught, wrapped into a [CompletionHandlerException] and * processed as an uncaught exception in the context of the current coroutine * (see [CoroutineExceptionHandler]). * - * At most one [handler] can be installed on a continuation. Attempt to call `invokeOnCancellation` second - * time produces [IllegalStateException]. + * At most one [handler] can be installed on a continuation. + * Attempting to call `invokeOnCancellation` a second time produces an [IllegalStateException]. * * This handler is also called when this continuation [resumes][Continuation.resume] normally (with a value) and then * is cancelled while waiting to be dispatched. More generally speaking, this handler is called whenever * the caller of [suspendCancellableCoroutine] is getting a [CancellationException]. * - * A typical example for `invokeOnCancellation` usage is given in + * A typical example of `invokeOnCancellation` usage is given in * the documentation for the [suspendCancellableCoroutine] function. * - * **Note**: Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe. - * This `handler` can be invoked concurrently with the surrounding code. - * There is no guarantee on the execution context in which the `handler` will be invoked. + * **Note**: Implementations of [CompletionHandler] must be fast, non-blocking, and thread-safe. + * This [handler] can be invoked concurrently with the surrounding code. + * There is no guarantee on the execution context in which the [handler] will be invoked. */ public fun invokeOnCancellation(handler: CompletionHandler) @@ -201,6 +200,15 @@ public interface CancellableContinuation : Continuation { public fun resume(value: T, onCancellation: ((cause: Throwable) -> Unit)?) } +/** + * A version of `invokeOnCancellation` that accepts a class as a handler instead of a lambda, but identical otherwise. + * This allows providing a custom [toString] instance that will look better during debugging. + */ +internal fun CancellableContinuation.invokeOnCancellation(handler: CancelHandler) = when (this) { + is CancellableContinuationImpl -> invokeOnCancellationInternal(handler) + else -> throw UnsupportedOperationException("third-party implementation of CancellableContinuation is not supported") +} + /** * Suspends the coroutine like [suspendCoroutine], but providing a [CancellableContinuation] to * the [block]. This function throws a [CancellationException] if the [Job] of the coroutine is @@ -373,9 +381,9 @@ internal fun getOrCreateCancellableContinuation(delegate: Continuation): */ @InternalCoroutinesApi public fun CancellableContinuation<*>.disposeOnCancellation(handle: DisposableHandle): Unit = - invokeOnCancellation(handler = DisposeOnCancel(handle).asHandler) + invokeOnCancellation(handler = DisposeOnCancel(handle)) -private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler() { +private class DisposeOnCancel(private val handle: DisposableHandle) : CancelHandler { override fun invoke(cause: Throwable?) = handle.dispose() override fun toString(): String = "DisposeOnCancel[$handle]" } diff --git a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt index 2b03903272..b7c6111303 100644 --- a/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt +++ b/kotlinx-coroutines-core/common/src/CancellableContinuationImpl.kt @@ -234,12 +234,12 @@ internal open class CancellableContinuationImpl( } } - private fun callCancelHandler(handler: CompletionHandler, cause: Throwable?) = + private fun callCancelHandler(handler: InternalCompletionHandler, cause: Throwable?) = /* * :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, * because we play type tricks on Kotlin/JS and handler is not necessarily a function there */ - callCancelHandlerSafely { handler.invokeIt(cause) } + callCancelHandlerSafely { handler.invoke(cause) } fun callCancelHandler(handler: CancelHandler, cause: Throwable?) = callCancelHandlerSafely { handler.invoke(cause) } @@ -343,7 +343,7 @@ internal open class CancellableContinuationImpl( // Install the handle val handle = parent.invokeOnCompletion( onCancelling = true, - handler = ChildContinuation(this).asHandler + handler = ChildContinuation(this) ) _parentHandle.compareAndSet(null, handle) return handle @@ -390,10 +390,9 @@ internal open class CancellableContinuationImpl( invokeOnCancellationImpl(segment) } - public override fun invokeOnCancellation(handler: CompletionHandler) { - val cancelHandler = makeCancelHandler(handler) - invokeOnCancellationImpl(cancelHandler) - } + override fun invokeOnCancellation(handler: CompletionHandler) = invokeOnCancellation(CancelHandler.UserSupplied(handler)) + + internal fun invokeOnCancellationInternal(handler: CancelHandler) = invokeOnCancellationImpl(handler) private fun invokeOnCancellationImpl(handler: Any) { assert { handler is CancelHandler || handler is Segment<*> } @@ -461,9 +460,6 @@ internal open class CancellableContinuationImpl( error("It's prohibited to register multiple handlers, tried to register $handler, already has $state") } - private fun makeCancelHandler(handler: CompletionHandler): CancelHandler = - if (handler is CancelHandler) handler else InvokeOnCancel(handler) - private fun dispatchResume(mode: Int) { if (tryResume()) return // completed before getResult invocation -- bail out // otherwise, getResult has already commenced, i.e. completed later or in other thread @@ -625,19 +621,46 @@ private object Active : NotCompleted { } /** - * Base class for all [CancellableContinuation.invokeOnCancellation] handlers to avoid an extra instance - * on JVM, yet support JS where you cannot extend from a functional type. + * Essentially the same as just a function from `Throwable?` to `Unit`. + * The only thing implementors can do is call [invoke]. + * The reason this abstraction exists is to allow providing a readable [toString] in the list of completion handlers + * as seen from the debugger. + * Use [UserSupplied] to create an instance from a lambda. + * We can't avoid defining a separate type, because on JS, you can't inherit from a function type. + * + * @see InternalCompletionHandler for a very similar interface, but used for handling completion and not cancellation. */ -internal abstract class CancelHandler : CancelHandlerBase(), NotCompleted - -// Wrapper for lambdas, for the performance sake CancelHandler can be subclassed directly -private class InvokeOnCancel( // Clashes with InvokeOnCancellation - private val handler: CompletionHandler -) : CancelHandler() { - override fun invoke(cause: Throwable?) { - handler.invoke(cause) +internal interface CancelHandler : NotCompleted { + /** + * Signals cancellation. + * + * This function: + * - Does not throw any exceptions. + * Violating this rule in an implementation leads to [handleUncaughtCoroutineException] being called with a + * [CompletionHandlerException] wrapping the thrown exception. + * - Is fast, non-blocking, and thread-safe. + * - Can be invoked concurrently with the surrounding code. + * - Can be invoked from any context. + * + * The meaning of `cause` that is passed to the handler is: + * - It is `null` if the continuation was cancelled directly via [CancellableContinuation.cancel] without a `cause`. + * - It is an instance of [CancellationException] if the continuation was _normally_ cancelled from the outside. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * - Otherwise, the continuation had cancelled with an _error_. + */ + fun invoke(cause: Throwable?) + + /** + * A lambda passed from outside the coroutine machinery. + * + * See the requirements for [CancelHandler.invoke] when implementing this function. + */ + class UserSupplied(private val handler: (cause: Throwable?) -> Unit) : CancelHandler { + /** @suppress */ + override fun invoke(cause: Throwable?) { handler(cause) } + + override fun toString() = "CancelHandler.UserSupplied[${handler.classSimpleName}@$hexAddress]" } - override fun toString() = "InvokeOnCancel[${handler.classSimpleName}@$hexAddress]" } // Completed with additional metadata diff --git a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt index 54c5a936ac..0a0176e729 100644 --- a/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt +++ b/kotlinx-coroutines-core/common/src/CompletionHandler.common.kt @@ -1,43 +1,71 @@ package kotlinx.coroutines -import kotlinx.coroutines.internal.* - /** * Handler for [Job.invokeOnCompletion] and [CancellableContinuation.invokeOnCancellation]. * - * Installed handler should not throw any exceptions. If it does, they will get caught, - * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. - * - * The meaning of `cause` that is passed to the handler: - * - Cause is `null` when the job has completed normally. - * - Cause is an instance of [CancellationException] when the job was cancelled _normally_. + * The meaning of `cause` that is passed to the handler is: + * - It is `null` if the job has completed normally or the continuation was cancelled without a `cause`. + * - It is an instance of [CancellationException] if the job or the continuation was cancelled _normally_. * **It should not be treated as an error**. In particular, it should not be reported to error logs. - * - Otherwise, the job had _failed_. + * - Otherwise, the job or the continuation had _failed_. + * + * A function used for this should not throw any exceptions. + * If it does, they will get caught, wrapped into [CompletionHandlerException], and then either + * - passed to [handleCoroutineException] for [CancellableContinuation.invokeOnCancellation] + * and, for [Job] instances that are coroutines, [Job.invokeOnCompletion], or + * - for [Job] instances that are not coroutines, simply thrown, potentially crashing unrelated code. + * + * Functions used for this must be fast, non-blocking, and thread-safe. + * This handler can be invoked concurrently with the surrounding code. + * There is no guarantee on the execution context in which the function is invoked. * * **Note**: This type is a part of internal machinery that supports parent-child hierarchies * and allows for implementation of suspending functions that wait on the Job's state. * This type should not be used in general application code. - * Implementations of `CompletionHandler` must be fast and _lock-free_. */ +// TODO: deprecate. This doesn't seem better than a simple function type. public typealias CompletionHandler = (cause: Throwable?) -> Unit -// We want class that extends LockFreeLinkedListNode & CompletionHandler but we cannot do it on Kotlin/JS, -// so this expect class provides us with the corresponding abstraction in a platform-agnostic way. -internal expect abstract class CompletionHandlerBase() : LockFreeLinkedListNode { - abstract fun invoke(cause: Throwable?) -} +/** + * Essentially the same as just a function from `Throwable?` to `Unit`. + * The only thing implementors can do is call [invoke]. + * The reason this abstraction exists is to allow providing a readable [toString] in the list of completion handlers + * as seen from the debugger. + * Use [UserSupplied] to create an instance from a lambda. + * We can't avoid defining a separate type, because on JS, you can't inherit from a function type. + * + * @see CancelHandler for a very similar interface, but used for handling cancellation and not completion. + */ +internal interface InternalCompletionHandler { + /** + * Signals completion. + * + * This function: + * - Does not throw any exceptions. + * For [Job] instances that are coroutines, exceptions thrown by this function will be caught, wrapped into + * [CompletionHandlerException], and passed to [handleCoroutineException], but for those that are not coroutines, + * they will just be rethrown, potentially crashing unrelated code. + * - Is fast, non-blocking, and thread-safe. + * - Can be invoked concurrently with the surrounding code. + * - Can be invoked from any context. + * + * The meaning of `cause` that is passed to the handler is: + * - It is `null` if the job has completed normally. + * - It is an instance of [CancellationException] if the job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * - Otherwise, the job had _failed_. + */ + fun invoke(cause: Throwable?) -internal expect val CompletionHandlerBase.asHandler: CompletionHandler + /** + * A lambda passed from outside the coroutine machinery. + * + * See the requirements for [InternalCompletionHandler.invoke] when implementing this function. + */ + class UserSupplied(private val handler: (cause: Throwable?) -> Unit) : InternalCompletionHandler { + /** @suppress */ + override fun invoke(cause: Throwable?) { handler(cause) } -// More compact version of CompletionHandlerBase for CancellableContinuation with same workaround for JS -internal expect abstract class CancelHandlerBase() { - abstract fun invoke(cause: Throwable?) + override fun toString() = "InternalCompletionHandler.UserSupplied[${handler.classSimpleName}@$hexAddress]" + } } - -internal expect val CancelHandlerBase.asHandler: CompletionHandler - -// :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, -// because we play type tricks on Kotlin/JS and handler is not necessarily a function there -internal expect fun CompletionHandler.invokeIt(cause: Throwable?) - -internal inline fun CompletionHandler.isHandlerOf(): Boolean = this is T diff --git a/kotlinx-coroutines-core/common/src/Exceptions.common.kt b/kotlinx-coroutines-core/common/src/Exceptions.common.kt index e19c36f273..ed20d02217 100644 --- a/kotlinx-coroutines-core/common/src/Exceptions.common.kt +++ b/kotlinx-coroutines-core/common/src/Exceptions.common.kt @@ -1,7 +1,7 @@ package kotlinx.coroutines /** - * This exception gets thrown if an exception is caught while processing [CompletionHandler] invocation for [Job]. + * This exception gets thrown if an exception is caught while processing [InternalCompletionHandler] invocation for [Job]. * * @suppress **This an internal API and should not be used from general code.** */ diff --git a/kotlinx-coroutines-core/common/src/Job.kt b/kotlinx-coroutines-core/common/src/Job.kt index 817ac4d607..e9a935c5a3 100644 --- a/kotlinx-coroutines-core/common/src/Job.kt +++ b/kotlinx-coroutines-core/common/src/Job.kt @@ -310,44 +310,8 @@ public interface Job : CoroutineContext.Element { public fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle /** - * Registers handler that is **synchronously** invoked once on cancellation or completion of this job. - * when the job was already cancelled and is completed its execution, then the handler is immediately invoked - * with the job's cancellation cause or `null` unless [invokeImmediately] is set to false. - * Otherwise, handler will be invoked once when this job is cancelled or is complete. - * - * The meaning of `cause` that is passed to the handler: - * - Cause is `null` when the job has completed normally. - * - Cause is an instance of [CancellationException] when the job was cancelled _normally_. - * **It should not be treated as an error**. In particular, it should not be reported to error logs. - * - Otherwise, the job had _failed_. - * - * Invocation of this handler on a transition to a _cancelling_ state - * is controlled by [onCancelling] boolean parameter. - * The handler is invoked when the job becomes _cancelling_ if [onCancelling] parameter is set to `true`. - * - * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] the - * registration of this handler and release its memory if its invocation is no longer needed. - * There is no need to dispose the handler after completion of this job. The references to - * all the handlers are released when this job completes. - * - * Installed [handler] should not throw any exceptions. If it does, they will get caught, - * wrapped into [CompletionHandlerException], and rethrown, potentially causing crash of unrelated code. - * - * **Note**: This function is a part of internal machinery that supports parent-child hierarchies - * and allows for implementation of suspending functions that wait on the Job's state. - * This function should not be used in general application code. - * Implementation of `CompletionHandler` must be fast, non-blocking, and thread-safe. - * This handler can be invoked concurrently with the surrounding code. - * There is no guarantee on the execution context in which the [handler] is invoked. - * - * @param onCancelling when `true`, then the [handler] is invoked as soon as this job transitions to _cancelling_ state; - * when `false` then the [handler] is invoked only when it transitions to _completed_ state. - * @param invokeImmediately when `true` and this job is already in the desired state (depending on [onCancelling]), - * then the [handler] is immediately and synchronously invoked and no-op [DisposableHandle] is returned; - * when `false` then no-op [DisposableHandle] is returned, but the [handler] is not invoked. - * @param handler the handler. - * - * @suppress **This an internal API and should not be used from general code.** + * Kept for preserving compatibility. Shouldn't be used by anyone. + * @suppress */ @InternalCoroutinesApi public fun invokeOnCompletion( @@ -370,6 +334,36 @@ public interface Job : CoroutineContext.Element { public operator fun plus(other: Job): Job = other } +/** + * Registers a handler that is **synchronously** invoked once on cancellation or completion of this job. + * + * If the handler would have been invoked earlier if it was registered at that time, then it is invoked immediately, + * unless [invokeImmediately] is set to `false`. + * + * The handler is scheduled to be invoked once the job is cancelled or is complete. + * This behavior can be changed by setting the [onCancelling] parameter to `true`. + * In this case, the handler is invoked as soon as the job becomes _cancelling_ instead. + * + * The meaning of `cause` that is passed to the handler is: + * - It is `null` if the job has completed normally. + * - It is an instance of [CancellationException] if the job was cancelled _normally_. + * **It should not be treated as an error**. In particular, it should not be reported to error logs. + * - Otherwise, the job had _failed_. + * + * The resulting [DisposableHandle] can be used to [dispose][DisposableHandle.dispose] of the registration of this + * handler and release its memory if its invocation is no longer needed. + * There is no need to dispose of the handler after completion of this job. The references to + * all the handlers are released when this job completes. + */ +internal fun Job.invokeOnCompletion( + onCancelling: Boolean = false, + invokeImmediately: Boolean = true, + handler: InternalCompletionHandler +): DisposableHandle = when (this) { + is JobSupport -> invokeOnCompletionInternal(onCancelling, invokeImmediately, handler) + else -> invokeOnCompletion(onCancelling, invokeImmediately, handler::invoke) +} + /** * Creates a job object in an active state. * A failure of any child of this job immediately causes this job to fail, too, and cancels the rest of its children. @@ -487,7 +481,7 @@ public interface ChildHandle : DisposableHandle { * ``` */ internal fun Job.disposeOnCompletion(handle: DisposableHandle): DisposableHandle = - invokeOnCompletion(handler = DisposeOnCompletion(handle).asHandler) + invokeOnCompletion(handler = DisposeOnCompletion(handle)) /** * Cancels the job and suspends the invoking coroutine until the cancelled job is complete. diff --git a/kotlinx-coroutines-core/common/src/JobSupport.kt b/kotlinx-coroutines-core/common/src/JobSupport.kt index 2fc526a1bb..a4a5a2957c 100644 --- a/kotlinx-coroutines-core/common/src/JobSupport.kt +++ b/kotlinx-coroutines-core/common/src/JobSupport.kt @@ -443,14 +443,24 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren protected val completionCauseHandled: Boolean get() = state.let { it is CompletedExceptionally && it.handled } - @Suppress("OverridingDeprecatedMember") public final override fun invokeOnCompletion(handler: CompletionHandler): DisposableHandle = - invokeOnCompletion(onCancelling = false, invokeImmediately = true, handler = handler) + invokeOnCompletionInternal( + onCancelling = false, + invokeImmediately = true, + handler = InternalCompletionHandler.UserSupplied(handler) + ) + + public final override fun invokeOnCompletion(onCancelling: Boolean, invokeImmediately: Boolean, handler: CompletionHandler): DisposableHandle = + invokeOnCompletionInternal( + onCancelling = onCancelling, + invokeImmediately = invokeImmediately, + handler = InternalCompletionHandler.UserSupplied(handler) + ) - public final override fun invokeOnCompletion( + internal fun invokeOnCompletionInternal( onCancelling: Boolean, invokeImmediately: Boolean, - handler: CompletionHandler + handler: InternalCompletionHandler ): DisposableHandle { // Create node upfront -- for common cases it just initializes JobNode.job field, // for user-defined handlers it allocates a JobNode object that we might not need, but this is Ok. @@ -477,7 +487,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren rootCause = state.rootCause // != null if cancelling job // We add node to the list in two cases --- either the job is not being cancelled // or we are adding a child to a coroutine that is not completing yet - if (rootCause == null || handler.isHandlerOf() && !state.isCompleting) { + if (rootCause == null || handler is ChildHandleNode && !state.isCompleting) { // Note: add node the list while holding lock on state (make sure it cannot change) if (!addLastAtomic(state, list, node)) return@loopOnState // retry // just return node if we don't have to invoke handler (not cancelling yet) @@ -489,7 +499,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } if (rootCause != null) { // Note: attachChild uses invokeImmediately, so it gets invoked when adding to cancelled job - if (invokeImmediately) handler.invokeIt(rootCause) + if (invokeImmediately) handler.invoke(rootCause) return handle } else { if (addLastAtomic(state, list, node)) return node @@ -499,14 +509,14 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren else -> { // is complete // :KLUDGE: We have to invoke a handler in platform-specific way via `invokeIt` extension, // because we play type tricks on Kotlin/JS and handler is not necessarily a function there - if (invokeImmediately) handler.invokeIt((state as? CompletedExceptionally)?.cause) + if (invokeImmediately) handler.invoke((state as? CompletedExceptionally)?.cause) return NonDisposableHandle } } } } - private fun makeNode(handler: CompletionHandler, onCancelling: Boolean): JobNode { + private fun makeNode(handler: InternalCompletionHandler, onCancelling: Boolean): JobNode { val node = if (onCancelling) { (handler as? JobCancellingNode) ?: InvokeOnCancelling(handler) @@ -555,7 +565,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private suspend fun joinSuspend() = suspendCancellableCoroutine { cont -> // We have to invoke join() handler only on cancellation, on completion we will be resumed regularly without handlers - cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont).asHandler)) + cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeOnCompletion(cont))) } @Suppress("UNCHECKED_CAST") @@ -571,7 +581,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren select.selectInRegistrationPhase(Unit) return } - val disposableHandle = invokeOnCompletion(SelectOnJoinCompletionHandler(select).asHandler) + val disposableHandle = invokeOnCompletion(handler = SelectOnJoinCompletionHandler(select)) select.disposeOnCompletion(disposableHandle) } @@ -918,7 +928,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren private tailrec fun tryWaitForChild(state: Finishing, child: ChildHandleNode, proposedUpdate: Any?): Boolean { val handle = child.childJob.invokeOnCompletion( invokeImmediately = false, - handler = ChildCompletion(this, state, child, proposedUpdate).asHandler + handler = ChildCompletion(this, state, child, proposedUpdate) ) if (handle !== NonDisposableHandle) return true // child is not complete and we've started waiting for it val nextChild = child.nextChild() ?: return false @@ -968,7 +978,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren * If child is attached when the job is already being cancelled, such child will receive immediate notification on * cancellation, but parent *will* wait for that child before completion and will handle its exception. */ - return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child).asHandler) as ChildHandle + return invokeOnCompletion(onCancelling = true, handler = ChildHandleNode(child)) as ChildHandle } /** @@ -1231,7 +1241,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren val cont = AwaitContinuation(uCont.intercepted(), this) // we are mimicking suspendCancellableCoroutine here and call initCancellability, too. cont.initCancellability() - cont.disposeOnCancellation(invokeOnCompletion(ResumeAwaitOnCompletion(cont).asHandler)) + cont.disposeOnCancellation(invokeOnCompletion(handler = ResumeAwaitOnCompletion(cont))) cont.getResult() } @@ -1253,7 +1263,7 @@ public open class JobSupport constructor(active: Boolean) : Job, ChildJob, Paren } if (startInternal(state) >= 0) break // break unless needs to retry } - val disposableHandle = invokeOnCompletion(SelectOnAwaitCompletionHandler(select).asHandler) + val disposableHandle = invokeOnCompletion(handler = SelectOnAwaitCompletionHandler(select)) select.disposeOnCompletion(disposableHandle) } @@ -1338,7 +1348,7 @@ internal interface Incomplete { val list: NodeList? // is null only for Empty and JobNode incomplete state objects } -internal abstract class JobNode : CompletionHandlerBase(), DisposableHandle, Incomplete { +internal abstract class JobNode : LockFreeLinkedListNode(), InternalCompletionHandler, DisposableHandle, Incomplete { /** * Initialized by [JobSupport.makeNode]. */ @@ -1377,7 +1387,7 @@ internal class InactiveNodeList( } private class InvokeOnCompletion( - private val handler: CompletionHandler + private val handler: InternalCompletionHandler ) : JobNode() { override fun invoke(cause: Throwable?) = handler.invoke(cause) } @@ -1420,7 +1430,7 @@ internal class DisposeOnCompletion( internal abstract class JobCancellingNode : JobNode() private class InvokeOnCancelling( - private val handler: CompletionHandler + private val handler: InternalCompletionHandler ) : JobCancellingNode() { // delegate handler shall be invoked at most once, so here is an additional flag private val _invoked = atomic(0) // todo: replace with atomic boolean after migration to recent atomicFu diff --git a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt index 9224ae8fce..fb6846ef4e 100644 --- a/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/BufferedChannel.kt @@ -241,7 +241,7 @@ internal open class BufferedChannel( /** * Abstract send implementation. */ - protected inline fun sendImpl( + private inline fun sendImpl( /* The element to be sent. */ element: E, /* The waiter to be stored in case of suspension, @@ -350,6 +350,29 @@ internal open class BufferedChannel( } } + // Note: this function is temporarily moved from ConflatedBufferedChannel to BufferedChannel class, because of this issue: KT-65554. + // For now, an inline function, which invokes atomic operations, may only be called within a parent class. + protected fun trySendDropOldest(element: E): ChannelResult = + sendImpl( // <-- this is an inline function + element = element, + // Put the element into the logical buffer even + // if this channel is already full, the `onSuspend` + // callback below extract the first (oldest) element. + waiter = BUFFERED, + // Finish successfully when a rendezvous has happened + // or the element has been buffered. + onRendezvousOrBuffered = { return success(Unit) }, + // In case the algorithm decided to suspend, the element + // was added to the buffer. However, as the buffer is now + // overflowed, the first (oldest) element has to be extracted. + onSuspend = { segm, i -> + dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i) + return success(Unit) + }, + // If the channel is closed, return the corresponding result. + onClosed = { return closed(sendException) } + ) + private inline fun sendImplOnNoWaiter( /* The working cell is specified by the segment and the index in it. */ diff --git a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt index a2d3a10322..5c7f151022 100644 --- a/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt +++ b/kotlinx-coroutines-core/common/src/channels/ConflatedBufferedChannel.kt @@ -72,27 +72,6 @@ internal open class ConflatedBufferedChannel( return success(Unit) } - private fun trySendDropOldest(element: E): ChannelResult = - sendImpl( // <-- this is an inline function - element = element, - // Put the element into the logical buffer even - // if this channel is already full, the `onSuspend` - // callback below extract the first (oldest) element. - waiter = BUFFERED, - // Finish successfully when a rendezvous has happened - // or the element has been buffered. - onRendezvousOrBuffered = { return success(Unit) }, - // In case the algorithm decided to suspend, the element - // was added to the buffer. However, as the buffer is now - // overflowed, the first (oldest) element has to be extracted. - onSuspend = { segm, i -> - dropFirstElementUntilTheSpecifiedCellIsInTheBuffer(segm.id * SEGMENT_SIZE + i) - return success(Unit) - }, - // If the channel is closed, return the corresponding result. - onClosed = { return closed(sendException) } - ) - @Suppress("UNCHECKED_CAST") override fun registerSelectForSend(select: SelectInstance<*>, element: Any?) { // The plain `send(..)` operation never suspends. Thus, either this diff --git a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt index 077ba83704..dbb9507c23 100644 --- a/kotlinx-coroutines-core/common/src/flow/StateFlow.kt +++ b/kotlinx-coroutines-core/common/src/flow/StateFlow.kt @@ -250,8 +250,14 @@ private class StateFlowSlot : AbstractSharedFlowSlot>() { * * It is important that default `null` value is used, because there can be a race between allocation * of a new slot and trying to do [makePending] on this slot. + * + * === + * This should be `atomic(null)` instead of the atomic reference, but because of #3820 + * it is used as a **temporary** solution starting from 1.8.1 version. + * Depending on the fix rollout on Android, it will be removed in 1.9.0 or 2.0.0. + * See https://issuetracker.google.com/issues/325123736 */ - private val _state = atomic(null) + private val _state = WorkaroundAtomicReference(null) override fun allocateLocked(flow: StateFlowImpl<*>): Boolean { // No need for atomic check & update here, since allocated happens under StateFlow lock @@ -290,7 +296,6 @@ private class StateFlowSlot : AbstractSharedFlowSlot>() { return state === PENDING } - @Suppress("UNCHECKED_CAST") suspend fun awaitPending(): Unit = suspendCancellableCoroutine sc@ { cont -> assert { _state.value !is CancellableContinuationImpl<*> } // can be NONE or PENDING if (_state.compareAndSet(NONE, cont)) return@sc // installed continuation, waiting for pending @@ -306,7 +311,6 @@ private class StateFlowImpl( private val _state = atomic(initialState) // T | NULL private var sequence = 0 // serializes updates, value update is in process when sequence is odd - @Suppress("UNCHECKED_CAST") public override var value: T get() = NULL.unbox(_state.value) set(value) { updateState(null, value ?: NULL) } diff --git a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt index cad34a0d55..2a701c0c12 100644 --- a/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt +++ b/kotlinx-coroutines-core/common/src/flow/operators/Delay.kt @@ -394,6 +394,7 @@ private fun Flow.timeoutInternal( value.onSuccess { downStream.emit(it) }.onClosed { + it?.let { throw it } return@onReceiveCatching false } return@onReceiveCatching true diff --git a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt index 8aa3df41dc..417e9a8199 100644 --- a/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt +++ b/kotlinx-coroutines-core/common/src/internal/Concurrent.common.kt @@ -19,3 +19,22 @@ internal expect fun identitySet(expectedSize: Int): MutableSet @OptionalExpectation @Target(AnnotationTarget.FIELD) internal expect annotation class BenignDataRace() + +// Used **only** as a workaround for #3820 in StateFlow. Do not use anywhere else +internal expect class WorkaroundAtomicReference(value: T) { + public fun get(): T + public fun set(value: T) + public fun getAndSet(value: T): T + public fun compareAndSet(expected: T, value: T): Boolean +} + +@Suppress("UNUSED_PARAMETER", "EXTENSION_SHADOWED_BY_MEMBER") +internal var WorkaroundAtomicReference.value: T + get() = this.get() + set(value) = this.set(value) + +internal inline fun WorkaroundAtomicReference.loop(action: WorkaroundAtomicReference.(value: T) -> Unit) { + while (true) { + action(value) + } +} diff --git a/kotlinx-coroutines-core/common/src/selects/Select.kt b/kotlinx-coroutines-core/common/src/selects/Select.kt index 92bfb3f522..7e685af3d4 100644 --- a/kotlinx-coroutines-core/common/src/selects/Select.kt +++ b/kotlinx-coroutines-core/common/src/selects/Select.kt @@ -239,7 +239,7 @@ internal interface SelectInstanceInternal: SelectInstance, Waiter @PublishedApi internal open class SelectImplementation( override val context: CoroutineContext -) : CancelHandler(), SelectBuilder, SelectInstanceInternal { +) : CancelHandler, SelectBuilder, SelectInstanceInternal { /** * Essentially, the `select` operation is split into three phases: REGISTRATION, WAITING, and COMPLETION. @@ -565,7 +565,7 @@ internal open class SelectImplementation( // Also, we MUST guarantee that this dispose handle is _visible_ // according to the memory model, and we CAN guarantee this when // the state is updated. - cont.invokeOnCancellation(this.asHandler) + cont.invokeOnCancellation(this) return@sc } // This `select` is in REGISTRATION phase, but there are clauses that has to be registered again. diff --git a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt index a2ca101ef0..0162a216c3 100644 --- a/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt +++ b/kotlinx-coroutines-core/common/test/flow/operators/TimeoutTest.kt @@ -237,6 +237,17 @@ class TimeoutTest : TestBase() { testImmediateTimeout(-1.seconds) } + @Test + fun testClosing() = runTest { + assertFailsWith { + channelFlow { close(TestException()) } + .timeout(Duration.INFINITE) + .collect { + expectUnreached() + } + } + } + private fun testImmediateTimeout(timeout: Duration) { expect(1) val flow = emptyFlow().timeout(timeout) diff --git a/kotlinx-coroutines-core/concurrent/src/CompletionHandler.kt b/kotlinx-coroutines-core/concurrent/src/CompletionHandler.kt deleted file mode 100644 index 4737112e4f..0000000000 --- a/kotlinx-coroutines-core/concurrent/src/CompletionHandler.kt +++ /dev/null @@ -1,18 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.internal.* - -internal actual abstract class CompletionHandlerBase actual constructor() : LockFreeLinkedListNode(), CompletionHandler { - actual abstract override fun invoke(cause: Throwable?) -} - -internal actual inline val CompletionHandlerBase.asHandler: CompletionHandler get() = this - -internal actual abstract class CancelHandlerBase actual constructor() : CompletionHandler { - actual abstract override fun invoke(cause: Throwable?) -} - -internal actual inline val CancelHandlerBase.asHandler: CompletionHandler get() = this - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun CompletionHandler.invokeIt(cause: Throwable?) = invoke(cause) diff --git a/kotlinx-coroutines-core/concurrent/test/channels/BroadcastChannelSubStressTest.kt b/kotlinx-coroutines-core/concurrent/test/channels/BroadcastChannelSubStressTest.kt index 8f6a8e0c31..8db6d8b9ec 100644 --- a/kotlinx-coroutines-core/concurrent/test/channels/BroadcastChannelSubStressTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/channels/BroadcastChannelSubStressTest.kt @@ -12,13 +12,13 @@ import kotlin.test.* */ class BroadcastChannelSubStressTest: TestBase() { - private val nSeconds = 5 * stressTestMultiplier + private val nSeconds = maxOf(5, stressTestMultiplier) private val sentTotal = atomic(0L) private val receivedTotal = atomic(0L) @Test fun testStress() = runTest { - TestBroadcastChannelKind.values().forEach { kind -> + TestBroadcastChannelKind.entries.forEach { kind -> println("--- BroadcastChannelSubStressTest $kind") val broadcast = kind.create() val sender = diff --git a/kotlinx-coroutines-core/concurrent/test/sync/MutexStressTest.kt b/kotlinx-coroutines-core/concurrent/test/sync/MutexStressTest.kt index 8c8f04f809..67ff03b91e 100644 --- a/kotlinx-coroutines-core/concurrent/test/sync/MutexStressTest.kt +++ b/kotlinx-coroutines-core/concurrent/test/sync/MutexStressTest.kt @@ -8,7 +8,7 @@ import kotlin.test.* class MutexStressTest : TestBase() { - private val n = (if (isNative) 1_000 else 10_000) * stressTestMultiplier + private val n = 1000 * stressTestMultiplier // It mostly stresses K/N as JVM Mutex is tested by lincheck @Test fun testDefaultDispatcher() = runTest { testBody(Dispatchers.Default) } diff --git a/kotlinx-coroutines-core/js/src/CompletionHandler.kt b/kotlinx-coroutines-core/js/src/CompletionHandler.kt deleted file mode 100644 index 06fec14918..0000000000 --- a/kotlinx-coroutines-core/js/src/CompletionHandler.kt +++ /dev/null @@ -1,26 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.internal.* - -internal actual abstract class CompletionHandlerBase : LinkedListNode() { - @JsName("invoke") - actual abstract fun invoke(cause: Throwable?) -} - -@Suppress("UnsafeCastFromDynamic") -internal actual inline val CompletionHandlerBase.asHandler: CompletionHandler get() = asDynamic() - -internal actual abstract class CancelHandlerBase { - @JsName("invoke") - actual abstract fun invoke(cause: Throwable?) -} - -@Suppress("UnsafeCastFromDynamic") -internal actual inline val CancelHandlerBase.asHandler: CompletionHandler get() = asDynamic() - -internal actual fun CompletionHandler.invokeIt(cause: Throwable?) { - when(jsTypeOf(this)) { - "function" -> invoke(cause) - else -> asDynamic().invoke(cause) - } -} diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Concurrent.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Concurrent.kt index c692dfcb9d..652d60c4a5 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/Concurrent.kt @@ -11,3 +11,25 @@ internal class NoOpLock { internal actual fun identitySet(expectedSize: Int): MutableSet = HashSet(expectedSize) +internal actual class WorkaroundAtomicReference actual constructor(private var value: T) { + + public actual fun get(): T = value + + public actual fun set(value: T) { + this.value = value + } + + public actual fun getAndSet(value: T): T { + val prev = this.value + this.value = value + return prev + } + + public actual fun compareAndSet(expected: T, value: T): Boolean { + if (this.value === expected) { + this.value = value + return true + } + return false + } +} diff --git a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt index 4fa3f44f20..b002ea722a 100644 --- a/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt +++ b/kotlinx-coroutines-core/jsAndWasmShared/src/internal/JSDispatcher.kt @@ -46,7 +46,7 @@ internal abstract class SetTimeoutBasedDispatcher: CoroutineDispatcher(), Delay override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val handle = w3cSetTimeout({ with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) - continuation.invokeOnCancellation(handler = ClearTimeout(handle).asHandler) + continuation.invokeOnCancellation(handler = ClearTimeout(handle)) } } @@ -57,7 +57,7 @@ internal class WindowDispatcher(private val window: W3CWindow) : CoroutineDispat override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation) { val handle = w3cSetTimeout(window, { with(continuation) { resumeUndispatched(Unit) } }, delayToInt(timeMillis)) - continuation.invokeOnCancellation(handler = WindowClearTimeout(handle).asHandler) + continuation.invokeOnCancellation(handler = WindowClearTimeout(handle)) } override fun invokeOnTimeout(timeMillis: Long, block: Runnable, context: CoroutineContext): DisposableHandle { @@ -78,7 +78,7 @@ internal object SetTimeoutDispatcher : SetTimeoutBasedDispatcher() { } } -private open class ClearTimeout(protected val handle: Int) : CancelHandler(), DisposableHandle { +private open class ClearTimeout(protected val handle: Int) : CancelHandler, DisposableHandle { override fun dispose() { w3cClearTimeout(handle) } diff --git a/kotlinx-coroutines-core/jvm/src/Future.kt b/kotlinx-coroutines-core/jvm/src/Future.kt index c2425dba73..2e217cc65f 100644 --- a/kotlinx-coroutines-core/jvm/src/Future.kt +++ b/kotlinx-coroutines-core/jvm/src/Future.kt @@ -38,7 +38,7 @@ private class CancelFutureOnCompletion( } } -private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler() { +private class CancelFutureOnCancel(private val future: Future<*>) : CancelHandler { override fun invoke(cause: Throwable?) { // Don't interrupt when cancelling future on completion, because no one is going to reset this // interruption flag and it will cause spurious failures elsewhere diff --git a/kotlinx-coroutines-core/jvm/src/Interruptible.kt b/kotlinx-coroutines-core/jvm/src/Interruptible.kt index 595c5062f7..b25a289cf3 100644 --- a/kotlinx-coroutines-core/jvm/src/Interruptible.kt +++ b/kotlinx-coroutines-core/jvm/src/Interruptible.kt @@ -59,7 +59,7 @@ private const val FINISHED = 1 private const val INTERRUPTING = 2 private const val INTERRUPTED = 3 -private class ThreadState(private val job: Job) : CompletionHandler { +private class ThreadState(private val job: Job) : InternalCompletionHandler { /* === States === diff --git a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt index 9ebeed0ad9..4e8eaf27b0 100644 --- a/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/jvm/src/internal/Concurrent.kt @@ -10,6 +10,9 @@ internal actual typealias ReentrantLock = java.util.concurrent.locks.ReentrantLo internal actual inline fun ReentrantLock.withLock(action: () -> T) = this.withLockJvm(action) +@Suppress("ACTUAL_WITHOUT_EXPECT") // Visibility +internal actual typealias WorkaroundAtomicReference = java.util.concurrent.atomic.AtomicReference + @Suppress("NOTHING_TO_INLINE") // So that R8 can completely remove ConcurrentKt class internal actual inline fun identitySet(expectedSize: Int): MutableSet = Collections.newSetFromMap(IdentityHashMap(expectedSize)) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index e9d11d354f..4d6dce37a5 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -8,7 +8,6 @@ import java.util.concurrent.* import java.util.concurrent.locks.* import kotlin.jvm.internal.Ref.ObjectRef import kotlin.math.* -import kotlin.random.* /** * Coroutine scheduler (pool of shared threads) which primary target is to distribute dispatched coroutines @@ -658,11 +657,21 @@ internal class CoroutineScheduler( var nextParkedWorker: Any? = NOT_IN_STACK /* - * The delay until at least one task in other worker queues will become stealable. + * The delay until at least one task in other worker queues will become stealable. */ private var minDelayUntilStealableTaskNs = 0L - private var rngState = Random.nextInt() + /** + * The state of embedded Marsaglia xorshift random number generator, used for work-stealing purposes. + * It is initialized with a seed. + */ + private var rngState: Int = run { + // This could've been Random.nextInt(), but we are shaving an extra initialization cost, see #4051 + val seed = System.nanoTime().toInt() + // rngState shouldn't be zero, as required for the xorshift algorithm + if (seed != 0) return@run seed + 42 + } /** * Tries to acquire CPU token if worker doesn't have one diff --git a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt index 4cb45ba96e..6287be0081 100644 --- a/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt +++ b/kotlinx-coroutines-core/jvm/test/AbstractLincheckTest.kt @@ -13,16 +13,16 @@ abstract class AbstractLincheckTest { @Test fun modelCheckingTest() = ModelCheckingOptions() - .iterations(if (isStressTest) 200 else 20) - .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) + .iterations(20 * stressTestMultiplierSqrt) + .invocationsPerIteration(1_000 * stressTestMultiplierSqrt) .commonConfiguration() .customize(isStressTest) .check(this::class) @Test fun stressTest() = StressOptions() - .iterations(if (isStressTest) 200 else 20) - .invocationsPerIteration(if (isStressTest) 10_000 else 1_000) + .iterations(20 * stressTestMultiplierSqrt) + .invocationsPerIteration(1_000 * stressTestMultiplierSqrt) .commonConfiguration() .customize(isStressTest) .check(this::class) diff --git a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt index e6e629da22..5b211807da 100644 --- a/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/channels/BroadcastChannelMultiReceiveStressTest.kt @@ -15,15 +15,17 @@ import java.util.concurrent.atomic.* class BroadcastChannelMultiReceiveStressTest( private val kind: TestBroadcastChannelKind ) : TestBase() { + + // Stressed by lincheck companion object { @Parameterized.Parameters(name = "{0}") @JvmStatic fun params(): Collection> = - TestBroadcastChannelKind.values().map { arrayOf(it) } + TestBroadcastChannelKind.entries.map { arrayOf(it) } } private val nReceivers = if (isStressTest) 10 else 5 - private val nSeconds = 3 * stressTestMultiplier + private val nSeconds = 3 * stressTestMultiplierSqrt private val broadcast = kind.create() private val pool = newFixedThreadPoolContext(nReceivers + 1, "BroadcastChannelMultiReceiveStressTest") @@ -62,13 +64,13 @@ class BroadcastChannelMultiReceiveStressTest( println("Launching $name") receivers += launch(pool + CoroutineName(name)) { val channel = broadcast.openSubscription() - when (receiverIndex % 5) { - 0 -> doReceive(channel, receiverIndex) - 1 -> doReceiveCatching(channel, receiverIndex) - 2 -> doIterator(channel, receiverIndex) - 3 -> doReceiveSelect(channel, receiverIndex) - 4 -> doReceiveCatchingSelect(channel, receiverIndex) - } + when (receiverIndex % 5) { + 0 -> doReceive(channel, receiverIndex) + 1 -> doReceiveCatching(channel, receiverIndex) + 2 -> doIterator(channel, receiverIndex) + 3 -> doReceiveSelect(channel, receiverIndex) + 4 -> doReceiveCatchingSelect(channel, receiverIndex) + } channel.cancel() } printProgress() @@ -93,7 +95,7 @@ class BroadcastChannelMultiReceiveStressTest( } catch (e: Exception) { println("Failed: $e") pool.dumpThreads("Threads in pool") - receivers.indices.forEach { index -> + receivers.indices.forEach { index -> println("lastReceived[$index] = ${lastReceived[index].get()}") } throw e @@ -116,8 +118,9 @@ class BroadcastChannelMultiReceiveStressTest( try { val stop = doReceived(receiverIndex, channel.receive()) if (stop) break + } catch (ex: ClosedReceiveChannelException) { + break } - catch (ex: ClosedReceiveChannelException) { break } } } @@ -141,7 +144,9 @@ class BroadcastChannelMultiReceiveStressTest( val event = select { channel.onReceive { it } } val stop = doReceived(receiverIndex, event) if (stop) break - } catch (ex: ClosedReceiveChannelException) { break } + } catch (ex: ClosedReceiveChannelException) { + break + } } } @@ -152,4 +157,10 @@ class BroadcastChannelMultiReceiveStressTest( if (stop) break } } + + @Suppress("UNUSED_PARAMETER") + private fun println(debugMessage: String) { + // Uncomment for local debugging + //println(debugMessage as Any?) + } } diff --git a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt index 7a524853eb..0d160e6a70 100644 --- a/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt @@ -11,7 +11,6 @@ import kotlin.test.* import kotlin.time.* import kotlin.time.TimeSource -@OptIn(ExperimentalTime::class) class SharingStressTest : TestBase() { private val testDuration = 1000L * stressTestMultiplier private val nSubscribers = 5 diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt index d68c113bd2..8c9845afa5 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-01.kt @@ -7,7 +7,8 @@ import kotlinx.coroutines.channels.* fun main() = runBlocking { val channel = Channel() launch { - // this might be heavy CPU-consuming computation or async logic, we'll just send five squares + // this might be heavy CPU-consuming computation or async logic, + // we'll just send five squares for (x in 1..5) channel.send(x * x) } // here we print five received integers: diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt b/kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt index 7e2b59bf1d..8867e949ab 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-channel-10.kt @@ -5,25 +5,25 @@ import kotlinx.coroutines.* import kotlinx.coroutines.channels.* fun main() = runBlocking { - val tickerChannel = ticker(delayMillis = 100, initialDelayMillis = 0) // create ticker channel + val tickerChannel = ticker(delayMillis = 200, initialDelayMillis = 0) // create a ticker channel var nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Initial element is available immediately: $nextElement") // no initial delay - nextElement = withTimeoutOrNull(50) { tickerChannel.receive() } // all subsequent elements have 100ms delay - println("Next element is not ready in 50 ms: $nextElement") + nextElement = withTimeoutOrNull(100) { tickerChannel.receive() } // all subsequent elements have 200ms delay + println("Next element is not ready in 100 ms: $nextElement") - nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } - println("Next element is ready in 100 ms: $nextElement") + nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } + println("Next element is ready in 200 ms: $nextElement") // Emulate large consumption delays - println("Consumer pauses for 150ms") - delay(150) + println("Consumer pauses for 300ms") + delay(300) // Next element is available immediately nextElement = withTimeoutOrNull(1) { tickerChannel.receive() } println("Next element is available immediately after large consumer delay: $nextElement") // Note that the pause between `receive` calls is taken into account and next element arrives faster - nextElement = withTimeoutOrNull(60) { tickerChannel.receive() } - println("Next element is ready in 50ms after consumer pause in 150ms: $nextElement") + nextElement = withTimeoutOrNull(120) { tickerChannel.receive() } + println("Next element is ready in 100ms after consumer pause in 300ms: $nextElement") tickerChannel.cancel() // indicate that no more elements are needed } diff --git a/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt b/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt index 0ab1a3ee48..bb871662b6 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/example-context-08.kt @@ -11,12 +11,12 @@ fun main() = runBlocking(CoroutineName("main")) { val v1 = async(CoroutineName("v1coroutine")) { delay(500) log("Computing v1") - 252 + 6 } val v2 = async(CoroutineName("v2coroutine")) { delay(1000) log("Computing v2") - 6 + 7 } - log("The answer for v1 / v2 = ${v1.await() / v2.await()}") + log("The answer for v1 * v2 = ${v1.await() * v2.await()}") } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt index c2798a3335..97aa1da836 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/ChannelsGuideTest.kt @@ -113,11 +113,11 @@ class ChannelsGuideTest { fun testExampleChannel10() { test("ExampleChannel10") { kotlinx.coroutines.guide.exampleChannel10.main() }.verifyLines( "Initial element is available immediately: kotlin.Unit", - "Next element is not ready in 50 ms: null", - "Next element is ready in 100 ms: kotlin.Unit", - "Consumer pauses for 150ms", + "Next element is not ready in 100 ms: null", + "Next element is ready in 200 ms: kotlin.Unit", + "Consumer pauses for 300ms", "Next element is available immediately after large consumer delay: kotlin.Unit", - "Next element is ready in 50ms after consumer pause in 150ms: kotlin.Unit" + "Next element is ready in 100ms after consumer pause in 300ms: kotlin.Unit" ) } } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt index 5a4bb9939c..1cf6c2bd4d 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/DispatcherGuideTest.kt @@ -77,7 +77,7 @@ class DispatcherGuideTest { "[main @main#1] Started main coroutine", "[main @v1coroutine#2] Computing v1", "[main @v2coroutine#3] Computing v2", - "[main @main#1] The answer for v1 / v2 = 42" + "[main @main#1] The answer for v1 * v2 = 42" ) } diff --git a/kotlinx-coroutines-core/jvm/test/guide/test/ExceptionsGuideTest.kt b/kotlinx-coroutines-core/jvm/test/guide/test/ExceptionsGuideTest.kt index 33da0a7bc7..dc600616e1 100644 --- a/kotlinx-coroutines-core/jvm/test/guide/test/ExceptionsGuideTest.kt +++ b/kotlinx-coroutines-core/jvm/test/guide/test/ExceptionsGuideTest.kt @@ -9,7 +9,7 @@ class ExceptionsGuideTest { fun testExampleExceptions01() { test("ExampleExceptions01") { kotlinx.coroutines.guide.exampleExceptions01.main() }.verifyExceptions( "Throwing exception from launch", - "Exception in thread \"DefaultDispatcher-worker-2 @coroutine#2\" java.lang.IndexOutOfBoundsException", + "Exception in thread \"DefaultDispatcher-worker-1 @coroutine#2\" java.lang.IndexOutOfBoundsException", "Joined failed job", "Throwing exception from async", "Caught ArithmeticException" diff --git a/kotlinx-coroutines-core/jvm/test/selects/SelectPhilosophersStressTest.kt b/kotlinx-coroutines-core/jvm/test/selects/SelectPhilosophersStressTest.kt index 09918068de..423c1a8a55 100644 --- a/kotlinx-coroutines-core/jvm/test/selects/SelectPhilosophersStressTest.kt +++ b/kotlinx-coroutines-core/jvm/test/selects/SelectPhilosophersStressTest.kt @@ -7,7 +7,7 @@ import org.junit.Test import kotlin.test.* class SelectPhilosophersStressTest : TestBase() { - private val TEST_DURATION = 3000L * stressTestMultiplier + private val TEST_DURATION = 3000L * stressTestMultiplierSqrt val n = 10 // number of philosophers private val forks = Array(n) { Mutex() } @@ -59,4 +59,4 @@ class SelectPhilosophersStressTest : TestBase() { assertTrue(eats > 0, "$id shall not starve") } } -} \ No newline at end of file +} diff --git a/kotlinx-coroutines-core/native/src/internal/Concurrent.kt b/kotlinx-coroutines-core/native/src/internal/Concurrent.kt index eb9c8a59c4..8423c618e6 100644 --- a/kotlinx-coroutines-core/native/src/internal/Concurrent.kt +++ b/kotlinx-coroutines-core/native/src/internal/Concurrent.kt @@ -13,3 +13,18 @@ internal actual fun identitySet(expectedSize: Int): MutableSet = HashSet( @Suppress("ACTUAL_WITHOUT_EXPECT") // This suppress can be removed in 2.0: KT-59355 internal actual typealias BenignDataRace = kotlin.concurrent.Volatile + +internal actual class WorkaroundAtomicReference actual constructor(value: T) { + + private val nativeAtomic = kotlin.concurrent.AtomicReference(value) + + public actual fun get(): T = nativeAtomic.value + + public actual fun set(value: T) { + nativeAtomic.value = value + } + + public actual fun getAndSet(value: T): T = nativeAtomic.getAndSet(value) + + public actual fun compareAndSet(expected: T, value: T): Boolean = nativeAtomic.compareAndSet(expected, value) +} diff --git a/kotlinx-coroutines-core/wasmJs/src/CompletionHandler.kt b/kotlinx-coroutines-core/wasmJs/src/CompletionHandler.kt deleted file mode 100644 index 4737112e4f..0000000000 --- a/kotlinx-coroutines-core/wasmJs/src/CompletionHandler.kt +++ /dev/null @@ -1,18 +0,0 @@ -package kotlinx.coroutines - -import kotlinx.coroutines.internal.* - -internal actual abstract class CompletionHandlerBase actual constructor() : LockFreeLinkedListNode(), CompletionHandler { - actual abstract override fun invoke(cause: Throwable?) -} - -internal actual inline val CompletionHandlerBase.asHandler: CompletionHandler get() = this - -internal actual abstract class CancelHandlerBase actual constructor() : CompletionHandler { - actual abstract override fun invoke(cause: Throwable?) -} - -internal actual inline val CancelHandlerBase.asHandler: CompletionHandler get() = this - -@Suppress("NOTHING_TO_INLINE") -internal actual inline fun CompletionHandler.invokeIt(cause: Throwable?) = invoke(cause) diff --git a/kotlinx-coroutines-debug/README.md b/kotlinx-coroutines-debug/README.md index 95df04a914..24a2fa8d7b 100644 --- a/kotlinx-coroutines-debug/README.md +++ b/kotlinx-coroutines-debug/README.md @@ -61,7 +61,7 @@ stacktraces will be dumped to the console. ### Using as JVM agent Debug module can also be used as a standalone JVM agent to enable debug probes on the application startup. -You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.8.0.jar`. +You can run your application with an additional argument: `-javaagent:kotlinx-coroutines-debug-1.8.1.jar`. Additionally, on Linux and Mac OS X you can use `kill -5 $pid` command in order to force your application to print all alive coroutines. When used as Java agent, `"kotlinx.coroutines.debug.enable.creation.stack.trace"` system property can be used to control [DebugProbes.enableCreationStackTraces] along with agent startup. diff --git a/kotlinx-coroutines-debug/build.gradle.kts b/kotlinx-coroutines-debug/build.gradle.kts index 83fc944fb9..a9b2d6730d 100644 --- a/kotlinx-coroutines-debug/build.gradle.kts +++ b/kotlinx-coroutines-debug/build.gradle.kts @@ -99,11 +99,13 @@ configurations { } } -koverReport { - filters { - excludes { - // Never used, safety mechanism - classes("kotlinx.coroutines.debug.internal.NoOpProbesKt") +kover { + reports { + filters { + excludes { + // Never used, safety mechanism + classes("kotlinx.coroutines.debug.internal.NoOpProbesKt") + } } } } diff --git a/kotlinx-coroutines-test/README.md b/kotlinx-coroutines-test/README.md index 847a29bec8..fbadf574d6 100644 --- a/kotlinx-coroutines-test/README.md +++ b/kotlinx-coroutines-test/README.md @@ -26,7 +26,7 @@ Provided [TestDispatcher] implementations: Add `kotlinx-coroutines-test` to your project test dependencies: ``` dependencies { - testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.0' + testImplementation 'org.jetbrains.kotlinx:kotlinx-coroutines-test:1.8.1' } ``` diff --git a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt index ba46d17c4b..8c70fa8e05 100644 --- a/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt +++ b/kotlinx-coroutines-test/common/src/TestCoroutineScheduler.kt @@ -219,7 +219,6 @@ public class TestCoroutineScheduler : AbstractCoroutineContextElement(TestCorout /** * Returns the [TimeSource] representation of the virtual time of this scheduler. */ - @ExperimentalTime public val timeSource: TimeSource.WithComparableMarks = object : AbstractLongTimeSource(DurationUnit.MILLISECONDS) { override fun read(): Long = currentTime } diff --git a/kotlinx-coroutines-test/common/src/TestScope.kt b/kotlinx-coroutines-test/common/src/TestScope.kt index f6e69510a7..180e76de21 100644 --- a/kotlinx-coroutines-test/common/src/TestScope.kt +++ b/kotlinx-coroutines-test/common/src/TestScope.kt @@ -131,7 +131,6 @@ public fun TestScope.advanceTimeBy(delayTime: Duration): Unit = testScheduler.ad * @see TestCoroutineScheduler.timeSource */ @ExperimentalCoroutinesApi -@ExperimentalTime public val TestScope.testTimeSource: TimeSource.WithComparableMarks get() = testScheduler.timeSource /** diff --git a/kotlinx-coroutines-test/common/test/TestCoroutineSchedulerTest.kt b/kotlinx-coroutines-test/common/test/TestCoroutineSchedulerTest.kt index ba539696d1..a7dd8c623f 100644 --- a/kotlinx-coroutines-test/common/test/TestCoroutineSchedulerTest.kt +++ b/kotlinx-coroutines-test/common/test/TestCoroutineSchedulerTest.kt @@ -310,7 +310,6 @@ class TestCoroutineSchedulerTest { } @Test - @ExperimentalTime fun testAdvanceTimeSource() = runTest { val expected = 1.seconds val before = testTimeSource.markNow() diff --git a/reactive/kotlinx-coroutines-reactive/build.gradle.kts b/reactive/kotlinx-coroutines-reactive/build.gradle.kts index f49ffa0100..3e6ecccf3e 100644 --- a/reactive/kotlinx-coroutines-reactive/build.gradle.kts +++ b/reactive/kotlinx-coroutines-reactive/build.gradle.kts @@ -36,14 +36,16 @@ externalDocumentationLink( url = "https://www.reactive-streams.org/reactive-streams-$reactiveStreamsVersion-javadoc/" ) -koverReport { - filters { - excludes { - classes( - "kotlinx.coroutines.reactive.FlowKt", // Deprecated - "kotlinx.coroutines.reactive.FlowKt__MigrationKt", // Deprecated - "kotlinx.coroutines.reactive.ConvertKt" // Deprecated - ) +kover { + reports { + filters { + excludes { + classes( + "kotlinx.coroutines.reactive.FlowKt", // Deprecated + "kotlinx.coroutines.reactive.FlowKt__MigrationKt", // Deprecated + "kotlinx.coroutines.reactive.ConvertKt" // Deprecated + ) + } } } } diff --git a/reactive/kotlinx-coroutines-reactor/build.gradle.kts b/reactive/kotlinx-coroutines-reactor/build.gradle.kts index 4c056ee35d..6e46d912b5 100644 --- a/reactive/kotlinx-coroutines-reactor/build.gradle.kts +++ b/reactive/kotlinx-coroutines-reactor/build.gradle.kts @@ -5,10 +5,8 @@ plugins { id("org.jetbrains.kotlinx.kover") } -val reactorVersion = version("reactor") - dependencies { - api("io.projectreactor:reactor-core:$reactorVersion") + api("io.projectreactor:reactor-core:${version("reactor")}") api(project(":kotlinx-coroutines-reactive")) } @@ -27,18 +25,22 @@ tasks { } } +// the version of the docs can be different from the version of the Reactor +// library itself: https://github.com/reactor/reactor-core/issues/3794 externalDocumentationLink( - url = "https://projectreactor.io/docs/core/$reactorVersion/api/" + url = "https://projectreactor.io/docs/core/${version("reactor_docs")}/api/" ) -koverReport { - filters { - excludes { - classes( - "kotlinx.coroutines.reactor.FlowKt", // Deprecated - "kotlinx.coroutines.reactor.ConvertKt\$asFlux$1" // Deprecated - ) +kover { + reports { + filters { + excludes { + classes( + "kotlinx.coroutines.reactor.FlowKt", // Deprecated + "kotlinx.coroutines.reactor.ConvertKt\$asFlux$1" // Deprecated + ) + } } } } diff --git a/ui/coroutines-guide-ui.md b/ui/coroutines-guide-ui.md index bd0e596859..3b2087d677 100644 --- a/ui/coroutines-guide-ui.md +++ b/ui/coroutines-guide-ui.md @@ -110,7 +110,7 @@ Add dependencies on `kotlinx-coroutines-android` module to the `dependencies { . `app/build.gradle` file: ```groovy -implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.0" +implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.8.1" ``` You can clone [kotlinx.coroutines](https://github.com/Kotlin/kotlinx.coroutines) project from GitHub onto your diff --git a/ui/kotlinx-coroutines-android/build.gradle.kts b/ui/kotlinx-coroutines-android/build.gradle.kts index a09d02ca75..adbafe4571 100644 --- a/ui/kotlinx-coroutines-android/build.gradle.kts +++ b/ui/kotlinx-coroutines-android/build.gradle.kts @@ -1,5 +1,3 @@ -import kotlinx.kover.api.* - configurations { create("r8") }