completionHandler is now an internal API of the ReceiveChannel operators - what's the alternative? #994

Closed
ZakTaccardi opened this issue Feb 18, 2019 · 1 comment
Labels
docs KDoc and API reference question

ZakTaccardi commented Feb 18, 2019

I wrote two custom RxJava-style operators to use until #437 / #254 are addressed. I want to ensure that their source channels are cleaned up correctly, so I leveraged onCompletion = consumesAll(*sources) and onCompletion = consumes(). My 0.x code is below:

/**
 * Merges multiple [ReceiveChannel]s of the same type [T] into a single [ReceiveChannel]
 */
fun <T> merge(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> =
    GlobalScope.produce(context, onCompletion = consumesAll(*sources)) {
        sources.forEach { source ->
            launch { source.consumeEach { send(it) } }
        }
    }

/**
 * Emits [E] only when it is not equal to the previous emission. The first emission is always
 * emitted. Use this to avoid emitting the same value twice in a row.
 *
 * `.equals()` equality comparison will be used.
 *
 * Equivalent to RxJava's `.distinctUntilChanged()` operator.
 */
fun <E> ReceiveChannel<E>.distinctUntilChanged(
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<E> = GlobalScope.produce(context, onCompletion = consumes()) {
    val last = AtomicReference<E>()
    var wasInitialized = false

    consumeEach { emission ->
        if (!wasInitialized) {
            // first emission
            last.set(emission)
            wasInitialized = true
            send(emission)
        } else {
            // we have a previous emission to compare to
            if (emission != last.get()) {
                // a distinct value has appeared
                last.set(emission)
                send(emission)
            }
        }
    }
}

Unfortunately for me, the 1.x APIs for this are now marked @InternalCoroutinesApi because of their use of onCompletion: CompletionHandler. How can I modify these operators so that I don't need to rely on the internal onCompletion API?

Would changing it to the following be a safe alternative? If so, could this be pointed out in the documentation?

GlobalScope.produce(context) {
    this@produce.coroutineContext[Job]!!.invokeOnCompletion(consumesAll(*sources))
    // ...
}

@qwwdfsad (Collaborator)

> Would changing it to the following be a safe alternative?

Yes, this is how it is actually implemented internally. This is also probably a good argument for #934.
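
For illustration, the pattern confirmed above can also be written without `consumesAll`. The following is a minimal sketch, not the library's implementation: it assumes kotlinx.coroutines 1.x, reuses the `merge` signature from the question, and swaps the `consumesAll(*sources)` handler for a hand-written lambda that calls the public `cancel()` on each source.

```kotlin
import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Sketch only: same shape as the `merge` in the question, but the cleanup handler
// is a plain lambda instead of the internal `consumesAll(*sources)` helper.
// `produce` and `GlobalScope` may emit opt-in warnings, depending on the library version.
fun <T> merge(
    vararg sources: ReceiveChannel<T>,
    context: CoroutineContext = Dispatchers.Unconfined
): ReceiveChannel<T> =
    GlobalScope.produce(context) {
        // Registered on the producer's own Job; runs once that Job has completed,
        // whether it finished normally, failed, or was cancelled by the consumer.
        coroutineContext[Job]!!.invokeOnCompletion { _ ->
            sources.forEach { it.cancel() }
        }
        // One child coroutine per source forwards its elements to the merged channel.
        sources.forEach { source ->
            launch { source.consumeEach { send(it) } }
        }
    }
```

One difference from passing `onCompletion` to `produce` is worth keeping in mind: here the handler is registered only once the body starts running. With the default `Dispatchers.Unconfined` the body typically starts right away, but with another dispatcher the producer could in principle be cancelled before the handler is registered.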

> If so, could this be pointed out in the documentation?

Not the part about invokeOnCompletion, but the part about produce having its own Job and that Job's lifecycle. Thanks for pointing that out.
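
To make the lifecycle point concrete, here is a small sketch (assuming kotlinx.coroutines 1.x) showing that the channel returned by `produce` is backed by the producer's own Job: cancelling the channel cancels that Job, and a handler registered through `invokeOnCompletion` then receives the cancellation cause. The helper name `observedProducer` is hypothetical and exists only for this example.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.*

// Hypothetical helper: starts an endless producer and returns its channel together
// with a Deferred that is completed with the cause passed to the completion handler.
fun CoroutineScope.observedProducer(): Pair<ReceiveChannel<Int>, Deferred<Throwable?>> {
    val completion = CompletableDeferred<Throwable?>()
    val channel = produce<Int> {
        // `coroutineContext[Job]` here is the Job of the producer coroutine itself.
        coroutineContext[Job]!!.invokeOnCompletion { cause -> completion.complete(cause) }
        var i = 0
        while (true) send(i++) // suspends until a consumer receives or the Job is cancelled
    }
    return channel to completion
}
```

If a consumer receives one element and then calls `cancel()` on the channel, awaiting the returned `Deferred` should yield a `CancellationException`; a producer that finishes normally would report `null` instead.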

This API became internal because we are currently actively prototyping cold streams.
With cold streams, we expect all intermediate operators on channels to no longer be needed (because channels were not intended to be used as an Rx replacement), and the API of produce, actor, and their operators will then be revisited.
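
For readers arriving later: the cold-streams prototype mentioned here was subsequently released as the `Flow` API in kotlinx.coroutines. As a rough sketch, assuming a library version in which `Flow`, `distinctUntilChanged`, and `merge` are available, the two operators from the question need no manual cleanup handler at all. The helper names `distinctRuns` and `mergedItems` are hypothetical wrappers used only for illustration.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical wrapper: drops consecutive duplicates using the built-in Flow operator.
suspend fun <T> distinctRuns(values: List<T>): List<T> =
    values.asFlow().distinctUntilChanged().toList()

// Hypothetical wrapper: merges several flows into one and collects the result.
// The relative order of elements coming from different sources is not guaranteed.
suspend fun <T> mergedItems(vararg sources: Flow<T>): List<T> =
    merge(*sources).toList()
```

Called from a coroutine, `distinctRuns(listOf(1, 1, 2, 2, 3, 1))` returns `[1, 2, 3, 1]`. Because a flow is cold, nothing is started until it is collected, so there is no upstream channel left to cancel when the collector stops.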

qwwdfsad added the docs (KDoc and API reference) label Feb 19, 2019