Skip to content

Commit

Permalink
Optional parent job parameter for coroutine builders
Browse files Browse the repository at this point in the history
  • Loading branch information
elizarov committed Nov 30, 2017
1 parent e19ee04 commit e8f694e
Show file tree
Hide file tree
Showing 22 changed files with 252 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* By default, the coroutine is immediately scheduled for execution.
Expand All @@ -44,32 +46,43 @@ import kotlin.coroutines.experimental.intrinsics.suspendCoroutineOrReturn
* the context of another coroutine, then any uncaught exception leads to the cancellation of parent coroutine.
*
* See [newCoroutineContext] for a description of debugging facilities that are available for newly created coroutine.
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param block the coroutine code.
*/
public fun launch(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> Unit
): Job {
val newContext = newCoroutineContext(context)
val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
coroutine.initParentJob(context[Job])
coroutine.initParentJob(newContext[Job])
start(block, coroutine, coroutine)
return coroutine
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun launch(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job =
launch(context, start, block = block)

/**
* @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
*/
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun launch(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> Unit): Job =
launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
launch(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)

/**
* Calls the specified suspending block with a given coroutine context, suspends until it completes, and returns
Expand Down Expand Up @@ -147,7 +160,7 @@ public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, bl
val eventLoop = if (context[ContinuationInterceptor] == null) BlockingEventLoop(currentThread) else null
val newContext = newCoroutineContext(context + (eventLoop ?: EmptyCoroutineContext))
val coroutine = BlockingCoroutine<T>(newContext, currentThread, privateEventLoop = eventLoop != null)
coroutine.initParentJob(context[Job])
coroutine.initParentJob(newContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine.joinBlocking()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,12 @@ public val DefaultDispatcher: CoroutineDispatcher = CommonPool
* Coroutine name can be explicitly assigned using [CoroutineName] context element.
* The string "coroutine" is used as a default name.
*/
public fun newCoroutineContext(context: CoroutineContext): CoroutineContext {
@JvmOverloads // for binary compatibility with newCoroutineContext(context: CoroutineContext) version
public fun newCoroutineContext(context: CoroutineContext, parent: Job? = null): CoroutineContext {
val debug = if (DEBUG) context + CoroutineId(COROUTINE_ID.incrementAndGet()) else context
val wp = if (parent == null) debug else debug + parent
return if (context !== DefaultDispatcher && context[ContinuationInterceptor] == null)
debug + DefaultDispatcher else debug
wp + DefaultDispatcher else wp
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ public interface Deferred<out T> : Job {
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* By default, the coroutine is immediately scheduled for execution.
Expand All @@ -153,29 +155,40 @@ public interface Deferred<out T> : Job {
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param block the coroutine code.
*/
public fun <T> async(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val newContext = newCoroutineContext(context, parent)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.initParentJob(context[Job])
coroutine.initParentJob(newContext[Job])
start(block, coroutine, coroutine)
return coroutine
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> async(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> =
async(context, start, block = block)

/**
* @suppress **Deprecated**: Use `start = CoroutineStart.XXX` parameter
*/
@Deprecated(message = "Use `start = CoroutineStart.XXX` parameter",
replaceWith = ReplaceWith("async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)"))
public fun <T> async(context: CoroutineContext, start: Boolean, block: suspend CoroutineScope.() -> T): Deferred<T> =
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block)
async(context, if (start) CoroutineStart.DEFAULT else CoroutineStart.LAZY, block = block)

/**
* @suppress **Deprecated**: `defer` was renamed to `async`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ interface ActorJob<in E> : SendChannel<E> {
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* By default, the coroutine is immediately scheduled for execution.
Expand All @@ -77,24 +79,36 @@ interface ActorJob<in E> : SendChannel<E> {
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (no buffer by default).
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param block the coroutine code.
*/
public fun <E> actor(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend ActorScope<E>.() -> Unit
): SendChannel<E> {
val newContext = newCoroutineContext(context)
val newContext = newCoroutineContext(context, parent)
val channel = Channel<E>(capacity)
val coroutine = if (start.isLazy)
LazyActorCoroutine(newContext, channel, block) else
ActorCoroutine(newContext, channel, active = true)
coroutine.initParentJob(context[Job])
coroutine.initParentJob(newContext[Job])
start(block, coroutine, coroutine)
return coroutine
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> actor(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend ActorScope<E>.() -> Unit
): ActorJob<E> =
actor(context, capacity, start, block = block) as ActorJob<E>

private open class ActorCoroutine<E>(
parentContext: CoroutineContext,
channel: Channel<E>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,8 @@ public interface ProducerScope<in E> : CoroutineScope, SendChannel<E> {
* @suppress **Deprecated**: Use `ReceiveChannel`.
*/
@Deprecated(message = "Use `ReceiveChannel`", replaceWith = ReplaceWith("ReceiveChannel"))
interface ProducerJob<out E> : ReceiveChannel<E> {
@Suppress("MULTIPLE_DEFAULTS_INHERITED_FROM_SUPERTYPES_WHEN_NO_EXPLICIT_OVERRIDE")
interface ProducerJob<out E> : ReceiveChannel<E>, Job {
@Deprecated(message = "Use ReceiveChannel itself")
val channel: ReceiveChannel<E>
}
Expand All @@ -59,6 +60,8 @@ interface ProducerJob<out E> : ReceiveChannel<E> {
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* Uncaught exceptions in this coroutine close the channel with this exception as a cause and
Expand All @@ -68,20 +71,32 @@ interface ProducerJob<out E> : ReceiveChannel<E> {
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param capacity capacity of the channel's buffer (no buffer by default).
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).*
* @param block the coroutine code.
*/
public fun <E> produce(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
parent: Job? = null,
block: suspend ProducerScope<E>.() -> Unit
): ReceiveChannel<E> {
val channel = Channel<E>(capacity)
return ProducerCoroutine(newCoroutineContext(context), channel).apply {
initParentJob(context[Job])
block.startCoroutine(this, this)
}
val newContext = newCoroutineContext(context, parent)
val coroutine = ProducerCoroutine(newContext, channel)
coroutine.initParentJob(newContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <E> produce(
context: CoroutineContext = DefaultDispatcher,
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ProducerJob<E> =
produce(context, capacity, block = block) as ProducerJob<E>

/**
* @suppress **Deprecated**: Renamed to `produce`.
*/
Expand All @@ -91,7 +106,7 @@ public fun <E> buildChannel(
capacity: Int = 0,
block: suspend ProducerScope<E>.() -> Unit
): ProducerJob<E> =
produce(context, capacity, block) as ProducerJob<E>
produce(context, capacity, block = block) as ProducerJob<E>

private class ProducerCoroutine<E>(parentContext: CoroutineContext, channel: Channel<E>) :
ChannelCoroutine<E>(parentContext, channel, active = true), ProducerScope<E>, ProducerJob<E>
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ fun main(args: Array<String>) = runBlocking<Unit> {
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ interface ReaderScope : CoroutineScope {
fun reader(coroutineContext: CoroutineContext,
channel: ByteChannel,
block: suspend ReaderScope.() -> Unit): ReaderJob {
val coroutine = ReaderCoroutine(newCoroutineContext(coroutineContext), channel)
coroutine.initParentJob(coroutineContext[Job])
val newContext = newCoroutineContext(coroutineContext)
val coroutine = ReaderCoroutine(newContext, channel)
coroutine.initParentJob(newContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ interface WriterScope : CoroutineScope {
fun writer(coroutineContext: CoroutineContext,
channel: ByteChannel,
block: suspend WriterScope.() -> Unit): WriterJob {
val coroutine = WriterCoroutine(newCoroutineContext(coroutineContext), channel)
coroutine.initParentJob(coroutineContext[Job])
val newContext = newCoroutineContext(coroutineContext)
val coroutine = WriterCoroutine(newContext, channel)
coroutine.initParentJob(newContext[Job])
block.startCoroutine(coroutine, coroutine)
return coroutine
}
Expand Down
8 changes: 5 additions & 3 deletions coroutines-guide.md
Original file line number Diff line number Diff line change
Expand Up @@ -1182,8 +1182,10 @@ to avoid memory leaks.

We can manage a lifecycle of our coroutines by creating an instance of [Job] that is tied to
the lifecycle of our activity. A job instance is created using [Job()] factory function
as the following example shows. We need to make sure that all the coroutines are started
with this job in their context and then a single invocation of [Job.cancel] terminates them all.
as the following example shows. For convenience, rather than using `launch(coroutineContext + job)` expression,
we can write `launch(coroutineContext, parent = job)` to make explicit the fact that the parent job is being used.

Now, a single invocation of [Job.cancel] cancels all the children we've launched.
Moreover, [Job.join] waits for all of them to complete, so we can also use [cancelAndJoin] here in
this example:

Expand All @@ -1193,7 +1195,7 @@ fun main(args: Array<String>) = runBlocking<Unit> {
// now launch ten coroutines for a demo, each working for a different time
val coroutines = List(10) { i ->
// they are all children of our job object
launch(coroutineContext + job) { // we use the context of main runBlocking thread, but with our own job object
launch(coroutineContext, parent = job) { // we use the context of main runBlocking thread, but with our parent job
delay((i + 1) * 200L) // variable delay 200ms, 400ms, ... etc
println("Coroutine $i is done")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ import kotlin.coroutines.experimental.CoroutineContext
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* By default, the coroutine is immediately scheduled for execution.
Expand All @@ -47,22 +49,33 @@ import kotlin.coroutines.experimental.CoroutineContext
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param block the coroutine code.
*/
public fun <T> future(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> T
): ListenableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
val newContext = newCoroutineContext(context, parent)
val job = Job(newContext[Job])
val future = ListenableFutureCoroutine<T>(newContext + job)
job.cancelFutureOnCompletion(future)
start(block, receiver=future, completion=future) // use the specified start strategy
return future
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): ListenableFuture<T> =
future(context, start, block = block)

private class ListenableFutureCoroutine<T>(
override val context: CoroutineContext
) : AbstractFuture<T>(), Continuation<T>, CoroutineScope {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ import kotlin.coroutines.experimental.suspendCoroutine
* See [CoroutineDispatcher] for the standard context implementations that are provided by `kotlinx.coroutines`.
* The [context][CoroutineScope.context] of the parent coroutine from its [scope][CoroutineScope] may be used,
* in which case the [Job] of the resulting coroutine is a child of the job of the parent coroutine.
* The parent job may be also explicitly specified using [parent] parameter.
*
* If the context does not have any dispatcher nor any other [ContinuationInterceptor], then [DefaultDispatcher] is used.
*
* By default, the coroutine is immediately scheduled for execution.
Expand All @@ -48,15 +50,17 @@ import kotlin.coroutines.experimental.suspendCoroutine
*
* @param context context of the coroutine. The default value is [DefaultDispatcher].
* @param start coroutine start option. The default value is [CoroutineStart.DEFAULT].
* @param parent explicitly specifies the parent job, overrides job from the [context] (if any).
* @param block the coroutine code.
*/
public fun <T> future(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
parent: Job? = null,
block: suspend CoroutineScope.() -> T
): CompletableFuture<T> {
require(!start.isLazy) { "$start start is not supported" }
val newContext = newCoroutineContext(context)
val newContext = newCoroutineContext(context, parent)
val job = Job(newContext[Job])
val future = CompletableFutureCoroutine<T>(newContext + job)
job.cancelFutureOnCompletion(future)
Expand All @@ -65,6 +69,15 @@ public fun <T> future(
return future
}

/** @suppress **Deprecated**: Binary compatibility */
@Deprecated(message = "Binary compatibility", level = DeprecationLevel.HIDDEN)
public fun <T> future(
context: CoroutineContext = DefaultDispatcher,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): CompletableFuture<T> =
future(context, start, block = block)

private class CompletableFutureCoroutine<T>(
override val context: CoroutineContext
) : CompletableFuture<T>(), Continuation<T>, CoroutineScope {
Expand Down
Loading

0 comments on commit e8f694e

Please sign in to comment.