Skip to content

Commit

Permalink
Improve performance of task acquisition
Browse files Browse the repository at this point in the history
    * Do not push worker to the stack during second pass of "min duration scanning"
    * Locally cache whether local queue has any work to execute to save atomic getAndSet and a bunch of atomic loads
    * More precise rendezvous for parking
    * Long-scanning stealing to emulate spinning
    * Properly handle interference of termination sequence and protection against spurious wakeups
    * Documentation improvements, naming, proper spurious wakeup check
  • Loading branch information
qwwdfsad committed Dec 12, 2019
1 parent ab30d72 commit d77c17c
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,10 @@ open class DispatchersContextSwitchBenchmark {
repeat(nCoroutines) {
launch(dispatcher) {
repeat(nRepeatDelay) {
delayOrYield()
delay(delayTimeMs)
}
}
}
}
}

private suspend fun delayOrYield() {
delay(delayTimeMs)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,8 @@ import java.util.concurrent.*
* CycledActorsBenchmark.cycledActors 262144 experimental avgt 14 1804.146 ± 57.275 ms/op
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 3)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
Expand All @@ -43,7 +43,7 @@ open class CycledActorsBenchmark : ParametrizedDispatcherBase() {
@Param("fjp", "ftp_1", "scheduler")
override var dispatcher: String = "fjp"

@Param("524288")
@Param("1", "1024")
var actorStateSize = 1

@Benchmark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import java.util.concurrent.*
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import kotlin.coroutines.*
* PingPongWithBlockingContext.withContextPingPong avgt 20 761.669 ± 41.371 ms/op
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 10, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import java.util.concurrent.*
*/
@Warmup(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Measurement(iterations = 5, time = 1, timeUnit = TimeUnit.SECONDS)
@Fork(value = 2)
@Fork(value = 1)
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,6 @@ internal class LockFreeTaskQueueCore<E : Any>(
}
// element == Placeholder can only be when add has not finished yet
if (element is Placeholder) return null // consider it not added yet
// now we tentative know element to remove -- check predicate
// we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
val newHead = (head + 1) and MAX_CAPACITY_MASK
if (_state.compareAndSet(state, state.updateHead(newHead))) {
Expand Down
136 changes: 80 additions & 56 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kotlin.random.*
* ### Structural overview
*
* Scheduler consists of [corePoolSize] worker threads to execute CPU-bound tasks and up to [maxPoolSize] lazily created threads
* to execute blocking tasks. Every worker a has local queue in addition to a global scheduler queue and the global queue
* to execute blocking tasks. Every worker has a local queue in addition to a global scheduler queue and the global queue
* has priority over local queue to avoid starvation of externally-submitted (e.g. from Android UI thread) tasks.
* Work-stealing is implemented on top of these queues to provide even load distribution and the illusion of a centralized run queue.
*
Expand Down Expand Up @@ -245,7 +245,7 @@ internal class CoroutineScheduler(
*/
private val controlState = atomic(corePoolSize.toLong() shl CPU_PERMITS_SHIFT)
private val createdWorkers: Int inline get() = (controlState.value and CREATED_MASK).toInt()
private val availableCpuPermits: Int inline get() = (controlState.value and CPU_PERMITS_MASK shr CPU_PERMITS_SHIFT).toInt()
private val availableCpuPermits: Int inline get() = availableCpuPermits(controlState.value)

private inline fun createdWorkers(state: Long): Int = (state and CREATED_MASK).toInt()
private inline fun blockingTasks(state: Long): Int = (state and BLOCKING_MASK shr BLOCKING_SHIFT).toInt()
Expand All @@ -261,14 +261,11 @@ internal class CoroutineScheduler(
controlState.addAndGet(-(1L shl BLOCKING_SHIFT))
}

private inline fun tryAcquireCpuPermit(): Boolean {
while (true) {
val state = controlState.value
val available = availableCpuPermits(state)
if (available == 0) return false
val update = state - (1L shl CPU_PERMITS_SHIFT)
if (controlState.compareAndSet(state, update)) return true
}
private inline fun tryAcquireCpuPermit(): Boolean = controlState.loop { state ->
val available = availableCpuPermits(state)
if (available == 0) return false
val update = state - (1L shl CPU_PERMITS_SHIFT)
if (controlState.compareAndSet(state, update)) return true
}

private inline fun releaseCpuPermit() = controlState.addAndGet(1L shl CPU_PERMITS_SHIFT)
Expand All @@ -283,9 +280,12 @@ internal class CoroutineScheduler(
val NOT_IN_STACK = Symbol("NOT_IN_STACK")

// Worker termination states
private const val FORBIDDEN = -1
private const val ALLOWED = 0
private const val TERMINATION_FORBIDDEN = -1
private const val TERMINATION_ALLOWED = 0
private const val TERMINATED = 1
// Worker parking states
private const val PARKING_FORBIDDEN = -1
private const val PARKING_ALLOWED = 0
private const val PARKED = 1

// Masks of control state
Expand Down Expand Up @@ -334,7 +334,7 @@ internal class CoroutineScheduler(
globalCpuQueue.close()
// Finish processing tasks from globalQueue and/or from this worker's local queue
while (true) {
val task = currentWorker?.findTask()
val task = currentWorker?.findTask(true)
?: globalCpuQueue.removeFirstOrNull()
?: globalBlockingQueue.removeFirstOrNull()
?: break
Expand Down Expand Up @@ -419,7 +419,7 @@ internal class CoroutineScheduler(
private fun tryUnpark(): Boolean {
while (true) {
val worker = parkedWorkersStackPop() ?: return false
if (!worker.parkingState.compareAndSet(ALLOWED, FORBIDDEN)) {
if (!worker.parkingState.compareAndSet(PARKING_ALLOWED, PARKING_FORBIDDEN)) {
LockSupport.unpark(worker)
}
if (worker.tryForbidTermination()) return true
Expand Down Expand Up @@ -469,10 +469,10 @@ internal class CoroutineScheduler(
*/
if (worker.state === WorkerState.TERMINATED) return task
// Do not add CPU tasks in local queue if we are not able to execute it
// TODO discuss: maybe add it to the local queue and offload back in the global queue iff permit wasn't acquired?
if (task.mode == TaskMode.NON_BLOCKING && worker.isBlocking) {
return task
}
worker.mayHaveLocalTasks = true
return worker.localQueue.add(task, fair = fair)
}

Expand Down Expand Up @@ -525,7 +525,7 @@ internal class CoroutineScheduler(
"CPU = $cpuWorkers, " +
"blocking = $blockingWorkers, " +
"parked = $parkedWorkers, " +
"retired = $dormant, " +
"dormant = $dormant, " +
"terminated = $terminated}, " +
"running workers queues = $queueSizes, "+
"global CPU queue size = ${globalCpuQueue.size}, " +
Expand Down Expand Up @@ -581,16 +581,16 @@ internal class CoroutineScheduler(
/**
* Small state machine for termination.
* The following states are allowed:
* [ALLOWED] -- worker can wake up and terminate itself
* [FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
* [TERMINATION_ALLOWED] -- worker can wake up and terminate itself
* [TERMINATION_FORBIDDEN] -- worker is not allowed to terminate (because it was chosen by another thread to help)
* [TERMINATED] -- final state, thread is terminating and cannot be resurrected
*
* Allowed transitions:
* [ALLOWED] -> [FORBIDDEN]
* [ALLOWED] -> [TERMINATED]
* [FORBIDDEN] -> [ALLOWED]
* [TERMINATION_ALLOWED] -> [TERMINATION_FORBIDDEN]
* [TERMINATION_ALLOWED] -> [TERMINATED]
* [TERMINATION_FORBIDDEN] -> [TERMINATION_ALLOWED]
*/
private val terminationState = atomic(ALLOWED)
private val terminationState = atomic(TERMINATION_ALLOWED)

/**
* It is set to the termination deadline when started doing [park] and it reset
Expand All @@ -610,22 +610,22 @@ internal class CoroutineScheduler(
* The delay until at least one task in other worker queues will become stealable.
*/
private var minDelayUntilStealableTaskNs = 0L
// ALLOWED | PARKED | FORBIDDEN
val parkingState = atomic(ALLOWED)
// PARKING_ALLOWED | PARKING_FORBIDDEN | PARKED
val parkingState = atomic(PARKING_ALLOWED)

private var rngState = Random.nextInt()
/**
* Tries to set [terminationState] to [FORBIDDEN], returns `false` if this attempt fails.
* Tries to set [terminationState] to [TERMINATION_FORBIDDEN], returns `false` if this attempt fails.
* This attempt may fail either because worker terminated itself or because someone else
* claimed this worker (though this case is rare, because it requires very bad timing)
*/
fun tryForbidTermination(): Boolean =
when (val state = terminationState.value) {
TERMINATED -> false // already terminated
FORBIDDEN -> false // already forbidden, someone else claimed this worker
ALLOWED -> terminationState.compareAndSet(
ALLOWED,
FORBIDDEN
TERMINATION_FORBIDDEN -> false // already forbidden, someone else claimed this worker
TERMINATION_ALLOWED -> terminationState.compareAndSet(
TERMINATION_ALLOWED,
TERMINATION_FORBIDDEN
)
else -> error("Invalid terminationState = $state")
}
Expand Down Expand Up @@ -658,61 +658,77 @@ internal class CoroutineScheduler(
}

override fun run() = runWorker()
@JvmField
var mayHaveLocalTasks = false

private fun runWorker() {
var rescanned = false
while (!isTerminated && state != WorkerState.TERMINATED) {
val task = findTask()
val task = findTask(mayHaveLocalTasks)
// Task found. Execute and repeat
if (task != null) {
rescanned = false
minDelayUntilStealableTaskNs = 0L
executeTask(task)
continue
} else {
mayHaveLocalTasks = false
}
/*
* No tasks were found:
* 1) Either at least one of the workers has stealable task in its FIFO-buffer with a stealing deadline.
* Then its deadline is stored in [minDelayUntilStealableTaskNs]
*
* Then just park for that duration (ditto re-scanning).
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
* While it could potentially lead to short (up to WORK_STEALING_TIME_RESOLUTION_NS ns) starvations,
* excess unparks and managing "one unpark per signalling" invariant become unfeasible, instead we are going to resolve
* it with "spinning via scans" mechanism.
* NB: this short potential parking does not interfere with `tryUnpark`
*/
if (minDelayUntilStealableTaskNs != 0L) {
if (!rescanned) {
rescanned = true
continue
} else {
rescanned = false
tryReleaseCpu(WorkerState.PARKING)
interrupted()
LockSupport.parkNanos(minDelayUntilStealableTaskNs)
minDelayUntilStealableTaskNs = 0L
}
continue
}
/*
* 2) No tasks available, time to park and, potentially, shut down the thread.
*
* 2) Or no tasks available, time to park and, potentially, shut down the thread.
* Adds itself to the stack of parked workers, re-scans all the queues
* to avoid a missed wake-up (requestCpuWorker), and either starts executing discovered tasks or parks itself awaiting new tasks.
*/
parkingState.value = ALLOWED
if (parkedWorkersStackPush(this)) {
continue
} else {
assert { localQueue.size == 0 }
tryPark()
}
tryReleaseCpu(WorkerState.TERMINATED)
}

// Counterpart to "tryUnpark"
private fun tryPark() {
if (!inStack()) {
parkingState.value = PARKING_ALLOWED
}
if (parkedWorkersStackPush(this)) {
return
} else {
assert { localQueue.size == 0 }
// Failed to get a parking permit => we are not in the stack
while (inStack()) {
if (isTerminated || state == WorkerState.TERMINATED) break
if (parkingState.value != PARKED && !parkingState.compareAndSet(PARKING_ALLOWED, PARKED)) {
return
}
tryReleaseCpu(WorkerState.PARKING)
interrupted() // Cleanup interruptions
while (inStack()) { // Prevent spurious wakeups
if (isTerminated) break
if (!parkingState.compareAndSet(ALLOWED, PARKED)) {
break
}
if (inStack()) {
park()
}
}
}
tryReleaseCpu(WorkerState.TERMINATED)
}

private fun inStack(): Boolean = nextParkedWorker !== NOT_IN_STACK
Expand Down Expand Up @@ -763,7 +779,7 @@ internal class CoroutineScheduler(
}

private fun park() {
terminationState.value = ALLOWED
terminationState.value = TERMINATION_ALLOWED
// set termination deadline the first time we are here (it is reset in idleReset)
if (terminationDeadline == 0L) terminationDeadline = System.nanoTime() + idleWorkerKeepAliveNs
// actually park
Expand All @@ -789,7 +805,7 @@ internal class CoroutineScheduler(
* See tryUnpark for state reasoning.
* If this CAS fails, then we were successfully unparked by other worker and cannot terminate.
*/
if (!terminationState.compareAndSet(ALLOWED, TERMINATED)) return
if (!terminationState.compareAndSet(TERMINATION_ALLOWED, TERMINATED)) return
/*
* At this point this thread is no longer considered as usable for scheduling.
* We need multi-step choreography to reindex workers.
Expand Down Expand Up @@ -841,22 +857,30 @@ internal class CoroutineScheduler(
}
}

fun findTask(): Task? {
if (tryAcquireCpuPermit()) return findAnyTask()
fun findTask(scanLocalQueue: Boolean): Task? {
if (tryAcquireCpuPermit()) return findAnyTask(scanLocalQueue)
// If we can't acquire a CPU permit -- attempt to find blocking task
val task = localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
val task = if (scanLocalQueue) {
localQueue.poll() ?: globalBlockingQueue.removeFirstOrNull()
} else {
globalBlockingQueue.removeFirstOrNull()
}
return task ?: trySteal(blockingOnly = true)
}

private fun findAnyTask(): Task? {
private fun findAnyTask(scanLocalQueue: Boolean): Task? {
/*
* Anti-starvation mechanism: probabilistically poll either local
* or global queue to ensure progress for both external and internal tasks.
*/
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
if (scanLocalQueue) {
val globalFirst = nextInt(2 * corePoolSize) == 0
if (globalFirst) pollGlobalQueues()?.let { return it }
localQueue.poll()?.let { return it }
if (!globalFirst) pollGlobalQueues()?.let { return it }
} else {
pollGlobalQueues()?.let { return it }
}
return trySteal(blockingOnly = false)
}

Expand All @@ -880,7 +904,7 @@ internal class CoroutineScheduler(

var currentIndex = nextInt(created)
var minDelay = Long.MAX_VALUE
repeat(created) {
repeat(workers.length()) {
++currentIndex
if (currentIndex > created) currentIndex = 1
val worker = workers[currentIndex]
Expand Down
2 changes: 1 addition & 1 deletion kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp(

@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 100000L)
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)

@JvmField
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.junit.*
import java.util.*
import java.util.concurrent.*

class BlockingIOTerminationStressTest : TestBase() {
class BlockingCoroutineDispatcherTerminationStressTest : TestBase() {
private val baseDispatcher = ExperimentalCoroutineDispatcher(
2, 20,
TimeUnit.MILLISECONDS.toNanos(10)
Expand Down

0 comments on commit d77c17c

Please sign in to comment.