New deterministic algorithm for working with blocking tasks
Invariants:

    * Steal only one task per attempt to avoid missed steals that may block progress (a check-park-check sequence may miss tasks that are being stolen)
    * New WorkQueue.add invariant: bufferSize < capacity => add is always successful
    * Revisited tests that exposed a number of problems
    * Ability to steal from the middle of a work queue in order to steal blocking tasks, with ABA prevention

Changes:

    * Instead of a "blocking workers" count, use a "blocking tasks" count that is incremented on each blocking submission and decremented only when the task completes
    * On each work signal, try to compensate for blocking tasks by enforcing the invariant "created threads == blocking tasks + up to core-size workers" (see the sketch after this list)
    * Now, if a worker was not woken up spuriously, there is a task dedicated to it that should be found. For that reason, attempt to steal blocking tasks
      (which may be in the middle of a work queue). Additionally, instead of scanning the whole global queue, split it in two (one for blocking tasks, one for regular tasks)
    * Get rid of the conditional remove from the global queue
    * Avoid excessive unparks of threads that are not yet able to steal a task due to the work-stealing time resolution: do not add such workers to the stack
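
A minimal sketch of the thread-compensation invariant above, assuming a scheduler reduced to two counters; the names (`createdWorkers`, `blockingTasks`, `startNewWorker`) are illustrative and not the actual CoroutineScheduler fields:

    import java.util.concurrent.atomic.AtomicInteger

    // Sketch: maintain "created threads == blocking tasks + up to core-size workers".
    class CompensationSketch(private val corePoolSize: Int) {
        private val createdWorkers = AtomicInteger(0)
        private val blockingTasks = AtomicInteger(0)

        // Incremented on each blocking submission.
        fun onBlockingSubmit() {
            blockingTasks.incrementAndGet()
            signalWork()
        }

        // Decremented only when the blocking task completes.
        fun onBlockingCompleted() {
            blockingTasks.decrementAndGet()
        }

        private fun signalWork() {
            while (true) {
                val created = createdWorkers.get()
                // Workers left for CPU-bound work after compensating for blocking tasks.
                val cpuWorkers = (created - blockingTasks.get()).coerceAtLeast(0)
                if (cpuWorkers >= corePoolSize) return // invariant already satisfied
                if (createdWorkers.compareAndSet(created, created + 1)) {
                    startNewWorker()
                    return
                }
            }
        }

        private fun startNewWorker() { /* stub; a real pool would start a worker thread here */ }
    }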
qwwdfsad committed Dec 12, 2019
1 parent 4236c8c commit ab30d72
Showing 17 changed files with 511 additions and 576 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
@@ -3,7 +3,7 @@
 #
 
 # Kotlin
-version=1.3.2-sched10
+version=1.3.2-SNAPSHOT
 group=org.jetbrains.kotlinx
 kotlin_version=1.3.60
 
14 changes: 3 additions & 11 deletions kotlinx-coroutines-core/common/src/internal/LockFreeTaskQueue.kt
@@ -54,12 +54,9 @@ internal open class LockFreeTaskQueue<E : Any>(
     }
 
     @Suppress("UNCHECKED_CAST")
-    fun removeFirstOrNull(): E? = removeFirstOrNullIf { true }
-
-    @Suppress("UNCHECKED_CAST")
-    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): E? {
+    fun removeFirstOrNull(): E? {
         _cur.loop { cur ->
-            val result = cur.removeFirstOrNullIf(predicate)
+            val result = cur.removeFirstOrNull()
             if (result !== Core.REMOVE_FROZEN) return result as E?
             _cur.compareAndSet(cur, cur.next())
         }
@@ -164,10 +161,7 @@ internal class LockFreeTaskQueueCore<E : Any>(
     }
 
     // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
-    fun removeFirstOrNull(): Any? = removeFirstOrNullIf { true }
-
-    // REMOVE_FROZEN | null (EMPTY) | E (SUCCESS)
-    inline fun removeFirstOrNullIf(predicate: (E) -> Boolean): Any? {
+    fun removeFirstOrNull(): Any? {
         _state.loop { state ->
             if (state and FROZEN_MASK != 0L) return REMOVE_FROZEN // frozen -- cannot modify
             state.withState { head, tail ->
@@ -182,8 +176,6 @@ internal class LockFreeTaskQueueCore<E : Any>(
                 // element == Placeholder can only be when add has not finished yet
                 if (element is Placeholder) return null // consider it not added yet
                 // now we tentative know element to remove -- check predicate
-                @Suppress("UNCHECKED_CAST")
-                if (!predicate(element as E)) return null
                 // we cannot put null into array here, because copying thread could replace it with Placeholder and that is a disaster
                 val newHead = (head + 1) and MAX_CAPACITY_MASK
                 if (_state.compareAndSet(state, state.updateHead(newHead))) {
426 changes: 198 additions & 228 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Large diffs are not rendered by default.
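
Since the CoroutineScheduler diff itself is not rendered, here is a hedged sketch of the task-finding order that the commit message describes; every name below is an assumption made for illustration, not the actual scheduler API:

    // A worker holding a CPU permit may take any task; a worker created to
    // compensate for blocking tasks must only take blocking work (possibly
    // stealing it from the middle of a victim's queue, as WorkQueue now allows).
    interface TaskRef
    interface TaskSource { fun poll(): TaskRef? }

    class WorkerSketch(
        private val hasCpuPermit: Boolean,
        private val localQueue: TaskSource,
        private val globalCpuQueue: TaskSource,
        private val globalBlockingQueue: TaskSource,
        private val stealAny: () -> TaskRef?,
        private val stealBlocking: () -> TaskRef?
    ) {
        fun findTask(): TaskRef? {
            if (hasCpuPermit) {
                localQueue.poll()?.let { return it }
                globalCpuQueue.poll()?.let { return it }
                globalBlockingQueue.poll()?.let { return it }
                return stealAny()
            }
            // No CPU permit: only blocking work may be taken.
            globalBlockingQueue.poll()?.let { return it }
            return stealBlocking()
        }
    }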

16 changes: 5 additions & 11 deletions kotlinx-coroutines-core/jvm/src/scheduling/Tasks.kt
@@ -19,11 +19,6 @@ internal val WORK_STEALING_TIME_RESOLUTION_NS = systemProp(
"kotlinx.coroutines.scheduler.resolution.ns", 100000L
)

@JvmField
internal val QUEUE_SIZE_OFFLOAD_THRESHOLD = systemProp(
"kotlinx.coroutines.scheduler.offload.threshold", 96, maxValue = BUFFER_CAPACITY
)

@JvmField
internal val BLOCKING_DEFAULT_PARALLELISM = systemProp(
"kotlinx.coroutines.scheduler.blocking.parallelism", 16
@@ -50,7 +45,7 @@ internal val MAX_POOL_SIZE = systemProp(

 @JvmField
 internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
-    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 5L)
+    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 100000L)
 )
 
 @JvmField
@@ -87,9 +82,11 @@ internal abstract class Task(
     @JvmField var taskContext: TaskContext
 ) : Runnable {
     constructor() : this(0, NonBlockingContext)
-    val mode: TaskMode get() = taskContext.taskMode
+    inline val mode: TaskMode get() = taskContext.taskMode
 }
 
+internal inline val Task.isBlocking get() = taskContext.taskMode == TaskMode.PROBABLY_BLOCKING
+
 // Non-reusable Task implementation to wrap Runnable instances that do not otherwise implement task
 internal class TaskImpl(
     @JvmField val block: Runnable,
@@ -109,10 +106,7 @@ internal class TaskImpl(
 }
 
 // Open for tests
-internal open class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false) {
-    public fun removeFirstWithModeOrNull(mode: TaskMode): Task? =
-        removeFirstOrNullIf { it.mode == mode }
-}
+internal class GlobalQueue : LockFreeTaskQueue<Task>(singleConsumer = false)
 
 internal abstract class TimeSource {
     abstract fun nanoTime(): Long
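The `GlobalQueue` simplification above pairs with the global-queue split described in the commit message: instead of one global queue filtered with `removeFirstWithModeOrNull`, the scheduler can keep two queues and route by `Task.isBlocking`. A self-contained toy model of that routing, assuming `ConcurrentLinkedQueue` in place of the real lock-free `GlobalQueue`:

    import java.util.concurrent.ConcurrentLinkedQueue

    // Toy model: route submissions by "blockingness" so that consumers never
    // have to scan past tasks of the other kind (no conditional remove needed).
    class SplitGlobalQueue<T>(private val isBlocking: (T) -> Boolean) {
        private val cpuQueue = ConcurrentLinkedQueue<T>()
        private val blockingQueue = ConcurrentLinkedQueue<T>()

        fun add(task: T) {
            if (isBlocking(task)) blockingQueue.add(task) else cpuQueue.add(task)
        }

        // A worker with a CPU permit polls both queues; a compensating worker
        // polls only the blocking one.
        fun pollCpu(): T? = cpuQueue.poll()
        fun pollBlocking(): T? = blockingQueue.poll()
    }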
152 changes: 85 additions & 67 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
@@ -26,15 +26,13 @@ internal const val NOTHING_TO_STEAL = -2L
  * that these two (current one and submitted) are communicating and sharing state thus making such communication extremely fast.
  * E.g. submitted jobs [1, 2, 3, 4] will be executed in [4, 1, 2, 3] order.
  *
- * ### Work offloading
- *
- * When the queue is full, half of existing tasks are offloaded to global queue which is regularly polled by other pool workers.
- * Offloading occurs in LIFO order for the sake of implementation simplicity: offloads should be extremely rare and occurs only in specific use-cases
- * (e.g. when coroutine starts heavy fork-join-like computation), so fairness is not important.
- * As an alternative, offloading directly to some [CoroutineScheduler.Worker] may be used, but then the strategy of selecting any idle worker
- * should be implemented and implementation should be aware multiple producers.
- *
- * @suppress **This is unstable API and it is subject to change.**
+ * ### Algorithm and implementation details
+ * This is a regular SPMC bounded queue with the additional property that tasks can be removed from the middle of the queue
+ * (scheduler workers without a CPU permit steal blocking tasks via this mechanism). This property forces us to use CAS to
+ * properly claim a value from the buffer.
+ * Moreover, [Task] objects are reusable, so it may seem that this queue is prone to the ABA problem.
+ * Indeed, it formally has the ABA problem, but the whole processing logic is written so that such ABA is harmless.
+ * "I have discovered a truly marvelous proof of this, which this margin is too narrow to contain"
  */
 internal class WorkQueue {
 
Expand All @@ -58,18 +56,21 @@ internal class WorkQueue {

     private val producerIndex = atomic(0)
     private val consumerIndex = atomic(0)
+    // Shortcut to avoid scanning the queue when it contains no blocking tasks
+    private val blockingTasksInBuffer = atomic(0)
 
     /**
      * Retrieves and removes task from the head of the queue
-     * Invariant: this method is called only by the owner of the queue ([stealBatch] is not)
+     * Invariant: this method is called only by the owner of the queue.
      */
     fun poll(): Task? = lastScheduledTask.getAndSet(null) ?: pollBuffer()
 
     /**
      * Invariant: Called only by the owner of the queue, returns
      * `null` if task was added, task that wasn't added otherwise.
      */
-    fun add(task: Task): Task? {
+    fun add(task: Task, fair: Boolean = false): Task? {
+        if (fair) return addLast(task)
         val previous = lastScheduledTask.getAndSet(task) ?: return null
         return addLast(previous)
     }
@@ -78,18 +79,20 @@
      * Invariant: Called only by the owner of the queue, returns
      * `null` if task was added, task that wasn't added otherwise.
      */
-    fun addLast(task: Task): Task? {
+    private fun addLast(task: Task): Task? {
+        if (task.isBlocking) blockingTasksInBuffer.incrementAndGet()
         if (bufferSize == BUFFER_CAPACITY - 1) return task
-        val headLocal = producerIndex.value
-        val nextIndex = headLocal and MASK
-
+        val nextIndex = producerIndex.value and MASK
         /*
-         * If current element is not null then we're racing with consumers for the tail. If we skip this check then
-         * the consumer can null out current element and it will be lost. If we're racing for tail then
-         * the queue is close to overflowing => return task
+         * If the current element is not null, then we're racing with a really slow consumer that committed the consumer index
+         * but hasn't yet nulled out the slot, effectively preventing us from using it.
+         * Such situations are very rare in practice (although possible), and we decided to give up a progress guarantee
+         * to have the stronger invariant "add to a queue with bufferSize == 0 is always successful".
+         * This algorithm can still be wait-free for add, but if and only if tasks are not reusable; otherwise
+         * nulling out the buffer wouldn't be possible.
          */
-        if (buffer[nextIndex] != null) {
-            return task
+        while (buffer[nextIndex] != null) {
+            Thread.yield()
         }
         buffer.lazySet(nextIndex, task)
         producerIndex.incrementAndGet()
@@ -103,18 +106,52 @@
      * or positive value of how many nanoseconds should pass until the head of this queue will be available to steal.
      */
     fun tryStealFrom(victim: WorkQueue): Long {
-        if (victim.stealBatch { task -> add(task) }) {
+        assert { bufferSize == 0 }
+        val task = victim.pollBuffer()
+        if (task != null) {
+            val notAdded = add(task)
+            assert { notAdded == null }
             return TASK_STOLEN
         }
-        return tryStealLastScheduled(victim)
+        return tryStealLastScheduled(victim, blockingOnly = false)
     }
 
+    fun tryStealBlockingFrom(victim: WorkQueue): Long {
+        assert { bufferSize == 0 }
+        var start = victim.consumerIndex.value
+        val end = victim.producerIndex.value
+        val buffer = victim.buffer
+
+        while (start != end) {
+            val index = start and MASK
+            if (victim.blockingTasksInBuffer.value == 0) break
+            val value = buffer[index]
+            if (value != null && value.isBlocking && buffer.compareAndSet(index, value, null)) {
+                victim.blockingTasksInBuffer.decrementAndGet()
+                add(value)
+                return TASK_STOLEN
+            } else {
+                ++start
+            }
+        }
+        return tryStealLastScheduled(victim, blockingOnly = true)
+    }
+
+    fun offloadAllWorkTo(globalQueue: GlobalQueue) {
+        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
+        while (pollTo(globalQueue)) {
+            // Steal everything
+        }
+    }
 
     /**
      * Contract on return value is the same as for [tryStealFrom]
      */
-    private fun tryStealLastScheduled(victim: WorkQueue): Long {
+    private fun tryStealLastScheduled(victim: WorkQueue, blockingOnly: Boolean): Long {
         while (true) {
             val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
+            if (blockingOnly && !lastScheduled.isBlocking) return NOTHING_TO_STEAL
+
             // TODO time wraparound ?
             val time = schedulerTimeSource.nanoTime()
             val staleness = time - lastScheduled.submissionTime
@@ -134,49 +171,10 @@
             }
         }
     }
 
-    private fun GlobalQueue.add(task: Task) {
-        /*
-         * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
-         * been already shutdown (with the only exception of the last worker thread that might be performing
-         * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
-         */
-        val added = addLast(task)
-        assert { added }
-    }
-
-    internal fun offloadAllWork(globalQueue: GlobalQueue) {
-        lastScheduledTask.getAndSet(null)?.let { globalQueue.add(it) }
-        while (stealBatchTo(globalQueue)) {
-            // Steal everything
-        }
-    }
-
-    /**
-     * Method that is invoked by external workers to steal work.
-     * Half of the buffer (at least 1) is stolen, returns `true` if at least one task was stolen.
-     */
-    private inline fun stealBatch(consumer: (Task) -> Unit): Boolean {
-        val size = bufferSize
-        if (size == 0) return false
-        var toSteal = (size / 2).coerceAtLeast(1)
-        var wasStolen = false
-        while (toSteal-- > 0) {
-            val tailLocal = consumerIndex.value
-            if (tailLocal - producerIndex.value == 0) return wasStolen
-            val index = tailLocal and MASK
-            val element = buffer[index] ?: continue
-            if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
-                // 1) Help GC 2) Signal producer that this slot is consumed and may be used
-                consumer(element)
-                buffer[index] = null
-                wasStolen = true
-            }
-        }
-        return wasStolen
-    }
-
-    private fun stealBatchTo(queue: GlobalQueue): Boolean {
-        return stealBatch { queue.add(it) }
+    private fun pollTo(queue: GlobalQueue): Boolean {
+        val task = pollBuffer() ?: return false
+        queue.add(task)
+        return true
     }
 
     private fun pollBuffer(): Task? {
@@ -185,8 +183,28 @@
             if (tailLocal - producerIndex.value == 0) return null
             val index = tailLocal and MASK
             if (consumerIndex.compareAndSet(tailLocal, tailLocal + 1)) {
-                return buffer.getAndSet(index, null)
+                // Nulls are allowed when blocking tasks are stolen from the middle of the queue.
+                val value = buffer.getAndSet(index, null) ?: continue
+                value.decrementIfBlocking()
+                return value
             }
         }
     }
+
+    private fun Task?.decrementIfBlocking() {
+        if (this != null && isBlocking) {
+            val value = blockingTasksInBuffer.decrementAndGet()
+            assert { value >= 0 }
+        }
+    }
 }
+
+private fun GlobalQueue.add(task: Task) {
+    /*
+     * globalQueue is closed as the very last step in the shutdown sequence when all worker threads had
+     * been already shutdown (with the only exception of the last worker thread that might be performing
+     * shutdown procedure itself). As a consistency check we do a [cheap!] check that it is not closed here yet.
+     */
+    val added = addLast(task)
+    assert { added }
+}
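
The `tryStealFrom`/`tryStealBlockingFrom` contract above (task stolen, nothing to steal, or a positive "retry in N nanoseconds" hint) is what lets the scheduler avoid the excessive unparks mentioned in the commit message. A minimal sketch of a caller-side loop over that contract follows; the scanning policy and the `TASK_STOLEN` value are assumptions (only `NOTHING_TO_STEAL = -2L` is visible in the diff above), not the actual CoroutineScheduler logic:

    // Assumed sentinel values mirroring WorkQueue.kt.
    const val TASK_STOLEN = -1L
    const val NOTHING_TO_STEAL = -2L

    // Each victim is represented by its steal attempt. Returns 0 when a task was
    // stolen, the minimal ns delay after which a retry may succeed, or -1 when
    // no victim has anything stealable.
    fun stealOrWaitHint(stealAttempts: List<() -> Long>): Long {
        var minDelay = Long.MAX_VALUE
        for (attempt in stealAttempts) {
            when (val result = attempt()) {
                TASK_STOLEN -> return 0L                   // task landed in our local queue
                NOTHING_TO_STEAL -> {}                     // nothing stealable in this victim
                else -> minDelay = minOf(minDelay, result) // head becomes stealable soon
            }
        }
        // Instead of being unparked only to fail the steal again, a worker can
        // sleep for minDelay ns (the work-stealing time resolution) and retry.
        return if (minDelay == Long.MAX_VALUE) -1L else minDelay
    }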
BlockingCoroutineDispatcherRaceStressTest.kt → BlockingCoroutineDispatcherLivenessStressTest.kt
@@ -10,7 +10,11 @@ import org.junit.Test
 import java.util.concurrent.atomic.*
 import kotlin.test.*
 
-class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
+/**
+ * Test that ensures the implementation correctness of [LimitingDispatcher] and
+ * is designed to stress its particular implementation details.
+ */
+class BlockingCoroutineDispatcherLivenessStressTest : SchedulerTestBase() {
     private val concurrentWorkers = AtomicInteger(0)
 
     @Before
@@ -29,15 +33,14 @@
                 async(limitingDispatcher) {
                     try {
                         val currentlyExecuting = concurrentWorkers.incrementAndGet()
-                        require(currentlyExecuting == 1)
+                        assertEquals(1, currentlyExecuting)
                     } finally {
                         concurrentWorkers.decrementAndGet()
                     }
                 }
             }
             tasks.forEach { it.await() }
         }
-        checkPoolThreadsCreated(2..4)
     }
 
     @Test