Skip to content

Commit

Permalink
Introduce EXACTLY_ONCE contracts to coroutineScope, supervisorScope, …
Browse files Browse the repository at this point in the history
…withContext, runBlocking, withTimeout and select (Kotlin#2030)
  • Loading branch information
qwwdfsad authored May 15, 2020
1 parent e470df9 commit 397f10e
Show file tree
Hide file tree
Showing 9 changed files with 138 additions and 32 deletions.
11 changes: 9 additions & 2 deletions integration/kotlinx-coroutines-jdk8/src/time/Time.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,16 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines.time

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.selects.*
import java.time.*
import java.time.temporal.*
import kotlin.contracts.*

/**
* "java.time" adapter method for [kotlinx.coroutines.delay].
Expand Down Expand Up @@ -35,8 +38,12 @@ public fun <R> SelectBuilder<R>.onTimeout(duration: Duration, block: suspend ()
/**
* "java.time" adapter method for [kotlinx.coroutines.withTimeout].
*/
public suspend fun <T> withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T =
kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block)
public suspend fun <T> withTimeout(duration: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return kotlinx.coroutines.withTimeout(duration.coerceToMillis(), block)
}

/**
* "java.time" adapter method for [kotlinx.coroutines.withTimeoutOrNull].
Expand Down
49 changes: 28 additions & 21 deletions kotlinx-coroutines-core/common/src/Builders.common.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@

@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.atomicfu.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -134,31 +136,36 @@ private class LazyDeferredCoroutine<T>(
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T = suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return suspendCoroutineUninterceptedOrReturn sc@ { uCont ->
// compute new context
val oldContext = uCont.context
val newContext = oldContext + context
// always check for cancellation of new context
newContext.checkCompletion()
// FAST PATH #1 -- new context is the same as the old one
if (newContext === oldContext) {
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
// FAST PATH #2 -- the new dispatcher is the same as the old one (something else changed)
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) {
val coroutine = UndispatchedCoroutine(newContext, uCont)
// There are changes in the context, so this thread needs to be updated
withCoroutineContext(newContext, null) {
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
}
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}
// SLOW PATH -- use new dispatcher
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
}

/**
Expand Down
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/CoroutineScope.kt
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*

Expand Down Expand Up @@ -183,11 +185,15 @@ public object GlobalScope : CoroutineScope {
* or may throw a corresponding unhandled [Throwable] if there is any unhandled exception in this scope
* (for example, from a crashed coroutine that was started with [launch][CoroutineScope.launch] in this scope).
*/
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

/**
* Creates a [CoroutineScope] that wraps the given coroutine [context].
Expand Down
11 changes: 8 additions & 3 deletions kotlinx-coroutines-core/common/src/Supervisor.kt
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

@file:OptIn(ExperimentalContracts::class)
@file:Suppress("DEPRECATION_ERROR")

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -47,11 +48,15 @@ public fun SupervisorJob0(parent: Job? = null) : Job = SupervisorJob(parent)
* A failure of the scope itself (exception thrown in the [block] or cancellation) fails the scope with all its children,
* but does not cancel parent job.
*/
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend fun <R> supervisorScope(block: suspend CoroutineScope.() -> R): R {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = SupervisorCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
}

private class SupervisorJobImpl(parent: Job?) : JobImpl(parent) {
override fun childCancelled(cause: Throwable): Boolean = false
Expand Down
13 changes: 11 additions & 2 deletions kotlinx-coroutines-core/common/src/Timeout.kt
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.selects.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand All @@ -27,6 +29,9 @@ import kotlin.time.*
* @param timeMillis timeout time in milliseconds.
*/
public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
if (timeMillis <= 0L) throw TimeoutCancellationException("Timed out immediately")
return suspendCoroutineUninterceptedOrReturn { uCont ->
setupTimeout(TimeoutCoroutine(timeMillis, uCont), block)
Expand All @@ -46,8 +51,12 @@ public suspend fun <T> withTimeout(timeMillis: Long, block: suspend CoroutineSco
* Implementation note: how the time is tracked exactly is an implementation detail of the context's [CoroutineDispatcher].
*/
@ExperimentalTime
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T =
withTimeout(timeout.toDelayMillis(), block)
public suspend fun <T> withTimeout(timeout: Duration, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
return withTimeout(timeout.toDelayMillis(), block)
}

/**
* Runs a given suspending block of code inside a coroutine with a specified [timeout][timeMillis] and returns
Expand Down
10 changes: 8 additions & 2 deletions kotlinx-coroutines-core/common/src/selects/Select.kt
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines.selects

Expand All @@ -10,6 +11,7 @@ import kotlinx.coroutines.channels.*
import kotlinx.coroutines.internal.*
import kotlinx.coroutines.intrinsics.*
import kotlinx.coroutines.sync.*
import kotlin.contracts.*
import kotlin.coroutines.*
import kotlin.coroutines.intrinsics.*
import kotlin.jvm.*
Expand Down Expand Up @@ -199,8 +201,11 @@ public interface SelectInstance<in R> {
* Note that this function does not check for cancellation when it is not suspended.
* Use [yield] or [CoroutineScope.isActive] to periodically check for cancellation in tight loops if needed.
*/
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() -> Unit): R {
contract {
callsInPlace(builder, InvocationKind.EXACTLY_ONCE)
}
return suspendCoroutineUninterceptedOrReturn { uCont ->
val scope = SelectBuilderImpl(uCont)
try {
builder(scope)
Expand All @@ -209,6 +214,7 @@ public suspend inline fun <R> select(crossinline builder: SelectBuilder<R>.() ->
}
scope.getResult()
}
}


@SharedImmutable
Expand Down
52 changes: 52 additions & 0 deletions kotlinx-coroutines-core/common/test/BuilderContractsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2016-2020 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines

import kotlinx.coroutines.selects.*
import kotlin.test.*

class BuilderContractsTest : TestBase() {

@Test
fun testContracts() = runTest {
// Coroutine scope
val cs: Int
coroutineScope {
cs = 42
}
consume(cs)

// Supervisor scope
val svs: Int
supervisorScope {
svs = 21
}
consume(svs)

// with context scope
val wctx: Int
withContext(Dispatchers.Unconfined) {
wctx = 239
}
consume(wctx)

val wt: Int
withTimeout(Long.MAX_VALUE) {
wt = 123
}
consume(wt)

val s: Int
select<Unit> {
s = 42
Job().apply { complete() }.onJoin {}
}
consume(s)
}

private fun consume(a: Int) {
a.hashCode() // BE codegen verification
}
}
5 changes: 5 additions & 0 deletions kotlinx-coroutines-core/jvm/src/Builders.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@

@file:JvmMultifileClass
@file:JvmName("BuildersKt")
@file:OptIn(ExperimentalContracts::class)

package kotlinx.coroutines

import java.util.concurrent.locks.*
import kotlin.contracts.*
import kotlin.coroutines.*

/**
Expand All @@ -34,6 +36,9 @@ import kotlin.coroutines.*
*/
@Throws(InterruptedException::class)
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
contract {
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
}
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
Expand Down
9 changes: 9 additions & 0 deletions kotlinx-coroutines-core/jvm/test/RunBlockingTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,4 +162,13 @@ class RunBlockingTest : TestBase() {

handle.dispose()
}

@Test
fun testContract() {
val rb: Int
runBlocking {
rb = 42
}
rb.hashCode() // unused
}
}

0 comments on commit 397f10e

Please sign in to comment.