Document CoroutineScheduler machinery
qwwdfsad committed Dec 12, 2019
1 parent d77c17c commit 966020e
Showing 2 changed files with 75 additions and 66 deletions.
127 changes: 73 additions & 54 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
@@ -20,49 +20,73 @@ import kotlin.random.*
*
* Current scheduler implementation has two optimization targets:
* * Efficiency in the face of communication patterns (e.g., actors communicating via channel)
* * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool
* * Dynamic resizing to support blocking calls without re-dispatching coroutine to separate "blocking" thread pool.
*
* ### Structural overview
*
* Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
* to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
* has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
* Work-stealing is implemented on top of that queues to provide even load distribution and illusion of centralized run queue.
* Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to
* [maxPoolSize] lazily created threads to execute blocking tasks.
* Every worker has a local queue in addition to a global scheduler queue
* and the global queue has priority over local queue to avoid starvation of externally-submitted
* (e.g. from Android UI thread) tasks.
* Work-stealing is implemented on top of that queues to provide
* even load distribution and illusion of centralized run queue.
*
* ### Scheduling policy
*
* When a coroutine is dispatched from within a scheduler worker, it's placed into the head of worker run queue.
* If the head is not empty, the task from the head is moved to the tail. Though it is unfair scheduling policy,
* If the head is not empty, the task from the head is moved to the tail. Though it is an unfair scheduling policy,
* it effectively couples communicating coroutines into one and eliminates scheduling latency
* that arises from placing task to the end of the queue.
* Placing former head to the tail is necessary to provide semi-FIFO order, otherwise queue degenerates to stack.
* that arises from placing tasks to the end of the queue.
* Placing former head to the tail is necessary to provide semi-FIFO order, otherwise, queue degenerates to stack.
* When a coroutine is dispatched from an external thread, it's put into the global queue.
* The original idea with a single-slot LIFO buffer comes from Golang runtime scheduler by D. Vyukov.
* It was proven to be "fair enough", performant and generally well accepted and initially was a significant inspiration
* source for the coroutine scheduler.
*
* ### Work stealing and affinity
*
* To provide even tasks distribution worker tries to steal tasks from other workers queues before parking when his local queue is empty.
* A non-standard solution is implemented to provide tasks affinity: task from FIFO buffer may be stolen only if it is stale enough
* (based on the value of [WORK_STEALING_TIME_RESOLUTION_NS]).
* For this purpose monotonic global clock ([System.nanoTime]) is used and every task has associated with it submission time.
* This approach shows outstanding results when coroutines are cooperative, but as downside scheduler now depends on high-resolution global clock
* To provide even tasks distribution worker tries to steal tasks from other workers queues
* before parking when his local queue is empty.
* A non-standard solution is implemented to provide tasks affinity: a task from FIFO buffer may be stolen
* only if it is stale enough based on the value of [WORK_STEALING_TIME_RESOLUTION_NS].
* For this purpose, a monotonic global clock is used, and every task has a submission time associated with it.
* This approach shows outstanding results when coroutines are cooperative,
* but as downside scheduler now depends on a high-resolution global clock,
* which may limit scalability on NUMA machines. Tasks from LIFO buffer can be stolen on a regular basis.
*
* ### Thread management
* One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees similar
* to the regular centralized executors. The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
* The former field incorporates the amount of created threads, CPU-tokens and blocking tasks that require a thread compensation,
* One of the hardest parts of the scheduler is decentralized management of the threads with the progress guarantees
* similar to the regular centralized executors.
* The state of the threads consists of [controlState] and [parkedWorkersStack] fields.
* The former field incorporates the amount of created threads, CPU-tokens and blocking tasks
* that require a thread compensation,
* while the latter represents intrusive versioned Treiber stack of idle workers.
* When a worker cannot find any work, he first adds itself to the stack, then re-scans the queue (to avoid missing signal)
* and then attempts to park itself (there is additional layer of signalling against unnecessary park/unpark).
* If worker finds a task that it cannot yet steal due to timer constraints, it stores this fact in its state
* When a worker cannot find any work, it first adds itself to the stack,
* then re-scans the queue to avoid missing signals and then attempts to park
* with additional rendezvous against unnecessary parking.
* If a worker finds a task that it cannot yet steal due to time constraints, it stores this fact in its state
* (to be uncounted when additional work is signalled) and parks for such duration.
*
* When a new task arrives to the scheduler (whether it's local or global queue), either an idle worker is being signalled, or
* a new worker is attempted to be created (only [corePoolSize] workers can be created for regular CPU tasks).
* When a new task arrives in the scheduler (whether it is local or global queue),
* either an idle worker is being signalled, or a new worker is attempted to be created.
* Only [corePoolSize] workers can be created for regular CPU tasks.
*
* ### Dynamic resizing and support of blocking tasks
* ### Support for blocking tasks
* The scheduler also supports the notion of [blocking][TaskMode.PROBABLY_BLOCKING] tasks.
* When executing or enqueuing blocking tasks, the scheduler notifies or creates one more worker in
* addition to core pool size, so at any given moment, it has [corePoolSize] threads (potentially not yet created)
* to serve CPU-bound tasks. To properly guarantee liveness, the scheduler maintains
* "CPU permits" -- [corePoolSize] special tokens that permit an arbitrary worker to execute and steal CPU-bound tasks.
* When worker encounters blocking tasks, it basically hands off its permit to another thread (not directly though) to
* keep invariant "scheduler always has at least min(pending CPU tasks, core pool size)
* and at most core pool size threads to execute CPU tasks".
* To avoid overprovision, workers without CPU permit are allowed to scan [globalBlockingQueue]
* and steal **only** blocking tasks from other workers.
*
* TODO
* The scheduler does not limit the count of pending blocking tasks, potentially creating up to [maxPoolSize] threads.
* End users do not have access to the scheduler directly and can dispatch blocking tasks only with
* [LimitingDispatcher] that does control concurrency level by its own mechanism.
*/
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
@@ -469,7 +493,7 @@ internal class CoroutineScheduler(
*/
if (worker.state === WorkerState.TERMINATED) return task
// Do not add CPU tasks in local queue if we are not able to execute it
if (task.mode == TaskMode.NON_BLOCKING && worker.isBlocking) {
if (task.mode === TaskMode.NON_BLOCKING && worker.state === WorkerState.BLOCKING) {
return task
}
worker.mayHaveLocalTasks = true
@@ -487,7 +511,6 @@ internal class CoroutineScheduler(
* E.g. for [1b, 1b, 2c, 1d] means that pool has
* two blocking workers with queue size 1, one worker with CPU permit and queue size 1
* and one dormant (executing his local queue before parking) worker with queue size 1.
* TODO revisit
*/
override fun toString(): String {
var parkedWorkers = 0
@@ -530,10 +553,10 @@ internal class CoroutineScheduler(
"running workers queues = $queueSizes, "+
"global CPU queue size = ${globalCpuQueue.size}, " +
"global blocking queue size = ${globalBlockingQueue.size}, " +
"Control State Workers {" +
"created = ${createdWorkers(state)}, " +
"blocking = ${blockingTasks(state)}, " +
"CPU acquired = ${corePoolSize - availableCpuPermits(state)}" +
"Control State {" +
"created workers= ${createdWorkers(state)}, " +
"blocking tasks = ${blockingTasks(state)}, " +
"CPUs acquired = ${corePoolSize - availableCpuPermits(state)}" +
"}]"
}

@@ -574,9 +597,8 @@ internal class CoroutineScheduler(
* Worker state. **Updated only by this worker thread**.
* By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken.
*/
@Volatile
@JvmField
var state = WorkerState.DORMANT
val isBlocking: Boolean get() = state == WorkerState.BLOCKING

/**
* Small state machine for termination.
@@ -634,15 +656,13 @@ internal class CoroutineScheduler(
* Tries to acquire CPU token if worker doesn't have one
* @return whether worker acquired (or already had) CPU token
*/
private fun tryAcquireCpuPermit(): Boolean {
return when {
state == WorkerState.CPU_ACQUIRED -> true
this@CoroutineScheduler.tryAcquireCpuPermit() -> {
state = WorkerState.CPU_ACQUIRED
true
}
else -> false
private fun tryAcquireCpuPermit(): Boolean = when {
state == WorkerState.CPU_ACQUIRED -> true
this@CoroutineScheduler.tryAcquireCpuPermit() -> {
state = WorkerState.CPU_ACQUIRED
true
}
else -> false
}

/**
@@ -711,22 +731,21 @@ internal class CoroutineScheduler(
private fun tryPark() {
if (!inStack()) {
parkingState.value = PARKING_ALLOWED
}
if (parkedWorkersStackPush(this)) {
parkedWorkersStackPush(this)
return
} else {
assert { localQueue.size == 0 }
// Failed to get a parking permit => we are not in the stack
while (inStack()) {
if (isTerminated || state == WorkerState.TERMINATED) break
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
return
}
tryReleaseCpu(WorkerState.PARKING)
interrupted() // Cleanup interruptions
if (inStack()) {
park()
}

}
assert { localQueue.size == 0 }
// Failed to get a parking permit => we are not in the stack
while (inStack()) {
if (isTerminated || state == WorkerState.TERMINATED) break
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
return
}
tryReleaseCpu(WorkerState.PARKING)
interrupted() // Cleanup interruptions
if (inStack()) {
park()
}
}
}
14 changes: 2 additions & 12 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
@@ -138,7 +138,7 @@ internal class WorkQueue {
}

fun offloadAllWorkTo(globalQueue: GlobalQueue) {
lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
lastScheduledTask.getAndSet(null)?.let { globalQueue.addLast(it) }
while (pollTo(globalQueue)) {
// Steal everything
}
@@ -173,7 +173,7 @@ internal class WorkQueue {

private fun pollTo(queue: GlobalQueue): Boolean {
val task = pollBuffer() ?: return false
queue.add(task)
queue.addLast(task)
return true
}

@@ -198,13 +198,3 @@ internal class WorkQueue {
}
}
}

private fun GlobalQueue.add(task: Task) {
/*
* globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
* been already shutdown (with the only exception of the last worker thread that might be performing
* shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
*/
val added = addLast(task)
assert { added }
}
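
The "Scheduling policy" section of the KDoc in this commit describes a single-slot head backed by a FIFO buffer: a locally dispatched task takes the head slot, and the previous occupant moves to the tail. The sketch below is a simplified, single-threaded model of that policy only. `LocalQueueSketch` and its members are hypothetical names chosen for illustration; the real `WorkQueue` is lock-free and also supports stealing, neither of which is modelled here.

```kotlin
// Hypothetical single-threaded model of the local-queue policy; not the real WorkQueue.
class LocalQueueSketch<T : Any> {
    // Single-slot "head": holds the most recently dispatched task.
    private var lastScheduled: T? = null

    // FIFO buffer that receives tasks displaced from the head slot.
    private val buffer = ArrayDeque<T>()

    // The new task takes the head slot; the previous occupant, if any, goes to the tail.
    fun add(task: T) {
        lastScheduled?.let { buffer.addLast(it) }
        lastScheduled = task
    }

    // The head slot is polled first, then the buffer in FIFO order.
    fun poll(): T? {
        val head = lastScheduled
        if (head != null) {
            lastScheduled = null
            return head
        }
        return buffer.removeFirstOrNull()
    }
}

fun main() {
    val queue = LocalQueueSketch<String>()
    listOf("a", "b", "c").forEach(queue::add)
    // The latest task runs first, the rest keep their submission order.
    println(generateSequence { queue.poll() }.toList()) // [c, a, b]
}
```

Only the most recent task jumps the line, which is what the KDoc means by semi-FIFO order: without moving the former head to the tail, repeated dispatches would drain in pure LIFO order and the queue would behave like a stack.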
