Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce SharedFlow and sharing operators #2069

Merged
merged 32 commits into from
Oct 13, 2020
Merged

Introduce SharedFlow and sharing operators #2069

merged 32 commits into from
Oct 13, 2020

Conversation

elizarov
Copy link
Contributor

@elizarov elizarov commented May 29, 2020

Summary of changes:

  • SharedFlow, MutableSharedFlow and its constructor.
  • StateFlow implements SharedFlow.
  • SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
  • BufferOverflow strategy in kotlinx.coroutines.channels package.
  • shareIn and stateIn operators and SharingStarted strategies for them.
  • SharedFlow.flowOn error lint (up from StateFlow).
  • Precise cancellable() operator fusion.
  • Precise distinctUntilChanged() operator fusion.
  • StateFlow.compareAndSet function.
  • asStateFlow and asSharedFlow read-only view functions.
  • Consistently clarified docs on cold vs hot flows.
  • Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
  • Channel(...) constructor function has onBufferOverflow parameter.
  • buffer(...) operator has onBufferOverflow parameter.
  • shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
  • shareIn/stateIn fuse with upstream flowOn for more efficient execution.
  • conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
  • Added reactive operator migration hints.
  • WhileSubscribed with kotlin.time.Duration params

Fixes #2034
Fixes #2047

@elizarov
Copy link
Contributor Author

I just noticed this sentence for the first time. Do I understand correctly that this means that if the buffer overflow mode is DROP_OLDEST or SUSPEND, it will effectively act like be DROP_LATEST until the first subscriber subscribes?

Yes, just can think of it this way. The correct mental model, though, is that until the first subscriber subscribers there's always an internal very-fast subscriber that consumes all subscribed messages immediately (so that emit never suspends) so that all the other user-created subscriptions just see replay most recently emitted values.

@elizarov elizarov changed the title [DRAFT] Introduce SharedFlow and sharing operators Introduce SharedFlow and sharing operators Jun 15, 2020
@elizarov elizarov marked this pull request as ready for review June 15, 2020 16:14
@elizarov elizarov requested a review from qwwdfsad June 15, 2020 16:43
@elizarov
Copy link
Contributor Author

This PR is now complete, squashed, and rebased onto the develop branch.

@zach-klippenstein
Copy link
Contributor

The correct mental model, though, is that until the first subscriber subscribers there's always an internal very-fast subscriber that consumes all subscribed messages immediately (so that emit never suspends) so that all the other user-created subscriptions just see replay most recently emitted values.

That's clear and makes sense for the SUSPEND case, but I'm more confused about DROP_OLDEST now. If the overflow mode is DROP_OLDEST and more than replay items are emitted before the first subscriber subscribes, are the oldest ones or the latest ones dropped?

* [emit][MutableSharedFlow.emit] does not suspend while there is a buffer space remaining (optional, cannot be negative, defaults to zero).
* @param onBufferOverflow configures an action on buffer overflow (optional, defaults to
* [suspending][BufferOverflow.SUSPEND] attempt to [emit][MutableSharedFlow.emit] a value,
* supported only when `replay > 0` or `extraBufferCapacity > 0`).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does "supported" mean here? If replay == 0, is this value just ignored? DROP_LATEST and DROP_OLDEST seem like they would both mean emitters would never suspend but otherwise be equivalent, but SUSPEND could still suspend emitters.

Copy link
Contributor Author

@elizarov elizarov Jun 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It means that IllegalArgumentException is thrown if you try to do replay = 0 and onBufferOverflow != SUSPEND

Copy link
Contributor

@zach-klippenstein zach-klippenstein Jun 16, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I see the general "This function throws [IllegalArgumentException] on unsupported values of parameters of combinations thereof" warning at above, but I think it would be more clear to explicitly put your reply to my comment in the docs. It's a little more verbose, but given the complexity of the interactions of the various values of these parameters I think it would be worth it.

@elizarov
Copy link
Contributor Author

That's clear and makes sense for the SUSPEND case, but I'm more confused about DROP_OLDEST now. If the overflow mode is DROP_OLDEST and more than replay items are emitted before the first subscriber subscribes, are the oldest ones or the latest ones dropped?

onBufferOverflow happens only when there is a buffer. There is a buffer only when there are slow subscribers that cannot keep up. No slow subscribers, no buffer, no overflow, does not matter what overflow strategy is set at all.

@1zaman
Copy link
Contributor

1zaman commented Jun 16, 2020

While looking through the documentation, I came across quite a few minor grammatical errors. If you like I can point them out here, but maybe it can benefit from some proofreading.

@elizarov
Copy link
Contributor Author

elizarov commented Jun 16, 2020

While looking through the documentation, I came across quite a few minor grammatical errors. If you like I can point them out here, but maybe it can benefit from some proofreading.

You can directly point to places in the code via GitHub interface (and "Files Changed" tab) and even provide suggested replacements.

Copy link
Contributor

@1zaman 1zaman left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the grammar review of the documentation and comments. Most issues are quite minor, but there are a lot of them. Many are probably subjective though, so feel free to change or ignore as seems appropriate.

I have provided suggested replacements, but did not make any adjustments for line wrapping, which might also need to be accounted for when implementing the changes.

kotlinx-coroutines-core/common/src/channels/Broadcast.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/common/src/channels/Channel.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt Outdated Show resolved Hide resolved
kotlinx-coroutines-core/jvm/test/flow/SharingStressTest.kt Outdated Show resolved Hide resolved
reactive/kotlinx-coroutines-reactive/src/ReactiveFlow.kt Outdated Show resolved Hide resolved
ui/coroutines-guide-ui.md Outdated Show resolved Hide resolved
@elizarov elizarov force-pushed the shared-flow branch 5 times, most recently from b2f2c1f to 78e7e75 Compare July 3, 2020 15:31
@p-lr
Copy link

p-lr commented Jul 7, 2020

Any chance to have this reviewed by @qwwdfsad and merged before Kotlin 1.4 is released? ;)
I'm super excited by those changes.

@elizarov elizarov force-pushed the shared-flow branch 4 times, most recently from 4e1d3f5 to 6566e1e Compare July 7, 2020 17:22
@qwwdfsad qwwdfsad self-requested a review October 12, 2020 15:25
Copy link
Collaborator

@qwwdfsad qwwdfsad left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

synchronized(this) {
val oldState = _state.value
if (expectedState != null && oldState != expectedState) return false // CAS support
if (oldState == newState) return true // Don't do anything if value is not changing, but CAS -> true
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oldState is volatile and can be read without a lock.
Its value can be changed during a comparison, but it's inseparable from an arbitrary interleaving of two updates.
Tho oldValue still has to be re-read under the lock, so maybe it's not worth it

@qwwdfsad
Copy link
Collaborator

Last minute change: let's mark ConflatedBroacastChannel and BroadcastChannel as obsolete to completely deprecate them in the future

@elizarov elizarov merged commit 34c3464 into develop Oct 13, 2020
@elizarov elizarov deleted the shared-flow branch October 13, 2020 11:03
@prithivraj
Copy link

This is amazing!
Will this be part of 1.4.0?
When can we start using it?

@LouisCAD
Copy link
Contributor

@prithivraj Look at the label, that will answer your first question.
Regarding when… I think time will tell. Also, there'll be a "Coroutines Update" talk later today here: https://kotlinlang.org/lp/event-14/, you might be interested.

recheej pushed a commit to recheej/kotlinx.coroutines that referenced this pull request Dec 28, 2020
* Introduce SharedFlow and sharing operators

Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes Kotlin#2034
Fixes Kotlin#2047

Co-authored-by: Ibraheem Zaman <[email protected]>
Co-authored-by: Thomas Vos <[email protected]>
Co-authored-by: Travis Wyatt <[email protected]>
recheej pushed a commit to recheej/kotlinx.coroutines that referenced this pull request Dec 28, 2020
* Introduce SharedFlow and sharing operators

Summary of changes:
* SharedFlow, MutableSharedFlow and its constructor.
* StateFlow implements SharedFlow.
* SharedFlow.onSubscription operator, clarified docs in other onXxx operators.
* BufferOverflow strategy in kotlinx.coroutines.channels package.
* shareIn and stateIn operators and SharingStarted strategies for them.
* SharedFlow.flowOn error lint (up from StateFlow).
* Precise cancellable() operator fusion.
* Precise distinctUntilChanged() operator fusion.
* StateFlow.compareAndSet function.
* asStateFlow and asSharedFlow read-only view functions.
* Consistently clarified docs on cold vs hot flows.
* Future deprecation notice for BroadcastChannel, ConflatedBroadcastChannel, broadcast, and broadcastIn.
* Channel(...) constructor function has onBufferOverflow parameter.
* buffer(...) operator has onBufferOverflow parameter.
* shareIn/stateIn buffer and overflow strategy are configured via upstream buffer operators.
* shareIn/stateIn fuse with upstream flowOn for more efficient execution.
* conflate() is implemented as buffer(onBufferOverflow=KEEP_LATEST), non-suspending strategies are reasonably supported with 0 and default capacities.
* Added reactive operator migration hints.
* WhileSubscribed with kotlin.time.Duration params

Fixes Kotlin#2034
Fixes Kotlin#2047

Co-authored-by: Ibraheem Zaman <[email protected]>
Co-authored-by: Thomas Vos <[email protected]>
Co-authored-by: Travis Wyatt <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet