Skip to content

Commit

Permalink
Introduce first version of Dispatchers.IO for K/N (Kotlin#3576)
Browse files Browse the repository at this point in the history
* Emulate expect declaration refinement via extension property as the only way to do it in a backwards-compatible manner: in the current model, it is impossible to have common 'expect' Dispatchers declaration, then refined 'concurrent' Dispatchers declaration with 'expect val IO' and then JVM declaration with JVM-specific members. Current solutions seems to be the less intrusive one

Fixes Kotlin#3205
  • Loading branch information
qwwdfsad committed Feb 22, 2023
1 parent bf03c48 commit 1ed19c8
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 5 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,7 @@ See [Contributing Guidelines](CONTRIBUTING.md).
[MainScope()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-main-scope.html
[SupervisorJob()]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-supervisor-job.html
[CoroutineExceptionHandler]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-coroutine-exception-handler/index.html
[Dispatchers.IO]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-dispatchers/-i-o.html
[Dispatchers.IO]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-i-o.html
[asCoroutineDispatcher]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/as-coroutine-dispatcher.html
[Promise.await]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/await.html
[promise]: https://kotlinlang.org/api/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/promise.html
Expand Down
1 change: 1 addition & 0 deletions kotlinx-coroutines-core/api/kotlinx-coroutines-core.api
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,7 @@ public final class kotlinx/coroutines/Dispatchers {

public final class kotlinx/coroutines/DispatchersKt {
public static final field IO_PARALLELISM_PROPERTY_NAME Ljava/lang/String;
public static final synthetic fun getIO (Lkotlinx/coroutines/Dispatchers;)Lkotlinx/coroutines/CoroutineDispatcher;
}

public abstract interface class kotlinx/coroutines/DisposableHandle {
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Dispatchers.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import kotlin.coroutines.*
public expect object Dispatchers {
/**
* The default [CoroutineDispatcher] that is used by all standard builders like
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc
* [launch][CoroutineScope.launch], [async][CoroutineScope.async], etc.
* if neither a dispatcher nor any other [ContinuationInterceptor] is specified in their context.
*
* It is backed by a shared pool of threads on JVM and Native. By default, the maximum number of threads used
Expand Down
37 changes: 37 additions & 0 deletions kotlinx-coroutines-core/concurrent/src/Dispatchers.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

/**
* The [CoroutineDispatcher] that is designed for offloading blocking IO tasks to a shared pool of threads.
* Additional threads in this pool are created on demand.
* Default IO pool size is `64`; on JVM it can be configured using JVM-specific mechanisms,
* please refer to `Dispatchers.IO` documentation on JVM platform.
*
* ### Elasticity for limited parallelism
*
* `Dispatchers.IO` has a unique property of elasticity: its views
* obtained with [CoroutineDispatcher.limitedParallelism] are
* not restricted by the `Dispatchers.IO` parallelism. Conceptually, there is
* a dispatcher backed by an unlimited pool of threads, and both `Dispatchers.IO`
* and views of `Dispatchers.IO` are actually views of that dispatcher. In practice
* this means that, despite not abiding by `Dispatchers.IO`'s parallelism
* restrictions, its views share threads and resources with it.
*
* In the following example
* ```
* // 100 threads for MySQL connection
* val myMysqlDbDispatcher = Dispatchers.IO.limitedParallelism(100)
* // 60 threads for MongoDB connection
* val myMongoDbDispatcher = Dispatchers.IO.limitedParallelism(60)
* ```
* the system may have up to `64 + 100 + 60` threads dedicated to blocking tasks during peak loads,
* but during its steady state there is only a small number of threads shared
* among `Dispatchers.IO`, `myMysqlDbDispatcher` and `myMongoDbDispatcher`
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public expect val Dispatchers.IO: CoroutineDispatcher


Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ package kotlinx.coroutines
class DefaultDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.Default
}

class IoDispatcherConcurrencyTest : AbstractDispatcherConcurrencyTest() {
override val dispatcher: CoroutineDispatcher = Dispatchers.IO
}
14 changes: 11 additions & 3 deletions kotlinx-coroutines-core/jvm/src/Dispatchers.kt
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
/*
* Copyright 2016-2021 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
* Copyright 2016-2023 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:Suppress("unused")

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
Expand Down Expand Up @@ -97,3 +95,13 @@ public actual object Dispatchers {
DefaultScheduler.shutdown()
}
}

/**
* `actual` counterpart of the corresponding `expect` declaration.
* Should never be used directly from JVM sources, all accesses
* to `Dispatchers.IO` should be resolved to the corresponding member of [Dispatchers] object.
* @suppress
*/
@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
@Deprecated(message = "Should not be used directly", level = DeprecationLevel.HIDDEN)
public actual val Dispatchers.IO: CoroutineDispatcher get() = Dispatchers.IO
32 changes: 32 additions & 0 deletions kotlinx-coroutines-core/native/src/Dispatchers.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlin.coroutines.*


public actual object Dispatchers {
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
Expand All @@ -19,6 +22,35 @@ public actual object Dispatchers {
internal fun injectMain(dispatcher: MainCoroutineDispatcher) {
injectedMainDispatcher = dispatcher
}

internal val IO: CoroutineDispatcher = DefaultIoScheduler
}

internal object DefaultIoScheduler : CoroutineDispatcher() {
// 2048 is an arbitrary KMP-friendly constant
private val unlimitedPool = newFixedThreadPoolContext(2048, "Dispatchers.IO")
private val io = unlimitedPool.limitedParallelism(64) // Default JVM size

@ExperimentalCoroutinesApi
override fun limitedParallelism(parallelism: Int): CoroutineDispatcher {
// See documentation to Dispatchers.IO for the rationale
return unlimitedPool.limitedParallelism(parallelism)
}

override fun dispatch(context: CoroutineContext, block: Runnable) {
io.dispatch(context, block)
}

@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable) {
io.dispatchYield(context, block)
}

override fun toString(): String = "Dispatchers.IO"
}


@Suppress("EXTENSION_SHADOWED_BY_MEMBER")
public actual val Dispatchers.IO: CoroutineDispatcher get() = IO

internal expect fun createMainDispatcher(default: CoroutineDispatcher): MainCoroutineDispatcher

0 comments on commit 1ed19c8

Please sign in to comment.