Skip to content

Commit

Permalink
Support yield in immediate dispatchers
Browse files Browse the repository at this point in the history
So yield now checks for "Unconfined" dispatcher instead of
"isDispatchNeeded" and works properly for immediate dispatchers.

Fixes Kotlin#1474
  • Loading branch information
elizarov committed Nov 26, 2019
1 parent 7e895fc commit 3ab34d8
Show file tree
Hide file tree
Showing 7 changed files with 147 additions and 37 deletions.
3 changes: 2 additions & 1 deletion kotlinx-coroutines-core/common/src/Unconfined.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import kotlin.coroutines.*
*/
internal object Unconfined : CoroutineDispatcher() {
override fun isDispatchNeeded(context: CoroutineContext): Boolean = false
override fun dispatch(context: CoroutineContext, block: Runnable) { throw UnsupportedOperationException() }
// Just in case somebody wraps Unconfined dispatcher casing the "dispatch" to be called from "yield"
override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
override fun toString(): String = "Unconfined"
}
19 changes: 13 additions & 6 deletions kotlinx-coroutines-core/common/src/Yield.kt
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,27 @@ import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

/**
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run.
* If the coroutine dispatcher does not have its own thread pool (like [Dispatchers.Unconfined]), this
* function does nothing but check if the coroutine's [Job] was completed.
* Yields the thread (or thread pool) of the current coroutine dispatcher to other coroutines to run if possible.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed when this suspending function is invoked or while
* this function is waiting for dispatch, it resumes with a [CancellationException].
*
* **Note**: This function always [checks for cancellation][ensureActive] even when it does not suspend.
*
* ### Implementation details
*
* If the coroutine dispatcher is [Unconfined][Dispatchers.Unconfined], this
* functions suspends only when there are other unconfined coroutines working and forming an event-loop.
* For other dispatchers, this function does not call [CoroutineDispatcher.isDispatchNeeded] and
* always suspends to be resumed later. If there is no [CoroutineDispatcher] in the context, it does not suspend.
*/
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
val context = uCont.context
context.checkCompletion()
val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
if (!cont.dispatcher.isDispatchNeeded(context)) {
return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
}
// Special case for the unconfined dispatcher that can yield only in existing unconfined loop
if (cont.dispatcher === Unconfined) return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
cont.dispatchYield(Unit)
COROUTINE_SUSPENDED
}
Expand Down
46 changes: 46 additions & 0 deletions kotlinx-coroutines-core/common/test/ImmediateYieldTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlin.coroutines.*
import kotlin.test.*

class ImmediateYieldTest : TestBase() {

// See https://github.com/Kotlin/kotlinx.coroutines/issues/1474
@Test
fun testImmediateYield() = runTest {
expect(1)
launch(ImmediateDispatcher(coroutineContext[ContinuationInterceptor])) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}

// imitate immediate dispatcher
private class ImmediateDispatcher(job: ContinuationInterceptor?) : CoroutineDispatcher() {
val delegate: CoroutineDispatcher = job as CoroutineDispatcher

override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

override fun dispatch(context: CoroutineContext, block: Runnable) =
delegate.dispatch(context, block)
}

@Test
fun testWrappedUnconfinedDispatcherYield() = runTest {
expect(1)
launch(wrapperDispatcher(Dispatchers.Unconfined)) {
expect(2)
yield() // Would not work with wrapped unconfined dispatcher
expect(3)
}
finish(4) // after launch
}
}
14 changes: 14 additions & 0 deletions ui/kotlinx-coroutines-android/test/HandlerDispatcherTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -141,4 +141,18 @@ class HandlerDispatcherTest : TestBase() {
// TODO compile against API 22+ so this can be invoked without reflection.
private val Message.isAsynchronous: Boolean
get() = Message::class.java.getDeclaredMethod("isAsynchronous").invoke(this) as Boolean

@Test
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Main) {
expect(1)
// launch in the immediate dispatcher
launch(Dispatchers.Main.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}
65 changes: 35 additions & 30 deletions ui/kotlinx-coroutines-javafx/src/JavaFxDispatcher.kt
Original file line number Diff line number Diff line change
Expand Up @@ -115,36 +115,41 @@ private class PulseTimer : AnimationTimer() {
}
}

internal fun initPlatform(): Boolean {
/*
* Try to instantiate JavaFx platform in a way which works
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
*
* 1) Try to invoke javafx.application.Platform.startup if this class is
* present in a classpath.
* 2) If it is not successful and does not because it is already started,
* fallback to PlatformImpl.
*
* Ignore exception anyway in case of unexpected changes in API, in that case
* user will have to instantiate it manually.
*/
val runnable = Runnable {}
return runCatching {
// Invoke public API if it is present
Class.forName("javafx.application.Platform")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}.recoverCatching { exception ->
// Recover -> check re-initialization
val cause = exception.cause
if (exception is InvocationTargetException && cause is IllegalStateException
&& "Toolkit already initialized" == cause.message) {
// Toolkit is already initialized -> success, return
Unit
} else { // Fallback to Java 8 API
Class.forName("com.sun.javafx.application.PlatformImpl")
internal fun initPlatform(): Boolean = PlatformInitializer.success

// Lazily try to initialize JavaFx platform just once
private object PlatformInitializer {
val success = run {
/*
* Try to instantiate JavaFx platform in a way which works
* both on Java 8 and Java 11 and does not produce "illegal reflective access":
*
* 1) Try to invoke javafx.application.Platform.startup if this class is
* present in a classpath.
* 2) If it is not successful and does not because it is already started,
* fallback to PlatformImpl.
*
* Ignore exception anyway in case of unexpected changes in API, in that case
* user will have to instantiate it manually.
*/
val runnable = Runnable {}
runCatching {
// Invoke public API if it is present
Class.forName("javafx.application.Platform")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}
}.isSuccess
}.recoverCatching { exception ->
// Recover -> check re-initialization
val cause = exception.cause
if (exception is InvocationTargetException && cause is IllegalStateException
&& "Toolkit already initialized" == cause.message) {
// Toolkit is already initialized -> success, return
Unit
} else { // Fallback to Java 8 API
Class.forName("com.sun.javafx.application.PlatformImpl")
.getMethod("startup", java.lang.Runnable::class.java)
.invoke(null, runnable)
}
}.isSuccess
}
}
22 changes: 22 additions & 0 deletions ui/kotlinx-coroutines-javafx/test/JavaFxTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -34,4 +34,26 @@ class JavaFxTest : TestBase() {
finish(4)
}
}

@Test
fun testImmediateDispatcherYield() {
if (!initPlatform()) {
println("Skipping JavaFxTest in headless environment")
return // ignore test in headless environments
}

runBlocking(Dispatchers.JavaFx) {
expect(1)
check(Platform.isFxApplicationThread())
// launch in the immediate dispatcher
launch(Dispatchers.JavaFx.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}
}
15 changes: 15 additions & 0 deletions ui/kotlinx-coroutines-swing/test/SwingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package kotlinx.coroutines.swing

import javafx.application.*
import kotlinx.coroutines.*
import org.junit.*
import org.junit.Test
Expand Down Expand Up @@ -83,4 +84,18 @@ class SwingTest : TestBase() {
private suspend fun join(component: SwingTest.SwingComponent) {
component.coroutineContext[Job]!!.join()
}

@Test
fun testImmediateDispatcherYield() = runBlocking(Dispatchers.Swing) {
expect(1)
// launch in the immediate dispatcher
launch(Dispatchers.Swing.immediate) {
expect(2)
yield()
expect(4)
}
expect(3) // after yield
yield() // yield back
finish(5)
}
}

0 comments on commit 3ab34d8

Please sign in to comment.