Skip to content

Commit

Permalink
Minor improvements of debugging experience (Kotlin#2093)
Browse files Browse the repository at this point in the history
* Recognizable toString implementation in default dispatchers
* Fast-path for disabled debug probes
  • Loading branch information
qwwdfsad authored Jun 17, 2020
1 parent 68360e0 commit ad542c4
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 11 deletions.
2 changes: 1 addition & 1 deletion docs/flow.md
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ This code produces the following exception:
```text
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].
but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].
Please refer to 'flow' documentation or use 'flowOn' instead
at ...
```
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ internal object Unconfined : CoroutineDispatcher() {
"isDispatchNeeded and dispatch calls.")
}

override fun toString(): String = "Unconfined"
override fun toString(): String = "Dispatchers.Unconfined"
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ internal object DebugProbesImpl {
internal fun probeCoroutineSuspended(frame: Continuation<*>) = updateState(frame, SUSPENDED)

private fun updateState(frame: Continuation<*>, state: String) {
if (!isInstalled) return
// KT-29997 is here only since 1.3.30
if (state == RUNNING && KotlinVersion.CURRENT.isAtLeast(1, 3, 30)) {
val stackFrame = frame as? CoroutineStackFrame ?: return
Expand Down
22 changes: 14 additions & 8 deletions kotlinx-coroutines-core/jvm/src/scheduling/Dispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,18 @@ import kotlin.coroutines.*
* Default instance of coroutine dispatcher.
*/
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
val IO: CoroutineDispatcher = LimitingDispatcher(
this,
systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
"Dispatchers.IO",
TASK_PROBABLY_BLOCKING
)

override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
throw UnsupportedOperationException("$DEFAULT_DISPATCHER_NAME cannot be closed")
}

override fun toString(): String = DEFAULT_SCHEDULER_NAME
override fun toString(): String = DEFAULT_DISPATCHER_NAME

@InternalCoroutinesApi
@Suppress("UNUSED")
Expand Down Expand Up @@ -85,7 +90,7 @@ public open class ExperimentalCoroutineDispatcher(
*/
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
}

/**
Expand All @@ -98,7 +103,7 @@ public open class ExperimentalCoroutineDispatcher(
public fun limited(parallelism: Int): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
require(parallelism <= corePoolSize) { "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" }
return LimitingDispatcher(this, parallelism, TASK_NON_BLOCKING)
return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
}

internal fun dispatchWithContext(block: Runnable, context: TaskContext, tailDispatch: Boolean) {
Expand Down Expand Up @@ -130,8 +135,9 @@ public open class ExperimentalCoroutineDispatcher(
}

private class LimitingDispatcher(
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
private val dispatcher: ExperimentalCoroutineDispatcher,
private val parallelism: Int,
private val name: String?,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {

Expand Down Expand Up @@ -190,7 +196,7 @@ private class LimitingDispatcher(
}

override fun toString(): String {
return "${super.toString()}[dispatcher = $dispatcher]"
return name ?: "${super.toString()}[dispatcher = $dispatcher]"
}

/**
Expand Down
3 changes: 3 additions & 0 deletions kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ import java.util.concurrent.*

// TODO most of these fields will be moved to 'object ExperimentalDispatcher'

// User-visible name
internal const val DEFAULT_DISPATCHER_NAME = "Dispatchers.Default"
// Internal debuggability name + thread name prefixes
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"

// 100us as default
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ class FlowGuideTest {
test("ExampleFlow14") { kotlinx.coroutines.guide.exampleFlow14.main() }.verifyExceptions(
"Exception in thread \"main\" java.lang.IllegalStateException: Flow invariant is violated:",
"\t\tFlow was collected in [CoroutineId(1), \"coroutine#1\":BlockingCoroutine{Active}@5511c7f8, BlockingEventLoop@2eac3323],",
"\t\tbut emission happened in [CoroutineId(1), \"coroutine#1\":DispatchedCoroutine{Active}@2dae0000, DefaultDispatcher].",
"\t\tbut emission happened in [CoroutineId(1), \"coroutine#1\":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default].",
"\t\tPlease refer to 'flow' documentation or use 'flowOn' instead",
"\tat ..."
)
Expand Down

0 comments on commit ad542c4

Please sign in to comment.