Skip to content

Commit

Permalink
Job.asCompletableFuture (Kotlin#1113)
Browse files Browse the repository at this point in the history
Job.asCompletableFuture

Fixes Kotlin#1104
  • Loading branch information
qwwdfsad committed Apr 23, 2019
1 parent f1710a7 commit fcfabee
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 46 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
public final class kotlinx/coroutines/future/FutureKt {
public static final fun asCompletableFuture (Lkotlinx/coroutines/Deferred;)Ljava/util/concurrent/CompletableFuture;
public static final fun asCompletableFuture (Lkotlinx/coroutines/Job;)Ljava/util/concurrent/CompletableFuture;
public static final fun asDeferred (Ljava/util/concurrent/CompletionStage;)Lkotlinx/coroutines/Deferred;
public static final fun await (Ljava/util/concurrent/CompletionStage;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun future (Lkotlinx/coroutines/CoroutineScope;Lkotlin/coroutines/CoroutineContext;Lkotlinx/coroutines/CoroutineStart;Lkotlin/jvm/functions/Function2;)Ljava/util/concurrent/CompletableFuture;
Expand Down
28 changes: 23 additions & 5 deletions integration/kotlinx-coroutines-jdk8/src/future/Future.kt
Original file line number Diff line number Diff line change
Expand Up @@ -71,11 +71,7 @@ private class CompletableFutureCoroutine<T>(
*/
public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
val future = CompletableFuture<T>()
future.whenComplete { _, exception ->
cancel(exception?.let {
it as? CancellationException ?: CancellationException("CompletableFuture was completed exceptionally", it)
})
}
setupCancellation(future)
invokeOnCompletion {
try {
future.complete(getCompleted())
Expand All @@ -86,6 +82,28 @@ public fun <T> Deferred<T>.asCompletableFuture(): CompletableFuture<T> {
return future
}

/**
* Converts this job to the instance of [CompletableFuture].
* The job is cancelled when the resulting future is cancelled or otherwise completed.
*/
public fun Job.asCompletableFuture(): CompletableFuture<Unit> {
val future = CompletableFuture<Unit>()
setupCancellation(future)
invokeOnCompletion { cause ->
if (cause === null) future.complete(Unit)
else future.completeExceptionally(cause)
}
return future
}

private fun Job.setupCancellation(future: CompletableFuture<*>) {
future.whenComplete { _, exception ->
cancel(exception?.let {
it as? CancellationException ?: CancellationException("CompletableFuture was completed exceptionally", it)
})
}
}

/**
* Converts this completion stage to an instance of [Deferred].
* When this completion stage is an instance of [Future], then it is cancelled when
Expand Down
124 changes: 124 additions & 0 deletions integration/kotlinx-coroutines-jdk8/test/future/AsFutureTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
/*
* Copyright 2016-2019 JetBrains s.r.o. Use of this source code is governed by the Apache 2.0 license.
*/

package kotlinx.coroutines.future

import kotlinx.coroutines.*
import org.junit.*
import org.junit.Assert.*
import java.util.concurrent.*
import java.util.concurrent.CancellationException

class AsFutureTest : TestBase() {

@Test
fun testCompletedDeferredAsCompletableFuture() = runTest {
expect(1)
val deferred = async(start = CoroutineStart.UNDISPATCHED) {
expect(2) // completed right away
"OK"
}
expect(3)
val future = deferred.asCompletableFuture()
assertEquals("OK", future.await())
finish(4)
}

@Test
fun testCompletedJobAsCompletableFuture() = runTest {
val job = Job().apply { complete() }
val future = job.asCompletableFuture()
assertEquals(Unit, future.await())
}

@Test
fun testWaitForDeferredAsCompletableFuture() = runTest {
expect(1)
val deferred = async {
expect(3) // will complete later
"OK"
}
expect(2)
val future = deferred.asCompletableFuture()
assertEquals("OK", future.await()) // await yields main thread to deferred coroutine
finish(4)
}

@Test
fun testWaitForJobAsCompletableFuture() = runTest {
val job = Job()
val future = job.asCompletableFuture()
assertTrue(job.isActive)
job.complete()
assertFalse(job.isActive)
assertEquals(Unit, future.await())
}

@Test
fun testAsCompletableFutureThrowable() {
val deferred = GlobalScope.async<Unit> { throw OutOfMemoryError() }
val future = deferred.asCompletableFuture()
try {
expect(1)
future.get()
expectUnreached()
} catch (e: ExecutionException) {
assertTrue(future.isCompletedExceptionally)
assertTrue(e.cause is OutOfMemoryError)
finish(2)
}
}

@Test
fun testJobAsCompletableFutureThrowable() {
val job = Job()
CompletableDeferred<Unit>(parent = job).apply { completeExceptionally(OutOfMemoryError()) }
val future = job.asCompletableFuture()
try {
expect(1)
future.get()
expectUnreached()
} catch (e: ExecutionException) {
assertTrue(future.isCompletedExceptionally)
assertTrue(e.cause is OutOfMemoryError)
finish(2)
}
}

@Test
fun testJobAsCompletableFutureCancellation() {
val job = Job()
val future = job.asCompletableFuture()
job.cancel()
try {
expect(1)
future.get()
expectUnreached()
} catch (e: CancellationException) {
assertTrue(future.isCompletedExceptionally)
finish(2)
}
}

@Test
fun testJobCancellation() {
val job = Job()
val future = job.asCompletableFuture()
future.cancel(true)
assertTrue(job.isCancelled)
assertTrue(job.isCompleted)
assertFalse(job.isActive)
}

@Test
fun testDeferredCancellation() {
val deferred = CompletableDeferred<Int>()
val future = deferred.asCompletableFuture()
future.cancel(true)
assertTrue(deferred.isCancelled)
assertTrue(deferred.isCompleted)
assertFalse(deferred.isActive)
assertTrue(deferred.getCompletionExceptionOrNull() is CancellationException)
}
}
41 changes: 0 additions & 41 deletions integration/kotlinx-coroutines-jdk8/test/future/FutureTest.kt
Original file line number Diff line number Diff line change
Expand Up @@ -162,47 +162,6 @@ class FutureTest : TestBase() {
}
}

@Test
fun testCompletedDeferredAsCompletableFuture() = runBlocking {
expect(1)
val deferred = async(start = CoroutineStart.UNDISPATCHED) {
expect(2) // completed right away
"OK"
}
expect(3)
val future = deferred.asCompletableFuture()
assertThat(future.await(), IsEqual("OK"))
finish(4)
}

@Test
fun testWaitForDeferredAsCompletableFuture() = runBlocking {
expect(1)
val deferred = async {
expect(3) // will complete later
"OK"
}
expect(2)
val future = deferred.asCompletableFuture()
assertThat(future.await(), IsEqual("OK")) // await yields main thread to deferred coroutine
finish(4)
}

@Test
fun testAsCompletableFutureThrowable() {
val deferred = GlobalScope.async {
throw OutOfMemoryError()
}

val future = deferred.asCompletableFuture()
try {
future.get()
} catch (e: ExecutionException) {
assertTrue(future.isCompletedExceptionally)
assertTrue(e.cause is OutOfMemoryError)
}
}

@Test
fun testCancellableAwaitFuture() = runBlocking {
expect(1)
Expand Down

0 comments on commit fcfabee

Please sign in to comment.