
Enhancement -- add recv_many for mpsc channels? #5844

Closed
aschweig opened this issue Jul 1, 2023 · 9 comments
Labels
A-tokio Area: The main tokio crate C-feature-request Category: A feature request. M-sync Module: tokio/sync

Comments

@aschweig
Contributor

aschweig commented Jul 1, 2023

Inspired by Alice Ryhl's Actors with Tokio, it occurs to me that the mpsc channels might support batch receipt of messages.

By the time the receiving task runs, multiple messages may already be queued for it, and the receiver may benefit from batch rather than item-by-item delivery -- e.g., updating a row keyed by a primary key where only the most recent update is needed. I propose a new method, recv_many, that lets the receiver process effectively all available messages -- in this example allowing more efficient last-first processing. The public signature in bounded.rs and unbounded.rs would replace recv's Option with a Vec:

pub async fn recv_many(&mut self) -> Vec<T>

I have pushed one possible implementation of the feature. The commit adds fn peek to sync/mpsc/list.rs, and fn num_acquired and add_permits to the chan.rs Semaphore trait; the latter two minimize reallocations and semaphore updates.

@aschweig aschweig added A-tokio Area: The main tokio crate C-feature-request Category: A feature request. labels Jul 1, 2023
@Darksonn Darksonn added the M-sync Module: tokio/sync label Jul 1, 2023
@wathenjiang
Contributor

wathenjiang commented Jul 2, 2023

std::sync::mpsc::channel has no method to receive multiple messages either. Does your version perform better in such a use case?

@aschweig
Contributor Author

aschweig commented Jul 2, 2023

My proposed approach in chan.rs provides only a modest optimization over what is there already. It still pops each item off the list as the old recv method does, but it can pre-allocate memory and release semaphore permits in a single update.

@wathenjiang
Contributor

wathenjiang commented Jul 2, 2023

I see.
There is an alternative: perhaps we can pre-allocate memory ourselves before calling Receiver::recv(), instead of inside Receiver::recv_many.
Also, popping n messages at once and returning n permits to the semaphore in one step may add latency in some cases: some senders may not be woken up until all n permits have been put back.

@Darksonn
Contributor

Darksonn commented Jul 2, 2023

It's true that this could act as an optimization, but I'm not convinced that this is enough to justify adding yet another method.

@aschweig
Contributor Author

aschweig commented Jul 3, 2023

In Castor Actors the low-level actor processes a list of messages in bulk; single-message "Simple" actors essentially iterate over the available list.

Here, recv_many enables a similar approach; I see a specific throughput benefit where there may be a series of slow external update operations and only the last matters.

An alternative might be a channel with an effective queue size of 1 -- send would not block and would replace the existing item, if present. But this approach requires as many channels as there are things to update.

@aschweig
Contributor Author

aschweig commented Sep 9, 2023

I updated my experimental recv_many implementation with two substantial changes:

  1. A mutable buffer is now passed in to recv_many, to reduce the number of memory allocations; recv_many returns the number of items populated. The buffer is cleared, but its capacity is only ever increased.

  2. I've added performance benchmarks in sync_mpsc_recv_many.rs:

Running 12 tests
test contention_bounded_full_updater_recv         ... bench:   1,578,502 ns/iter (+/- 399,531)
test contention_bounded_full_updater_recv_many    ... bench:   1,404,322 ns/iter (+/- 449,397)
test contention_bounded_updater_publish_recv      ... bench:  56,597,463 ns/iter (+/- 1,476,583)
test contention_bounded_updater_publish_recv_many ... bench:     243,983 ns/iter (+/- 134,409)
test contention_bounded_updater_recv              ... bench:   1,413,729 ns/iter (+/- 365,168)
test contention_bounded_updater_recv_many         ... bench:   1,458,341 ns/iter (+/- 514,528)
test contention_unbounded_updater_recv            ... bench:   1,919,008 ns/iter (+/- 899,328)
test contention_unbounded_updater_recv_many       ... bench:   1,958,693 ns/iter (+/- 1,158,911)
test uncontented_bounded_updater_recv             ... bench:     541,680 ns/iter (+/- 14,341)
test uncontented_bounded_updater_recv_many        ... bench:     380,017 ns/iter (+/- 7,289)
test uncontented_unbounded_updater_recv           ... bench:     330,919 ns/iter (+/- 7,252)
test uncontented_unbounded_updater_recv_many      ... bench:     246,028 ns/iter (+/- 23,509)

The benchmarks are based on benches/sync_mpsc.rs; the test contention_bounded_full_updater_recv is virtually identical to contention_bounded_full, except that it returns the last value received. The test contention_bounded_full_updater_recv_many does the same thing but with recv_many. Each of the other *updater_recv and *updater_recv_many tests is constructed likewise.

The tests contention_bounded_updater_publish_recv and contention_bounded_updater_publish_recv_many are different: they add a 1 ns thread sleep each time the last value is updated, and the recv_many version performs far fewer of these slow updates. Consider an actor that needs to update a dashboard somewhere -- in the event of a burst of messages, a one-at-a-time actor needs sophisticated logic to forward only the last message of the burst. Here recv_many doesn't show a raw performance improvement, but it offers better ergonomics for achieving performance in a specific but typical use case.

Commit is here with recently rebased changes:
aschweig@7f132d6

@leptonyu

leptonyu commented Dec 11, 2023

poll_recv_many is not exposed; can you expose it too, similar to poll_recv? @aschweig

@Darksonn
Contributor

Closing as this issue was resolved by #6010.

@leptonyu If you have an additional feature request, then please open a new issue.

@leptonyu

#6209

4 participants