From 4224e01552ace0cc0f719762ddda8c13a84a62e7 Mon Sep 17 00:00:00 2001 From: Vsevolod Tolstopyatov Date: Tue, 10 Dec 2019 17:09:43 +0300 Subject: [PATCH] Merge termination state machine and parking state machine into one, get rid of long-scanning sequence --- .../jvm/src/scheduling/CoroutineScheduler.kt | 67 +++++-------------- .../jvm/src/scheduling/WorkQueue.kt | 2 +- 2 files changed, 18 insertions(+), 51 deletions(-) diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt index 40daf1cd41..09e9deb838 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt @@ -303,14 +303,10 @@ internal class CoroutineScheduler( @JvmField val NOT_IN_STACK = Symbol("NOT_IN_STACK") - // Worker termination states - private const val TERMINATION_FORBIDDEN = -1 - private const val TERMINATION_ALLOWED = 0 + // Worker ctl states + private const val PARKED = -1 + private const val CLAIMED = 0 private const val TERMINATED = 1 - // Worker parking states - private const val PARKING_FORBIDDEN = -1 - private const val PARKING_ALLOWED = 0 - private const val PARKED = 1 // Masks of control state private const val BLOCKING_SHIFT = 21 // 2M threads max @@ -443,10 +439,10 @@ internal class CoroutineScheduler( private fun tryUnpark(): Boolean { while (true) { val worker = parkedWorkersStackPop() ?: return false - if (!worker.parkingState.compareAndSet(PARKING_ALLOWED, PARKING_FORBIDDEN)) { + if (worker.workerCtl.compareAndSet(PARKED, CLAIMED)) { LockSupport.unpark(worker) + return true } - if (worker.tryForbidTermination()) return true } } @@ -596,23 +592,19 @@ internal class CoroutineScheduler( /** * Worker state. **Updated only by this worker thread**. * By default, worker is in DORMANT state in the case when it was created, but all CPU tokens or tasks were taken. + * Is used locally by the worker to maintain its own invariants. */ @JvmField var state = WorkerState.DORMANT /** - * Small state machine for termination. - * Followed states are allowed: - * [TERMINATION_ALLOWED] -- worker can wake up and terminate itself - * [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help) - * [TERMINATED] -- final state, thread is terminating and cannot be resurrected - * - * Allowed transitions: - * [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN] - * [TERMINATION_ALLOWED] -> [TERMINATED] - * [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED] + * Worker control state responsible for worker claiming, parking and termination. + * List of states: + * [PARKED] -- worker is parked and can self-terminate after a termination deadline. + * [CLAIMED] -- worker is claimed by an external submitter. + * [TERMINATED] -- worker is terminated and no longer usable. */ - private val terminationState = atomic(TERMINATION_ALLOWED) + val workerCtl = atomic(CLAIMED) /** * It is set to the termination deadline when started doing [park] and it reset @@ -632,25 +624,8 @@ internal class CoroutineScheduler( * The delay until at least one task in other worker queues will become stealable. */ private var minDelayUntilStealableTaskNs = 0L - // PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED - val parkingState = atomic(PARKING_ALLOWED) private var rngState = Random.nextInt() - /** - * Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails. - * This attempt may fail either because worker terminated itself or because someone else - * claimed this worker (though this case is rare, because require very bad timings) - */ - fun tryForbidTermination(): Boolean = - when (val state = terminationState.value) { - TERMINATED -> false // already terminated - TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker - TERMINATION_ALLOWED -> terminationState.compareAndSet( - TERMINATION_ALLOWED, - TERMINATION_FORBIDDEN - ) - else -> error("Invalid terminationState = $state") - } /** * Tries to acquire CPU token if worker doesn't have one @@ -730,23 +705,16 @@ internal class CoroutineScheduler( // Counterpart to "tryUnpark" private fun tryPark() { if (!inStack()) { - parkingState.value = PARKING_ALLOWED parkedWorkersStackPush(this) return - } assert { localQueue.size == 0 } - // Failed to get a parking permit => we are not in the stack - while (inStack()) { + workerCtl.value = PARKED // Update value once + while (inStack()) { // Prevent spurious wakeups if (isTerminated || state == WorkerState.TERMINATED) break - if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) { - return - } tryReleaseCpu(WorkerState.PARKING) interrupted() // Cleanup interruptions - if (inStack()) { - park() - } + park() } } @@ -798,7 +766,6 @@ internal class CoroutineScheduler( } private fun park() { - terminationState.value = TERMINATION_ALLOWED // set termination deadline the first time we are here (it is reset in idleReset) if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs // actually park @@ -824,7 +791,7 @@ internal class CoroutineScheduler( * See tryUnpark for state reasoning. * If this CAS fails, then we were successfully unparked by other worker and cannot terminate. */ - if (!terminationState.compareAndSet(TERMINATION_ALLOWED, TERMINATED)) return + if (!workerCtl.compareAndSet(PARKED, TERMINATED)) return /* * At this point this thread is no longer considered as usable for scheduling. * We need multi-step choreography to reindex workers. @@ -923,7 +890,7 @@ internal class CoroutineScheduler( var currentIndex = nextInt(created) var minDelay = Long.MAX_VALUE - repeat(workers.length()) { + repeat(created) { ++currentIndex if (currentIndex > created) currentIndex = 1 val worker = workers[currentIndex] diff --git a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt index f5d94fb779..1a0603e413 100644 --- a/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt +++ b/kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt @@ -32,7 +32,7 @@ internal const val NOTHING_TO_STEAL = -2L * order to properly claim value from the buffer. * Moreover, [Task] objects are reusable, so it may seem that this queue is prone to ABA problem. * Indeed it formally has ABA-problem, but the whole processing logic is written in the way that such ABA is harmless. - * "I have discovered a truly marvelous proof of this, which this margin is too narrow to contain" + * I have discovered a truly marvelous proof of this, which this KDoc is too narrow to contain. */ internal class WorkQueue {