Skip to content

Commit

Permalink
Coroutines now wait for their children
Browse files Browse the repository at this point in the history
JobSuport.attachChild is introduced;
Job "Completing" state is introduced;
withTimeout is a proper coroutine;
Better diagnostics in cancellation and unexpected exception messages;
Fixed cancellable suspending function to throw CancellationException;
Job.getCompletionException renamed to Job.getCancellationException;
Introduced Deferred.getCompletionExceptionOrNull
Updated docs for Job & Deferred to explain parent/child;
Deprecate and hide legacy Job.invokeOnCompletion signatures;
Updated guide for parent-child relations and related stuff
  • Loading branch information
elizarov committed Sep 27, 2017
1 parent 339ccf3 commit 8b38fa2
Show file tree
Hide file tree
Showing 54 changed files with 1,341 additions and 539 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,12 @@ import kotlin.coroutines.experimental.Continuation
import kotlin.coroutines.experimental.CoroutineContext

/**
* Abstract class to simplify writing of coroutine completion objects that
* implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
* It stores the result of continuation in the state of the job.
* Abstract class for coroutines.
*
* * Coroutines implement completion [Continuation], [Job], and [CoroutineScope] interfaces.
* * Coroutine stores the result of continuation in the state of the job.
* * Coroutine waits for children coroutines to finish before completing.
* * Coroutines are cancelled through an intermediate _cancelling_ state.
*
* @param active when `true` coroutine is created in _active_ state, when `false` in _new_ state. See [Job] for details.
* @suppress **This is unstable API and it is subject to change.**
Expand All @@ -35,35 +38,26 @@ public abstract class AbstractCoroutine<in T>(
public final override val context: CoroutineContext = parentContext + this
public final override val coroutineContext: CoroutineContext get() = context

// all coroutines are cancelled through an intermediate cancelling state
final override val hasCancellingState: Boolean get() = true

protected open val defaultResumeMode: Int get() = MODE_ATOMIC_DEFAULT

final override fun resume(value: T) {
loopOnState { state ->
when (state) {
is Incomplete -> if (updateState(state, value, MODE_ATOMIC_DEFAULT)) return
is Cancelled -> return // ignore resumes on cancelled continuation
else -> error("Already resumed, but got value $value")
}
}
makeCompleting(value, defaultResumeMode)
}

final override fun resumeWithException(exception: Throwable) {
loopOnState { state ->
when (state) {
is Incomplete -> {
if (updateState(state, CompletedExceptionally(exception), MODE_ATOMIC_DEFAULT)) return
}
is Cancelled -> {
// ignore resumes on cancelled continuation, but handle exception if a different one is here
if (exception !== state.exception) handleCoroutineException(context, exception)
return
}
else -> throw IllegalStateException("Already resumed, but got exception $exception", exception)
}
}
makeCompleting(CompletedExceptionally(exception), defaultResumeMode)
}

final override fun handleException(exception: Throwable) {
handleCoroutineException(parentContext, exception)
}
}

override fun nameString(): String {
val coroutineName = context.coroutineName ?: return super.nameString()
return "\"$coroutineName\":${super.nameString()}"
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -141,11 +141,10 @@ public suspend fun <T> run(context: CoroutineContext, block: suspend () -> T): T
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val eventLoop = if (context[ContinuationInterceptor] == null) EventLoopImpl(currentThread) else null
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
coroutine.initParentJob(context[Job])
eventLoop?.initParentJob(coroutine)
block.startCoroutine(coroutine, coroutine)
return coroutine.joinBlocking()
}
Expand All @@ -156,9 +155,9 @@ private open class StandaloneCoroutine(
private val parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) {
override fun afterCompletion(state: Any?, mode: Int) {
override fun onCancellation(exceptionally: CompletedExceptionally?) {
// note the use of the parent's job context below!
if (state is CompletedExceptionally) handleCoroutineException(parentContext, state.exception)
if (exceptionally != null) handleCoroutineException(parentContext, exceptionally.exception)
}
}

Expand Down Expand Up @@ -209,10 +208,14 @@ private class BlockingCoroutine<T>(
private val eventLoop: EventLoop? = parentContext[ContinuationInterceptor] as? EventLoop

init {
if (privateEventLoop) require(eventLoop is EventLoopImpl)
if (privateEventLoop) require(eventLoop is BlockingEventLoop)
}

override fun afterCompletion(state: Any?, mode: Int) {
// signal termination to event loop (don't accept more tasks)
if (privateEventLoop)
(eventLoop as BlockingEventLoop).isCompleted = true
// wake up blocked thread
if (Thread.currentThread() != blockedThread)
LockSupport.unpark(blockedThread)
}
Expand All @@ -228,11 +231,12 @@ private class BlockingCoroutine<T>(
timeSource.parkNanos(this, parkNanos)
}
// process queued events (that could have been added after last processNextEvent and before cancel
if (privateEventLoop) (eventLoop as EventLoopImpl).shutdown()
if (privateEventLoop) (eventLoop as BlockingEventLoop).shutdown()
timeSource.unregisterTimeLoopThread()
// now return result
val state = this.state
(state as? CompletedExceptionally)?.let { throw it.exception }
return state as T
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ public inline suspend fun <T> suspendAtomicCancellableCoroutine(
* @suppress **This is unstable API and it is subject to change.**
*/
public fun CancellableContinuation<*>.removeOnCancel(node: LockFreeLinkedListNode): DisposableHandle =
invokeOnCompletion(RemoveOnCancel(this, node))
invokeOnCompletion(handler = RemoveOnCancel(this, node))

// --------------- implementation details ---------------

Expand Down Expand Up @@ -245,7 +245,7 @@ internal class CancellableContinuationImpl<in T>(
}

override fun completeResume(token: Any) {
completeUpdateState(token, state, resumeMode)
completeUpdateState(token as Incomplete, state, resumeMode)
}

override fun CoroutineDispatcher.resumeUndispatched(value: T) {
Expand All @@ -258,7 +258,8 @@ internal class CancellableContinuationImpl<in T>(
resumeWithExceptionImpl(exception, if (dc.dispatcher === this) MODE_UNDISPATCHED else resumeMode)
}

override fun toString(): String = super.toString() + "[${delegate.toDebugString()}]"
override fun nameString(): String =
"CancellableContinuation(${delegate.toDebugString()})"
}

private class CompletedIdempotentResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -89,33 +89,40 @@ public fun newCoroutineContext(context: CoroutineContext): CoroutineContext =
* Executes a block using a given coroutine context.
*/
internal inline fun <T> withCoroutineContext(context: CoroutineContext, block: () -> T): T {
val oldName = updateContext(context)
val oldName = context.updateThreadContext()
try {
return block()
} finally {
restoreContext(oldName)
restoreThreadContext(oldName)
}
}

@PublishedApi
internal fun updateContext(context: CoroutineContext): String? {
internal fun CoroutineContext.updateThreadContext(): String? {
if (!DEBUG) return null
val newId = context[CoroutineId] ?: return null
val coroutineId = this[CoroutineId] ?: return null
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
val currentThread = Thread.currentThread()
val oldName = currentThread.name
val coroutineName = context[CoroutineName]?.name ?: "coroutine"
currentThread.name = buildString(oldName.length + coroutineName.length + 10) {
append(oldName)
append(" @")
append(coroutineName)
append('#')
append(newId.id)
append(coroutineId.id)
}
return oldName
}

internal val CoroutineContext.coroutineName: String? get() {
if (!DEBUG) return null
val coroutineId = this[CoroutineId] ?: return null
val coroutineName = this[CoroutineName]?.name ?: "coroutine"
return "$coroutineName#${coroutineId.id}"
}

@PublishedApi
internal fun restoreContext(oldName: String?) {
internal fun restoreThreadContext(oldName: String?) {
if (oldName != null) Thread.currentThread().name = oldName
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ internal class DispatchTask<in T>(
val job = if (cancellable) context[Job] else null
withCoroutineContext(context) {
when {
job != null && !job.isActive -> continuation.resumeWithException(job.getCompletionException())
job != null && !job.isActive -> continuation.resumeWithException(job.getCancellationException())
exception -> continuation.resumeWithException(value as Throwable)
else -> continuation.resume(value as T)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,14 @@ import kotlin.coroutines.experimental.CoroutineContext

/**
* Helper function for coroutine builder implementations to handle uncaught exception in coroutines.
*
* It tries to handle uncaught exception in the following way:
* * If there is [CoroutineExceptionHandler] in the context, then it is used.
* * Otherwise, if exception is [CancellationException] then it is ignored
* (because that is the supposed mechanism to cancel the running coroutine)
* * Otherwise, if there is a [Job] in the context, then [Job.cancel] is invoked and if it
* returns `true` (it was still active), then the exception is considered to be handled.
* * Otherwise, current thread's [Thread.uncaughtExceptionHandler] is used.
* * Otherwise:
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
* * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
*/
fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
context[CoroutineExceptionHandler]?.let {
Expand All @@ -37,15 +38,23 @@ fun handleCoroutineException(context: CoroutineContext, exception: Throwable) {
}
// ignore CancellationException (they are normal means to terminate a coroutine)
if (exception is CancellationException) return
// quit if successfully pushed exception as cancellation reason
if (context[Job]?.cancel(exception) ?: false) return
// otherwise just use thread's handler
// try cancel job in the context
context[Job]?.cancel(exception)
// use thread's handler
val currentThread = Thread.currentThread()
currentThread.uncaughtExceptionHandler.uncaughtException(currentThread, exception)
}

/**
* An optional element on the coroutine context to handle uncaught exceptions.
*
* By default, when no handler is installed, uncaught exception are handled in the following way:
* * If exception is [CancellationException] then it is ignored
* (because that is the supposed mechanism to cancel the running coroutine)
* * Otherwise:
* * if there is a [Job] in the context, then [Job.cancel] is invoked;
* * and current thread's [Thread.uncaughtExceptionHandler] is invoked.
*
* See [handleCoroutineException].
*/
public interface CoroutineExceptionHandler : CoroutineContext.Element {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import kotlin.coroutines.experimental.CoroutineContext
* | --------------------------------------- | ---------- | ------------- | -------------------------- | ------------- |
* | _New_ (optional initial state) | `false` | `false` | `false` | `false` |
* | _Active_ (default initial state) | `true` | `false` | `false` | `false` |
* | _Completing_ (optional transient state) | `true` | `false` | `false` | `false` |
* | _Cancelling_ (optional transient state) | `false` | `false` | `false` | `true` |
* | _Cancelled_ (final state) | `false` | `true` | `true` | `true` |
* | _Resolved_ (final state) | `false` | `true` | `false` | `false` |
Expand All @@ -46,18 +47,19 @@ import kotlin.coroutines.experimental.CoroutineContext
* _cancelling_ state immediately. A simple implementation of deferred -- [CompletableDeferred],
* that is not backed by a coroutine, does not have a _cancelling_ state, but becomes _cancelled_
* on [cancel] immediately. Coroutines, on the other hand, become _cancelled_ only when they finish
* executing their code.
* executing their code and after all their [children][attachChild] complete.
*
* ```
* +-----+ start +--------+ complete +-----------+
* | New | ---------------> | Active | ---------+-> | Resolved |
* +-----+ +--------+ | |(completed)|
* | | | +-----------+
* | cancel | cancel |
* V V | +-----------+
* +-----------+ finish +------------+ +-> | Failed |
* | Cancelled | <--------- | Cancelling | |(completed)|
* |(completed)| +------------+ +-----------+
* wait children
* +-----+ start +--------+ complete +-------------+ finish +-----------+
* | New | ---------------> | Active | ----------> | Completing | ---+-> | Resolved |
* +-----+ +--------+ +-------------+ | |(completed)|
* | | | | +-----------+
* | cancel | cancel | cancel |
* V V | | +-----------+
* +-----------+ finish +------------+ | +-> | Failed |
* | Cancelled | <--------- | Cancelling | <---------------+ |(completed)|
* |(completed)| +------------+ +-----------+
* +-----------+
* ```
*
Expand All @@ -68,7 +70,9 @@ import kotlin.coroutines.experimental.CoroutineContext
* or the cancellation cause inside the coroutine.
*
* A deferred value can have a _parent_ job. A deferred value with a parent is cancelled when its parent is
* cancelled or completes.
* cancelled or completes. Parent waits for all its [children][attachChild] to complete in _completing_ or
* _cancelling_ state. _Completing_ state is purely internal. For an outside observer a _completing_
* deferred is still active, while internally it is waiting for its children.
*
* All functions on this interface and on all interfaces derived from it are **thread-safe** and can
* be safely invoked from concurrent coroutines without external synchronization.
Expand Down Expand Up @@ -108,10 +112,20 @@ public interface Deferred<out T> : Job {
* [completed exceptionally][isCompletedExceptionally].
*
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
* the value is already complete.
* the value is already complete. See also [getCompletionExceptionOrNull].
*/
public fun getCompleted(): T

/**
* Returns *completion exception* result if this deferred [completed exceptionally][isCompletedExceptionally],
* `null` if it is completed normally, or throws [IllegalStateException] if this deferred value has not
* [completed][isCompleted] yet.
*
* This function is designed to be used from [invokeOnCompletion] handlers, when there is an absolute certainty that
* the value is already complete. See also [getCompleted].
*/
public fun getCompletionExceptionOrNull(): Throwable?

/**
* @suppress **Deprecated**: Use `isActive`.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,20 +247,11 @@ internal abstract class EventLoopBase: CoroutineDispatcher(), Delay, EventLoop {
}
}

internal class EventLoopImpl(
internal abstract class ThreadEventLoop(
private val thread: Thread
) : EventLoopBase() {
private var parentJob: Job? = null

override val canComplete: Boolean get() = parentJob != null
override val isCompleted: Boolean get() = parentJob?.isCompleted == true
override fun isCorrectThread(): Boolean = Thread.currentThread() === thread

fun initParentJob(coroutine: Job) {
require(this.parentJob == null)
this.parentJob = coroutine
}

override fun unpark() {
if (Thread.currentThread() !== thread)
timeSource.unpark(thread)
Expand All @@ -274,5 +265,23 @@ internal class EventLoopImpl(
// reschedule the rest of delayed tasks
rescheduleAllDelayed()
}

}

private class EventLoopImpl(thread: Thread) : ThreadEventLoop(thread) {
private var parentJob: Job? = null

override val canComplete: Boolean get() = parentJob != null
override val isCompleted: Boolean get() = parentJob?.isCompleted == true

fun initParentJob(parentJob: Job) {
require(this.parentJob == null)
this.parentJob = parentJob
}
}

internal class BlockingEventLoop(thread: Thread) : ThreadEventLoop(thread) {
override val canComplete: Boolean get() = true
@Volatile
public override var isCompleted: Boolean = false
}
Loading

0 comments on commit 8b38fa2

Please sign in to comment.