Skip to content

Commit

Permalink
Use fast path in CompletionStage.await() and make it cancellable;
Browse files Browse the repository at this point in the history
Make CompletionStage.asDeferred cancel original future on deferred
cancellation if possible.
  • Loading branch information
elizarov committed Mar 1, 2018
1 parent 19c1f2e commit eb4f9be
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 40 deletions.
8 changes: 4 additions & 4 deletions integration/kotlinx-coroutines-jdk8/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ Extension functions:

| **Name** | **Description**
| -------- | ---------------
| [CompletionStage.await][java.util.concurrent.CompletionStage.await] | Awaits for completion of the completion stage (non-cancellable)
| [CompletableFuture.await][java.util.concurrent.CompletableFuture.await] | Awaits for completion of the future (cancellable)
| [CompletionStage.await][java.util.concurrent.CompletionStage.await] | Awaits for completion of the completion stage
| [CompletionStage.asDeferred][java.util.concurrent.CompletionStage.asDeferred] | Converts completion stage to an instance of [Deferred]
| [Deferred.asCompletableFuture][kotlinx.coroutines.experimental.Deferred.asCompletableFuture] | Converts a deferred value to the future

## Example
Expand Down Expand Up @@ -52,11 +52,11 @@ Integration with JDK8 [`CompletableFuture`][java.util.concurrent.CompletableFutu
<!--- MODULE kotlinx-coroutines-core -->
<!--- INDEX kotlinx.coroutines.experimental -->
[CoroutineScope]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-coroutine-scope/index.html
[Deferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines.experimental/-deferred/index.html
<!--- MODULE kotlinx-coroutines-jdk8 -->
<!--- INDEX kotlinx.coroutines.experimental.future -->
[java.util.concurrent.CompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completable-future/index.html
[future]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/future.html
[java.util.concurrent.CompletionStage.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completion-stage/await.html
[java.util.concurrent.CompletableFuture.await]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completable-future/await.html
[java.util.concurrent.CompletionStage.asDeferred]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/java.util.concurrent.-completion-stage/as-deferred.html
[kotlinx.coroutines.experimental.Deferred.asCompletableFuture]: https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-jdk8/kotlinx.coroutines.experimental.future/kotlinx.coroutines.experimental.-deferred/as-completable-future.html
<!--- END -->
Original file line number Diff line number Diff line change
Expand Up @@ -106,17 +106,9 @@ public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
}

/**
* Awaits for completion of the completion stage without blocking a thread.
*
* This suspending function is not cancellable, because there is no way to cancel a `CompletionStage`.
* Use `CompletableFuture.await()` for cancellable wait.
*/
public suspend fun <T> CompletionStage<T>.await(): T = suspendCoroutine { cont: Continuation<T> ->
whenComplete(ContinuationConsumer(cont))
}

/**
* Converts this future to an instance of [Deferred].
* Converts this completion stage to an instance of [Deferred].
* When this completion stage is an instance of [Future], then it is cancelled when
* the resulting deferred is cancelled.
*/
public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
// Fast path if already completed
Expand All @@ -138,26 +130,37 @@ public fun <T> CompletionStage<T>.asDeferred(): Deferred<T> {
result.completeExceptionally(exception)
}
}
if (this is Future<*>) result.cancelFutureOnCompletion(this)
return result
}

/**
* Awaits for completion of the future without blocking a thread.
*
* @suppress **Deprecated**: For binary compatibility only
*/
@Deprecated("For binary compatibility only", level = DeprecationLevel.HIDDEN)
public suspend fun <T> CompletableFuture<T>.await(): T =
(this as CompletionStage<T>).await()

/**
* Awaits for completion of the completion stage without blocking a thread.
*
* This suspending function is cancellable.
* If the [Job] of the current coroutine is cancelled or completed while this suspending function is waiting, this function
* stops waiting for the future and immediately resumes with [CancellationException].
* stops waiting for the completion stage and immediately resumes with [CancellationException].
*
* Note, that `CompletableFuture` does not support prompt removal of installed listeners, so on cancellation of this wait
* a few small objects will remain in the `CompletableFuture` stack of completion actions until the future completes.
* Note, that `CompletionStage` implementation does not support prompt removal of installed listeners, so on cancellation of this wait
* a few small objects will remain in the `CompletionStage` stack of completion actions until it completes itself.
* However, the care is taken to clear the reference to the waiting coroutine itself, so that its memory can be
* released even if the future never completes.
* released even if the completion stage never completes.
*/
public suspend fun <T> CompletableFuture<T>.await(): T {
public suspend fun <T> CompletionStage<T>.await(): T {
// fast path when CompletableFuture is already done (does not suspend)
if (isDone) {
if (this is Future<*> && isDone()) {
try {
return get()
@Suppress("UNCHECKED")
return get() as T
} catch (e: ExecutionException) {
throw e.cause ?: e // unwrap original cause from ExecutionException
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,24 +212,6 @@ class FutureTest : TestBase() {
finish(6)
}

@Test
fun testNonCancellableAwaitCompletionStage() = runBlocking {
expect(1)
val completable = CompletableFuture<String>()
val toAwait: CompletionStage<String> = completable
val job = launch(coroutineContext, CoroutineStart.UNDISPATCHED) {
expect(2)
assertThat(toAwait.await(), IsEqual("OK")) // suspends
expect(5)
}
expect(3)
job.cancel() // cancel the job
completable.complete("OK") // ok, because await on completion stage is not cancellable
expect(4) // job processing of was scheduled, not executed yet
yield() // yield main thread to job
finish(6)
}

@Test
fun testContinuationWrapped() {
val depth = AtomicInteger()
Expand Down

0 comments on commit eb4f9be

Please sign in to comment.