fix asFlow blocking thread #3980

Conversation

Ericliu001

Fixes the issue #3979

Changes the ChannelFlow's buffer to UNLIMITED when asFlow() is called

@dkhalanskyjb
Collaborator

This PR does not respect the contribution guidelines (https://github.com/Kotlin/kotlinx.coroutines/blob/master/CONTRIBUTING.md#submitting-prs), but more importantly, it's also not the change we'd like to introduce, as this can easily be expressed already by applying buffer().

@Ericliu001
Author

This PR does not respect the contribution guidelines (https://github.com/Kotlin/kotlinx.coroutines/blob/master/CONTRIBUTING.md#submitting-prs), but more importantly, it's also not the change we'd like to introduce, as this can easily be expressed already by applying buffer().

Having to apply buffer() as the consumer of the API requires prior knowledge of this potential blocking behavior. I believe the framework should use buffer(UNLIMITED) by default.

Shall we still integrate the UNLIMITED buffer somewhere?

@qwwdfsad
Collaborator

qwwdfsad commented Dec 11, 2023

Among other things, UNLIMITED buffer means unlimited buffer size and unlimited memory consumption that we would very much like to avoid.

@Ericliu001
Author

Ericliu001 commented Dec 11, 2023

Among other things, UNLIMITED buffer means unlimited buffer size and unlimited memory consumption that we would very much like to avoid.

I think this situation is very much "choosing your poison": either blocking the thread or consuming more memory. The BehaviorSubject in Rx has chosen the latter. IMO, asFlow() would ideally keep the behavior consistent with Rx, so there are no surprises downstream.

https://github.com/ReactiveX/RxJava/blob/3.x/src/main/java/io/reactivex/rxjava3/subjects/BehaviorSubject.java

if (emitting) {
    AppendOnlyLinkedArrayList<Object> q = queue;
    if (q == null) {
        q = new AppendOnlyLinkedArrayList<>(4);
        queue = q;
    }
    q.add(value);
    return;
}

@psteiger

psteiger commented Dec 11, 2023

I agree that asFlow()'s default of a bounded buffer is inherently dangerous, especially on single-threaded, immediate-dispatchers contexts.

Rx2 uses unbounded buffers in all non-back-pressured streams. It's OOM-susceptible by nature. I feel that using an unlimited buffer on Rx <-> coroutines integration in those cases is not adding much more risk, given the already existing OOM risk from Rx.

On the other hand, having a limited buffer can easily cause thread-locking issues. Example:

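import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.withContext

// Runs every dispatched block inline on the calling thread.
private object DirectDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) {
        block.run()
    }
}

suspend fun main() = withContext(DirectDispatcher) {
    channelFlow {
        trySendBlocking(1) // blocks forever: no buffer space and no collector yet
    }
        .buffer(0)
        .collect { println("Got $it") }
}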

This code blocks the thread at trySendBlocking and never returns. By the time we reach trySendBlocking, the collector is not yet installed, and there is no buffer space (nor a chance to rendezvous). Because the thread is now blocked, the collector will never be installed: with a direct dispatcher, the producer coroutine starts producing before the collector is installed.

We can easily see that happening by looking at the implementation:

override suspend fun collect(collector: FlowCollector<T>): Unit =
  coroutineScope {
    collector.emitAll(produceImpl(this))
  }

This is the ChannelFlow collect implementation. In the example above, produceImpl never returns, so the collector is never installed and the rendezvous cannot happen.

I used a direct dispatcher and a rendezvous channel for demonstration purposes, but the same scenario can occur in single-threaded contexts with immediate dispatchers, such as Dispatchers.Main.immediate on Android.
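For contrast, here is a minimal sketch of the same setup with an unlimited buffer instead of buffer(0). InlineDispatcher and collectWithUnlimitedBuffer are hypothetical names introduced only for this illustration, and runBlocking is used so the result can be returned from an ordinary function. Because trySend always finds room in an unlimited buffer, trySendBlocking should return without ever taking its blocking path:

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.channelFlow
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical stand-in for the direct dispatcher: runs every block inline.
private object InlineDispatcher : CoroutineDispatcher() {
    override fun dispatch(context: CoroutineContext, block: Runnable) = block.run()
}

// Hypothetical helper: collects the flow on the inline dispatcher
// and returns the elements that reached the collector.
fun collectWithUnlimitedBuffer(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    withContext(InlineDispatcher) {
        channelFlow<Int> {
            // The producer still runs before the collector is installed,
            // but the element is stored in the buffer instead of blocking.
            trySendBlocking(1)
        }
            .buffer(Channel.UNLIMITED)
            .collect { received += it }
    }
    received
}

fun main() {
    println(collectWithUnlimitedBuffer()) // [1]
}
```

The trade-off discussed in this thread still applies: the buffer can grow without bound if the producer outpaces the collector.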

@psteiger

Related to #3282

@dkhalanskyjb
Collaborator

Having to apply buffer() as the consumer of the API requires prior knowledge of this potential blocking behavior

Likewise, if there was no blocking behavior, your message could be rephrased as "having to apply buffer() as the consumer of the API requires prior knowledge of this potential unbounded memory consumption." In any case, someone will miss something by not reading the documentation.

dangerous, especially on single-threaded, immediate-dispatchers contexts.

What's good about these contexts is that usually, the problem is visible immediately during development and can be fixed. On the other hand, an OOM can be hidden, silently sneaking into production.

@psteiger

psteiger commented Dec 14, 2023

@dkhalanskyjb I think it's fair to point out what I believe is an inconsistency in the way kotlinx.coroutines handles Observable <-> coroutines interop, then:

ObservableSource<T>.collect { } already uses an UNLIMITED channel + trySend under the hood, which is what I would also expect for asFlow().collect { } (and which I believe is a saner and safer default than BUFFERED + trySendBlocking -- allocating a bit more memory is almost always a better alternative than blocking a thread):

public suspend inline fun <T> ObservableSource<T>.collect(action: (T) -> Unit): Unit =
    toChannel().consumeEach(action)

@PublishedApi
internal fun <T> ObservableSource<T>.toChannel(): ReceiveChannel<T> {
    val channel = SubscriptionChannel<T>()
    subscribe(channel)
    return channel
}

private class SubscriptionChannel<T> :
    BufferedChannel<T>(capacity = Channel.UNLIMITED), Observer<T>, MaybeObserver<T>
{
  // ...
    override fun onNext(t: T & Any) {
        trySend(t) // Safe to ignore return value here, expectedly racing with cancellation
    }
  // ...
}
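As a small illustration of why the return value of trySend can be ignored on an unlimited channel but not on a rendezvous one, here is a sketch using plain channels. trySendSucceeds is a hypothetical helper written for this comment, not part of kotlinx.coroutines:

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: creates a fresh channel with the given capacity
// and reports whether a single non-suspending send is accepted.
fun trySendSucceeds(capacity: Int): Boolean =
    Channel<Int>(capacity).trySend(1).isSuccess

fun main() {
    // An unlimited channel always has room, so trySend is accepted.
    println(trySendSucceeds(Channel.UNLIMITED))  // true
    // A rendezvous channel has no buffer and no receiver is waiting,
    // so trySend fails instead of suspending or blocking.
    println(trySendSucceeds(Channel.RENDEZVOUS)) // false
}
```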

@dkhalanskyjb
Collaborator

@psteiger, here are the incompatible things:

  • The way of kotlinx-coroutines' Flow, where we mostly use channels with limited buffers,
  • And the way RxJava (in particular, ObservableSource) works, which is either to use unlimited stack space and process all emissions immediately or to use unbounded buffers.

There are two intended use cases for kotlinx-coroutines-rx2: either someone knows RxJava well but needs to interface with coroutines, or they know kotlinx-coroutines well and need to interface with RxJava.

When converting an RxJava entity to a kotlinx-coroutines entity, we must introduce some inconsistency: either between a typical Flow and the one from RxJava, or between an RxJava entity and the Flow received from it. We chose the latter approach: when converting to a kotlinx-coroutines entity, you now subscribe to how we do things here. That's the behavior that the people well-versed in kotlinx-coroutines would expect. ObservableSource.collect, on the other hand, is not fully in the kotlinx-coroutines world: it's not a converter but an extension function on ObservableSource, so it's kept close to what RxJava folks would want.

Luckily, as stated in #3979 (comment), the inconsistency between Flow and RxJava is easy to mitigate. If you want to change your failure mode, you have a way to do so.
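A minimal sketch of that mitigation, assuming the kotlinx-coroutines-rx2 module and RxJava 2 are on the classpath. asUnboundedFlow is a hypothetical helper name, not an API of the library:

```kotlin
import io.reactivex.ObservableSource
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.rx2.asFlow

// Hypothetical helper: opts into an unlimited buffer, trading the risk of
// blocking the emitting thread for the risk of unbounded memory growth.
fun <T : Any> ObservableSource<T>.asUnboundedFlow(): Flow<T> =
    asFlow().buffer(Channel.UNLIMITED)
```

Callers who prefer the bounded default can keep calling asFlow() directly, so the choice of failure mode stays explicit at the call site.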

@psteiger

psteiger commented Mar 9, 2024

@dkhalanskyjb Thanks for sharing the design details, and overall I find them reasonable. But I still maintain the position for some change here (either in impl, or in docs).

If you want to change your failure mode, you have a way to do so.

No one wants to fail with a deadlock, but unfortunately it's far too easy to use ObservableSource.asFlow() in specific contexts without realizing the risk. It's a default that is dangerous in specific contexts, versus some other potential default that is safe against deadlocks in all contexts.

If the final decision is to keep the implementation, I'd call for an emphasized warning in the asFlow docs describing the particular cases in which using asFlow can be dangerous.

@Mustafaubaid4

Mustafaubaid4 commented Mar 9, 2024 via email


5 participants