Skip to content

Commit

Permalink
CoroutineScheduler parking strategy rework
Browse files Browse the repository at this point in the history
    * WorkQueue.trySteal reports not only whether the steal was successful, but also the waiting time until a task becomes stealable
    * CoroutineScheduler.trySteal attempts to steal from all the workers (starting from the random position) per iteration to have deterministic stealing
    * Parking mechanics rework. After unsuccessful findTask, worker immediately adds itself to parking stack, then rescans all the queues to avoid missing tryUnparks and only then parks itself (parking duration depends on WorkQueue.trySteal result), terminating later
    * Excessive spinning and parking are completely eliminated, reducing CPU consumption significantly (x3) and making CoroutineScheduler on par with FJP and FTP on Ktor-like workloads
    * The downside of aggressive parking is the cost of the slow-path unpark paid by external submitters, which shows up as degraded DispatchersContextSwitchBenchmark results. Follow-up commits will fix that problem
    * Retry on tryStealLastScheduled failures to avoid potential starvation
    * Merge available CPU permits with controlState to simplify reasoning about pool state and make all state transitions atomic
    * Get rid of synthetic accessors
  • Loading branch information
qwwdfsad committed Dec 12, 2019
1 parent 5202a8b commit f27d176
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 182 deletions.
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
#

# Kotlin
version=1.3.2-SNAPSHOT
version=1.3.2-sched10
group=org.jetbrains.kotlinx
kotlin_version=1.3.60

Expand Down
301 changes: 149 additions & 152 deletions kotlinx-coroutines-core/jvm/src/scheduling/CoroutineScheduler.kt

Large diffs are not rendered by default.

45 changes: 29 additions & 16 deletions kotlinx-coroutines-core/jvm/src/scheduling/WorkQueue.kt
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ internal const val BUFFER_CAPACITY_BASE = 7
internal const val BUFFER_CAPACITY = 1 shl BUFFER_CAPACITY_BASE
internal const val MASK = BUFFER_CAPACITY - 1 // 127 by default (BUFFER_CAPACITY is 128)

internal const val TASK_STOLEN = -1L
internal const val NOTHING_TO_STEAL = -2L

/**
* Tightly coupled with [CoroutineScheduler] queue of pending tasks, but extracted to separate file for simplicity.
* At any moment queue is used only by [CoroutineScheduler.Worker] threads, has only one producer (worker owning this queue)
Expand Down Expand Up @@ -49,10 +52,7 @@ internal class WorkQueue {
* This is in general harmless because steal will be blocked by timer
*/
internal val bufferSize: Int get() = producerIndex.value - consumerIndex.value

// TODO replace with inlined array when atomicfu will support it
private val buffer: AtomicReferenceArray<Task?> = AtomicReferenceArray(BUFFER_CAPACITY)

private val lastScheduledTask = atomic<Task?>(null)

private val producerIndex = atomic(0)
Expand Down Expand Up @@ -93,30 +93,43 @@ internal class WorkQueue {
/**
* Tries stealing from [victim] queue into this queue, using [globalQueue] to offload stolen tasks in case of current queue overflow.
*
* @return whether any task was stolen
* Returns [NOTHING_TO_STEAL] if the [victim] queue has nothing to steal, [TASK_STOLEN] if at least one task was stolen,
* or a positive value of how many nanoseconds should pass until the last scheduled task of the [victim] queue becomes available to steal.
*/
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Boolean {
fun trySteal(victim: WorkQueue, globalQueue: GlobalQueue): Long {
if (victim.stealBatch { task -> add(task, globalQueue) }) {
return true
return TASK_STOLEN
}
return tryStealLastScheduled(victim, globalQueue)
}

/**
* Contract on return value is the same as for [trySteal]
*/
private fun tryStealLastScheduled(
victim: WorkQueue,
globalQueue: GlobalQueue
): Boolean {
val lastScheduled = victim.lastScheduledTask.value ?: return false
val time = schedulerTimeSource.nanoTime()
if (time - lastScheduled.submissionTime < WORK_STEALING_TIME_RESOLUTION_NS) {
return false
}
): Long {
while (true) {
val lastScheduled = victim.lastScheduledTask.value ?: return NOTHING_TO_STEAL
// TODO time wraparound ?
val time = schedulerTimeSource.nanoTime()
val staleness = time - lastScheduled.submissionTime
if (staleness < WORK_STEALING_TIME_RESOLUTION_NS) {
return WORK_STEALING_TIME_RESOLUTION_NS - staleness
}

if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
add(lastScheduled, globalQueue)
return true
/*
* If the CAS has failed, either someone else has stolen this task or the owner has executed this task
* and dispatched another one. In the latter case we should retry to avoid missing the new task.
*/
if (victim.lastScheduledTask.compareAndSet(lastScheduled, null)) {
add(lastScheduled, globalQueue)
return TASK_STOLEN
}
continue
}
return false

}

internal fun size(): Int = if (lastScheduledTask.value != null) bufferSize + 1 else bufferSize
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ package kotlinx.coroutines.scheduling

import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
import java.util.concurrent.atomic.*
import kotlin.test.*

class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
private val concurrentWorkers = AtomicInteger(0)
Expand All @@ -33,30 +35,27 @@ class BlockingCoroutineDispatcherRaceStressTest : SchedulerTestBase() {
}
}
}

tasks.forEach { it.await() }
}

checkPoolThreadsCreated(2..4)
}

@Test
fun testPingPongThreadsCount() = runBlocking {
corePoolSize = CORES_COUNT
val iterations = 100_000 * stressTestMultiplier
// Stress test for specific case (race #2 from LimitingDispatcher). Shouldn't hang.
val completed = AtomicInteger(0)
for (i in 1..iterations) {
val tasks = (1..2).map {
async(dispatcher) {
// Useless work
concurrentWorkers.incrementAndGet()
concurrentWorkers.decrementAndGet()
completed.incrementAndGet()
}
}

tasks.forEach { it.await() }
}

checkPoolThreadsCreated(CORES_COUNT)
assertEquals(2 * iterations, completed.get())
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,4 @@ class BlockingCoroutineDispatcherTest : SchedulerTestBase() {
fun testZeroParallelism() {
blockingDispatcher(0)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,10 @@

package kotlinx.coroutines.scheduling

import kotlinx.coroutines.TestBase
import org.junit.Test
import kotlinx.coroutines.*
import org.junit.*
import java.lang.Runnable
import java.util.concurrent.*
import java.util.concurrent.CountDownLatch
import kotlin.coroutines.*

class CoroutineSchedulerTest : TestBase() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ class WorkQueueStressTest : TestBase() {
val myQueue = WorkQueue()
startLatch.await()
while (stolen.size != offerIterations) {
if (!myQueue.trySteal(producerQueue, stolen)) {
if (myQueue.trySteal(producerQueue, stolen) != NOTHING_TO_STEAL) {
stolen.addAll(myQueue.drain().map { task(it) })
}
}
Expand Down
4 changes: 2 additions & 2 deletions kotlinx-coroutines-core/jvm/test/scheduling/WorkQueueTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -62,10 +62,10 @@ class WorkQueueTest : TestBase() {
timeSource.step(3)

val stealer = WorkQueue()
require(stealer.trySteal(victim, globalQueue))
assertEquals(TASK_STOLEN, stealer.trySteal(victim, globalQueue))
assertEquals(arrayListOf(1L), stealer.drain())

require(stealer.trySteal(victim, globalQueue))
assertEquals(TASK_STOLEN, stealer.trySteal(victim, globalQueue))
assertEquals(arrayListOf(2L), stealer.drain())
}
}
Expand Down

0 comments on commit f27d176

Please sign in to comment.