CancellableContinuation.invokeOnCancellation should accept a suspend callback #4154
Comments
I now realize the Kotlin cancellation is synchronous, and any suspend functions called during the Any other solutions to this situation? I can think of 2:
suspend fun waitForEvent(): Unit = suspendCancellableCoroutine { continuation ->
setWaitForEventCallback { continuation.resume(Unit) }
continuation.invokeOnCancellation {
runBlocking { clearWaitForEventCallback().await() } // Block the thread that cancels.
}
}
val cancellationLock = Mutex(false)
suspend fun waitForEvent() {
cancellationLock.withLock {} // Wait for previous cancellation.
suspendCancellableCoroutine { continuation ->
setWaitForEventCallback { continuation.resume(Unit) }
cancellationLock.lock() // Lock only after setWaitForEventCallback.
continuation.invokeOnCancellation {
clearWaitForEventCallback().addListener({ cancellationLock.tryUnlock() }, <Executor not from this Job, which may have been cancelled>)
}
}
cancellationLock.tryUnlock() // Finished successfully.
}
fun Mutex.tryUnlock() {
try { unlock() } // Throws IllegalStateException if the mutex is not locked.
catch (e: IllegalStateException) {}
}
Learning about Anyone from the |
Maybe the question is - why is I'd say all 3 need to accept |
Here's what I came up with:
import kotlinx.coroutines.*
fun setWaitForEventCallback(block: () -> Unit) {
println("setting the callback")
}
suspend fun clearWaitForEventCallback() {
println("starting the clear...")
delay(100)
println("cleared!")
}
suspend fun CoroutineScope.subscribe() {
suspendCancellableCoroutine<Unit> { cont ->
setWaitForEventCallback { cont.resume(Unit) {} }
cont.invokeOnCancellation {
launch(start = CoroutineStart.ATOMIC) {
withContext(NonCancellable) {
clearWaitForEventCallback()
}
}
}
}
}
fun main() {
runBlocking {
println("Launching")
val job = launch {
subscribe()
}
println("Launched. Wait a bit.")
delay(50)
println("Now cancelling.")
job.cancelAndJoin()
println("Success!")
}
}
Prints:
Here's how I arrived at it:
Does it make sense? I should note that it's very uncommon for unsubscription to be asynchronous. Most cases where I've seen this are just from inexperienced developers making everything async, so this could be just Do you have any insight as to why this unsubscription is async (whereas subscribing is synchronous for some reason)? Is it a mistake on the library authors' part, or is it driven by some actual need?
I'm not sure there is a one-size-fits-all solution as to who should actually execute this
With all these flowOf(1, 2, 3).onCompletion { emit(4) }.collect { println(it) } |
Thanks @dkhalanskyjb!
I'm confused - which
This is an Android API, and "subscribing" is basically sending a callback over an IPC (sending a Binder), and clearing the subscription is also an IPC for the same reason. I personally think some IPCs in Android should be blocking, but there's a growing group that thinks they shouldn't, and there's no consensus. There are also constraints on blocking some threads in Android, including the main and Binder threads.
Subscription is also async, that's the main reason I use
Yeah I didn't forget, it's just that That means to me that either the API should be split to 2 (e.g. Making that callback run in |
The receiver of suspend fun CoroutineScope.subscribe() { which is, at that point, the val job = launch {
Ok, noted, thanks!
Is this just for API purity, or do you have actual use cases where these two paths should be distinct? An extra lambda is not free: it's going to add to the learning curve in the cases where it's not needed. All APIs have some cost, we need to have at least some reason to add them. Also, then, it would have to be three lambdas, actually:
It will simply rethrow the error the flow was going to throw anyway.
It's simpler than that, actually: if you're inside |
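To make the onCompletion point above concrete, here is a minimal sketch. The helper name collectWithCompletionEmit and the "boom" failure are made up for illustration. It assumes the behaviour described in this thread: emit inside onCompletion works after normal completion, and simply rethrows the original error when the flow has failed.

```kotlin
import kotlinx.coroutines.flow.catch
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.runBlocking

// Hypothetical helper: returns the values that reached the collector
// and the message of the error seen by catch (null if there was none).
fun collectWithCompletionEmit(fail: Boolean): Pair<List<Int>, String?> = runBlocking {
    val seen = mutableListOf<Int>()
    var error: String? = null
    flow {
        emit(1)
        if (fail) throw IllegalStateException("boom")
    }
        .onCompletion { emit(4) }          // only delivered on normal completion
        .catch { e -> error = e.message }  // receives the upstream failure
        .collect { seen += it }
    seen to error
}

fun main() {
    println(collectWithCompletionEmit(fail = false))
    println(collectWithCompletionEmit(fail = true))
}
```

In the failing run the extra value should never reach the collector, which matches the observation that you cannot keep emitting once an error has stopped the flow.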
Oh I missed that receiver, it seems unconventional to accept a context/scope both by declaring the function as a
It's because we've seen that the 2 cases act very differently, one of them is not truly accepting a suspend function, and any suspension and emission will silently fail (as cancellation errors are eaten up). To be clear I'm OK with a single API, it just needs to work for both cases. I can also understand that at this point it's not worth it to change, from a developer migration perspective.
That could just be a nullable exception provided to
Oh, not just in cancellation, also in downstream errors? Looks like you can't continue emitting regardless of which error stopped the flow. Odd...
Yeah that's sensible. The only issue then is that with Let me know what you think |
Sure.
This logic breaks once we are adding parallelism to the mix. Imagine that you've entered
This can happen on any code path due to concurrent cancellation.
You can use some existing coroutine scope if you want. |
Yeah that's true. I wonder if there's a semantic difference between cancellation happening before or after
Yes of course, that's equivalent to synchronous APIs as you basically assume the action will be done by the method returning. That being said, fire-and-forget that executes asynchronously is very ill advised, as it could cause races with follow-up subscriptions (note that in our use case there's a single global callback, the |
In the kotlinx.coroutines framework, cancellation is registered only once. Later attempts to cancel an already-cancelled coroutine will be ignored. If you need custom behavior for repeated cancellation attempts, this should be implemented explicitly via some kind of message passing.
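A small sketch of the "registered only once" point, assuming a single-threaded runBlocking setup. The function name countCancellationHandlerCalls is hypothetical. It cancels the same job twice and counts how often the invokeOnCancellation handler runs.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: returns how many times the cancellation handler ran.
fun countCancellationHandlerCalls(): Int = runBlocking {
    val calls = AtomicInteger(0)
    val job = launch {
        suspendCancellableCoroutine<Unit> { cont ->
            cont.invokeOnCancellation { calls.incrementAndGet() }
        }
    }
    yield()      // let the child start and suspend
    job.cancel() // registers the cancellation and runs the handler
    job.cancel() // the job is already cancelled, so this has no further effect
    job.join()
    calls.get()
}

fun main() {
    println(countCancellationHandlerCalls())
}
```

Under these assumptions the handler should run exactly once, no matter how many times cancel is called.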
It can be made to work by assigning unique identifiers to subscriptions, like
fun subscribe(): Subscription
fun unsubscribe(subscription: Subscription): Job // you can wait for this specific subscription to finish if you want, but you don't have to
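One possible shape for the identifier-based API suggested above, as a rough sketch only. The EventSource class, the onEvent parameter, the activeSubscriptions helper and the delay standing in for an asynchronous clear are all assumptions made for illustration, not part of any real library.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicLong

// Hypothetical handle identifying one subscription.
class Subscription(val id: Long)

// Hypothetical event source keeping one callback per subscription id.
class EventSource(private val scope: CoroutineScope) {
    private val nextId = AtomicLong()
    private val callbacks = ConcurrentHashMap<Long, () -> Unit>()

    fun subscribe(onEvent: () -> Unit): Subscription {
        val subscription = Subscription(nextId.incrementAndGet())
        callbacks[subscription.id] = onEvent
        return subscription
    }

    // Returns a Job so that callers may wait for this specific removal, or not.
    fun unsubscribe(subscription: Subscription): Job = scope.launch {
        delay(100) // stands in for the asynchronous clear
        callbacks.remove(subscription.id)
    }

    fun activeSubscriptions(): Int = callbacks.size
}

fun remainingAfterUnsubscribe(): Int = runBlocking {
    val source = EventSource(this)
    val first = source.subscribe { }
    source.subscribe { }
    source.unsubscribe(first).join() // waiting is optional
    source.activeSubscriptions()
}

fun main() {
    println(remainingAfterUnsubscribe())
}
```

Because each unsubscription targets its own identifier, a late unsubscribe cannot remove a newer subscription by accident, which is the race the single global callback is exposed to.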
@odedniv, is your original question answered? I think we've strayed a bit from the topic. It's better to stay on point and open separate issues/discussions if there are further questions: this way, if someone finds this issue later, they will have an easier time reading it. |
I agree, we drifted with the discussion on This issue is not really a question, it was a FR - I think just like the A follow-up to this feature request is that since To answer your last comment, which is outside the focus of this issue (we can stop talking about it here):
Sorry, I misread
Yes, but this is not the Java callback API I'm working with. There are different reasonable ways to implement cancellation APIs; the API I'm using looks roughly like this:
// "onData" is the actual callback, and there can be exactly one callback subscribed, follow-up calls will REPLACE it.
// "onSet" is just notifying that "setting the callback" finished.
fun setCallback(onSet, onData)
// Clears the one and only callback.
// Similarly, "onClear" is just notifying that "clearing the callback" finished.
fun clearCallback(onClear)
Which isn't great but not unreasonable - I need calls to both set and clear to be serial, i.e. if I call the suspend method that sets the callback, cancel it before it's done, and call it again - it must be set-clear-set and not set-set-clear (which would happen if I'm not waiting for the clear).
A
There can be more than one
You can write any normal code in
There was never a goal to let people write complex logic in
import kotlinx.coroutines.*
fun setWaitForEventCallback(block: () -> Unit) {
println("setting the callback")
}
suspend fun clearWaitForEventCallback() {
println("starting the clear...")
delay(100)
println("cleared!")
}
suspend fun subscribe() {
try {
suspendCancellableCoroutine<Unit> { cont ->
setWaitForEventCallback { cont.resume(Unit) {} }
}
} catch (e: CancellationException) {
withContext(NonCancellable) {
clearWaitForEventCallback()
}
throw e
}
}
fun main() {
runBlocking {
println("Launching")
val job = launch {
subscribe()
}
println("Launched. Wait a bit.")
delay(50)
println("Now cancelling.")
job.cancelAndJoin()
println("Success!")
}
}
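To check that this try/catch approach gives the set-clear-set ordering asked for earlier in the thread, here is a sketch that records the order of events. FakeApi, waitForEvent(api) and runSetClearSet are hypothetical stand-ins, and the delay merely simulates the asynchronous clear.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.resume

// Hypothetical stand-in for the callback API; it only records what happened.
class FakeApi {
    val log = mutableListOf<String>()
    fun setCallback(onEvent: () -> Unit) { log += "set" }
    suspend fun clearCallback() {
        log += "clear-start"
        delay(100) // stands in for the asynchronous clear
        log += "clear-done"
    }
}

suspend fun waitForEvent(api: FakeApi) {
    try {
        suspendCancellableCoroutine<Unit> { cont ->
            api.setCallback { cont.resume(Unit) }
        }
    } catch (e: CancellationException) {
        // Still inside the coroutine, so suspending cleanup is possible here.
        withContext(NonCancellable) { api.clearCallback() }
        throw e
    }
}

fun runSetClearSet(): List<String> = runBlocking {
    val api = FakeApi()
    val first = launch { waitForEvent(api) }
    delay(50)
    first.cancelAndJoin() // returns only after clearCallback() has finished
    api.log += "joined"
    val second = launch { waitForEvent(api) }
    delay(50)
    second.cancelAndJoin()
    api.log
}

fun main() {
    println(runSetClearSet())
}
```

Since the cleanup runs inside the cancelled coroutine itself, joining the job is enough for the caller to know the clear has completed before setting the callback again.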
Not sure what that means as it is part of the public API, that IIUC is supposed to help the developer do the right thing.
Oh that's nice, but then it feels like Given that it's so easy to implement without In other words, I still stand by the feature request, but maybe with your solution (that I don't think would be automatically obvious to developers) the feature request is of a lower priority (to me). Feel free to close this issue if you disagree. Also note that I recently learned the hard way that catching |
@qwwdfsad, what do you think about marking |
I was sitting on this idea for a while as well. Filed #4180 |
Use case
I have a Java async API that lets me set a callback and clear it. Cancelling that API means clearing the callback, but the API to clear the callback is also async.
I want to convert this to a suspend function, so I use:
However this is incorrect, as I'm not waiting for clearWaitForEventCallback, which means there's nothing allowing the caller to synchronize the cancellation of a previous job with a new call:
What I really want to call is clearWaitForEventCallback.await(), which would allow the caller to join the job after cancellation to ensure synchronization:
The Shape of the API
I believe adding overloads this way will not be a breaking API change, and so doesn't require a major version bump.
Prior Art
In a normal suspend function, I would catch the CancellationException and would still be in a coroutine, allowing me to invoke other async functions.
This means that in theory, invokeOnCancellation can accept a coroutine context. The workaround to the current design flaw is that I have to do this:
This workaround is not obvious, so every developer has to work something out on their own and can easily arrive at an incorrect solution (as I already have).