
Add recv_many for mpsc channels #6010

Merged
merged 37 commits into tokio-rs:master on Oct 17, 2023
Conversation

@aschweig (Contributor) commented Sep 15, 2023

Introduces recv_many, which allows mpsc receivers to get all available messages on a queue in bulk rather than one at a time. In the use case where message arrivals are bursty and only the last sent message matters, e.g., when updating a dashboard, this provides an easy way to avoid unnecessary work, resulting in a substantial speedup. The approach can also provide a modest raw speedup over recv, as shown in the uncontented_bounded_updater_* and uncontented_unbounded_updater_* benchmarks included in this PR.

Sample usage from proposed test in sync_mpsc.rs:

async fn send_recv_many_unbounded() {
    let (tx, mut rx) = mpsc::unbounded_channel::<i32>();

    // Send four values over the unbounded channel
    assert_ok!(tx.send(7));
    assert_ok!(tx.send(13));
    assert_ok!(tx.send(100));
    assert_ok!(tx.send(1002));

    let mut buffer = vec![0; 0];
    assert_eq!(rx.recv_many(&mut buffer).await, 4);
    assert_eq!(vec![7,13,100,1002], buffer);
    assert!(buffer.capacity() >= 4);

    drop(tx);

    assert!(rx.recv().await.is_none());
}

Calls to recv_many will clear the passed-in buffer and, if needed, reserve additional capacity.

Pull request for feature enhancement discussed here:

#5844

edit: clarified origin of code sample.

@github-actions bot added the R-loom-sync (Run loom sync tests on this PR) label on Sep 15, 2023
@Darksonn (Contributor)

In the use case where message arrivals are bursty and only the last sent message matters, e.g., when updating a dashboard, this approach provides an easy approach to avoiding unnecessary work resulting in substantial speedup.

Why not use the watch channel for this?

@Darksonn added the A-tokio (Area: The main tokio crate) and M-sync (Module: tokio/sync) labels on Sep 20, 2023
@aschweig (Contributor, Author)

Agree that the watch channel is appropriate if one target is to be updated.

recv_many enables use of a single channel for an arbitrary number of topics while retaining FIFO mailbox order; the latter property is important for the calculation of some rolling statistics like sliding window averages (e.g., average of last 5 observations). It also allows the receiver to measure channel volume. In the actor use-case, I find the ergonomics of a single channel/ActorMessage preferable to using tokio::select! over distinct control and data channels; recv_many allows the single channel to achieve some of the message eliding gains of watch.
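
Not part of the PR itself, but a minimal sketch of the dashboard/actor pattern described above, written against the two-argument recv_many(&mut buffer, limit) signature that was eventually merged; DashboardUpdate and the widget ids are made up for illustration:

#[tokio::test]
async fn dashboard_keeps_only_latest_update_per_widget() {
    use std::collections::HashMap;
    use tokio::sync::mpsc;

    struct DashboardUpdate {
        widget_id: u32,
        value: f64,
    }

    let (tx, mut rx) = mpsc::channel::<DashboardUpdate>(128);

    // A bursty producer: many updates for a handful of widgets arrive at once.
    for i in 0..100u32 {
        tx.send(DashboardUpdate { widget_id: i % 4, value: i as f64 })
            .await
            .unwrap();
    }
    drop(tx);

    let mut buffer = Vec::with_capacity(128);
    // `recv_many` returns 0 only once the channel is closed and drained.
    while rx.recv_many(&mut buffer, 128).await > 0 {
        // FIFO order is preserved, so the last entry per key wins.
        let mut latest: HashMap<u32, f64> = HashMap::new();
        for update in buffer.drain(..) {
            latest.insert(update.widget_id, update.value);
        }
        // Each widget is redrawn once per batch instead of once per message.
        assert!(latest.len() <= 4);
    }
}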

@aschweig (Contributor, Author)

I'd like to solicit feedback on adding another parameter to recv_many to set the maximum number of retrieved elements in the buffer, where a non-positive value would be unbounded. For example, the following would populate no more than 1000 elements:

rx.recv_many(&mut buffer, 1000) // at most 1000 elements populated; helpful?

Are there meaningful cases where memory is at a premium but an unbounded queue is required? Or, thinking about an actor: for liveness reasons, an actor might want to limit the number of messages received in one chunk. While actors could call yield_now().await while processing very large message buffers, the goal is to reduce complexity, and a limit helps along these lines.
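
As a rough sketch of that liveness idea (again using the limit argument that later landed, with a hypothetical handle_message), an actor can cap each batch and yield between batches:

async fn run_actor(mut rx: tokio::sync::mpsc::UnboundedReceiver<String>) {
    let mut buffer = Vec::with_capacity(1000);
    // At most 1000 messages are appended per call, so one wake-up cannot
    // monopolize the task even if the queue is very deep.
    while rx.recv_many(&mut buffer, 1000).await > 0 {
        for msg in buffer.drain(..) {
            handle_message(&msg);
        }
        // Give other tasks on the runtime a chance to run between batches.
        tokio::task::yield_now().await;
    }
}

fn handle_message(msg: &str) {
    let _ = msg;
}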

@Darksonn (Contributor)

Usually this kind of API never increases the capacity of the vector unless it's equal to the length. This defines a limit.

@aschweig (Contributor, Author)

Usually this kind of API never increases the capacity of the vector unless it's equal to the length. This defines a limit.

I think I am a bit confused on your meaning -- can you point me in the direction of some examples? Couldn't this again result in unbounded capacity growth if the caller fails to clear the buffer between calls?

In the proposed chan.rs fn recv_many -- the capacity is currently increased if the number of acquired permits suggests more messages than the vector buffer capacity, with the goal of anticipating the total required capacity up front:

    let capacity = self.inner.semaphore.num_acquired();
    if buffer.capacity() < capacity {
        buffer.reserve(capacity);
    }

Maybe the cleanest way to bound the buffer is to allow recv_many to increase the capacity to some small constant, e.g., 2, if the capacity is zero. I chose the magic value 2 here as the smallest number consistent with receiving more than one. Users who want a specific additional capacity would need to reserve that capacity up-front before the call.

@Darksonn (Contributor)

I mean that if buffer.capacity() < capacity, then it doesn't receive all of the messages.

But an argument with a limit is also fine.

@aschweig (Contributor, Author) commented Sep 24, 2023

I am going to rework my implementation to accept a capacity argument. That will simplify the interaction with the Semaphore. I will also simplify the logic in chan.rs recv_many.

@aschweig (Contributor, Author)

So, I decided to let the input buffer set the capacity. If a zero-capacity vector is passed, it reserves BLOCK_CAP capacity to avert a run-time error. I removed peek() and num_acquired() as they are not required for the recv_many use case.

@Darksonn (Contributor) left a comment

At this point, I think the main thing remaining is adjusting documentation.

@Darksonn (Contributor) left a comment

I have a few more documentation nits. I know we've gone through many review iterations for this. Please let me know if it's been too many. I would be happy to merge it as it is now if so.

Comment on lines 249 to 252
/// If at the time of the call `buffer` has unused capacity,
/// `recv_many` extends the buffer with no more elements than
/// its unused capacity. Should `buffer` have no initial unused
/// capacity, additional elements are first reserved.
Contributor

I was looking at this paragraph, and I think it could be improved like this:

Suggested change
/// If at the time of the call `buffer` has unused capacity,
/// `recv_many` extends the buffer with no more elements than
/// its unused capacity. Should `buffer` have no initial unused
/// capacity, additional elements are first reserved.
/// If `buffer` has unused capacity, then this call will not reserve
/// additional space in `buffer`. This means that the maximum number of
/// received messages is `buffer.capacity() - buffer.len()`. However, if
/// the capacity is equal to the length, then this call will increase the
/// capacity to make space for additional elements.

This actually raises a question: Perhaps it makes sense to only reserve space when we return a message? This way, we don't consume extra memory until a message arrives, and if the channel gets closed, we don't reserve any space.

What do you think?

Contributor Author

I like your language here better.

I also like the idea of not allocating memory unless needed. If channels are very lightweight, perhaps they are often created, a finite number of messages is sent, and then they are closed. Why force the caller to always allocate one more message than is actually sent?

In chan.rs, recv_many:

let mut insufficient_capacity = buffer.capacity() == buffer.len();
...
                () => {
                    while (buffer.len() < buffer.capacity() || insufficient_capacity) {
                        match rx_fields.list.pop(&self.inner.tx) {
                            Some(Read::Value(value)) => {
                                if insufficient_capacity {
                                    buffer.reserve(super::BLOCK_CAP);
                                    insufficient_capacity = false;
                                }
                                buffer.push(value);
                            }

Contributor Author

Does it make sense to urge users to manage the buffer's capacity outside recv_many?
e.g.,

If buffer has unused capacity, then this call will not reserve
additional space in buffer. This means that the maximum number of
received messages is buffer.capacity() - buffer.len().
Efficient client code should ensure buffer has unused capacity,
but if the capacity is equal to the length and there is at least one message
in the channel's queue, then this call will conservatively increase the capacity
to make space for additional elements.

Comment on lines 235 to 238
/// This method extends `buffer` by no more than a fixed number
/// of values as specified by `limit`. If `limit` is zero,
/// then a default is used. The return value is the number
/// of values added to `buffer`.
Contributor

If the limit is zero, then I think it would be okay to just return 0 immediately.

Contributor Author

If the caller wants a no-op, they can simply not call the method rather than calling it with a zero limit.

Furthermore, immediately returning 0 means that this behavior becomes unique to a single value of limit; all other values of limit only return 0 if the channel is closed and no values are pending. So it breaks the API's guarantee for a case of no practical value to someone seeking to receive messages on the channel.

I considered the following possible ways to handle a 0 value for limit:

  • The fail-fast approach -- assert!(limit > 0);.
  • Have it so that limit=0 acts the same way as limit=usize::max_value().
  • Have it so that limit=0 results in some reasonable default behavior, e.g., retrieve 2 or 8 messages or BLOCK_CAP. I liked BLOCK_CAP because it corresponded to the internal chunking used.

@aschweig (Contributor, Author) commented Oct 4, 2023

A tangentially related discussion: #2742
If limit is computed at run-time and is 0, then my sense is that both assert!(...) and immediately returning 0 violate the principle of least surprise, as in both cases the function doesn't do what it is named. But the assert approach also pops up within the same file, so it is unsurprising from an implementation standpoint.

Contributor

Hmm. I think immediately returning 0 is the least surprising behavior. After all, you asked to receive zero messages, and you got zero messages.

Contributor Author

Recall, you wrote:

Perhaps we can swap these paragraphs, and add something to the paragraph about 0 along the lines of "this method will never return 0 if the channel is not closed" or "if the channel is not closed, then this method will never return 0".

Nonetheless, I see that the same no-op convention is used in unistd.h's read function... and evidence of users running into the types of bugs I anticipate:
https://stackoverflow.com/a/3074217

If this is acceptable, I'll update the handling and the documentation.

Contributor

If what is acceptable?

I mainly see people get confused about length-zero reads when they do this:

let mut vec = Vec::with_capacity(1024);
io.read(&mut vec);

and are surprised because &mut vec becomes a slice of length zero.

But that doesn't happen in our case.

@aschweig (Contributor, Author) commented Oct 8, 2023

Understood regarding io.read(&mut vec); -- here the issue is resolved as we take a Vec.

The behavior of recv_many(buf, 0) is similar to calling rd.take(0). I am unclear when in an async context take(0) is ever useful -- but it is allowed -- so at least recv_many(buf, 0) is no worse.

Contrived example adapted from Example: split a file into chunks

#[tokio::test]
async fn take_zero() {
    use tokio::fs::File;
    use tokio::io::AsyncReadExt; // needed for `.take` below
    use std::path::PathBuf;
    for chunk_size in [0, 1, 1024] {
        let mut input_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
        input_path.push("tests/io_take.rs");
        let input_file = File::open(input_path).await.unwrap();
        let mut output_path = PathBuf::from(env!("CARGO_TARGET_TMPDIR"));
        output_path.push("takezero");
        if output_path.exists() {
            tokio::fs::remove_file(&output_path).await.unwrap();
        }
        assert!(!output_path.exists());

        // the `.take` method is coming from the AsyncReadExt trait imported above
        let mut reader = input_file.take(chunk_size);
        let mut output = File::create(&output_path).await.unwrap();
        loop {
            let bytes_copied = tokio::io::copy(&mut reader, &mut output).await.unwrap();
            if bytes_copied == 0 {
                break;
            }
            // our reader is now exhausted, but that doesn't mean the underlying reader
            // is. So we recover it, and we create the next chunked reader
            reader = reader.into_inner().take(chunk_size);
        }
        let file_size = tokio::fs::metadata(output_path).await.unwrap().len();
        assert!((chunk_size > 0 && file_size > 0) || (chunk_size == 0 && file_size == 0));
    }
}

I think this is now resolved (for me).
edit: removed comment; I have pushed code to return 0 immediately when limit=0.

Contributor

I think allowing a zero length is useful for cases where the length is computed dynamically. I've seen code along these lines:

let len = io.read_u64().await? as usize;
let mut buffer = Vec::with_capacity(len);
io.take(len as u64).read_to_end(&mut buffer).await?;

If there are length-zero messages, then the above sometimes makes use of the take(0) behavior.
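
A hypothetical sketch of the analogous situation for recv_many, where a run-time-computed batch size may legitimately be zero and the immediate-return-0 behavior avoids a special case:

async fn pull_batch(rx: &mut tokio::sync::mpsc::Receiver<u8>, budget: usize) -> Vec<u8> {
    let mut buf = Vec::with_capacity(budget);
    // With budget == 0 this returns 0 immediately; otherwise it waits until
    // at least one message is available or the channel is closed.
    let _received = rx.recv_many(&mut buf, budget).await;
    buf
}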

@aschweig force-pushed the recv_many branch 3 times, most recently from 5b95665 to 5d2a8a7 on October 8, 2023 03:56
Aaron Schweiger added 6 commits October 9, 2023 08:55
Circumstances may exist where multiple messages are queued for the
receiver by the time the receiving task runs -- and the receiving
task itself may benefit from batch rather than item-by-item
processing, e.g., updating a row based on a primary key where
only the most recent update is needed; recv_many
enables the receiver to process messages last-first
and skip earlier updates for rows already updated.

The change involved introducing a peek function to sync/mpsc/list.rs;
it is based on fn pop.  fn recv_many returns Poll<Vec<T>> contrasting
recv's Poll<Option<T>>.  The new function returns
Ready(vec![]) where recv returned Ready(None).

The mpsc.rs trait Semaphore was extended with two functions,
num_acquired() and add_permits(n) -- which are used to size
the vector and update the underlying semaphores in a single
call.

Simple tests were added to tests/sync_mpsc.rs.
The implementation uses a passed-in buffer to return results.
The buffer is always cleared, and its capacity may be
increased to match the number of queued messages.
This avoids excessive memory allocation.

This commit also adds new benchmarks in benches/sync_mpsc_recv_many.rs
It borrows substantially from benches/sync_mpsc.rs

The benchmarks are framed around the idea that only the last value
is needed, e.g., what might be typical in a dashboard.

The following benchmarks compare extracting the last value from
5_000 recv's vs. a variable number of recv_many:
  contention_bounded_updater_recv vs.  contention_bounded_updater_recv_many
  contention_bounded_full_updater_recv vs.  contention_bounded_full_updater_recv_many
  contention_unbounded_updater_recv vs. contention_unbounded_updater_recv_many
  uncontented_bounded_updater_recv vs.  uncontented_bounded_updater_recv_many
  uncontented_unbounded_updater_recv vs.  uncontented_unbounded_updater_recv_many

Initial tests suggest a ~30% performance improvement for the
uncontented tests.

The benchmarks contention_bounded_updater_publish_recv and
contention_bounded_updater_publish_recv_many are different;
first, only 1_000 messages are received.
In these tests, we contemplate an actor who needs to receive a message
and forward it over an API that takes 1ns to complete.
An actor accepting a single message-at-a-time
will hit the 1ns lag repeatedly; the actor that can pick only
the last message out of the buffer from recv_many has much less work.
Previous benchmarks in sync_mpsc_recv_many.rs included a
contrived example that is removed.
Aaron Schweiger and others added 12 commits October 9, 2023 08:55
The documentation is changed to better highlight the limited
circumstance in which `recv_many` may return 0,
'..will never return 0 unless...'.
Two new tests, somewhat analogous to the doctests, were added:
  - send_recv_many_bounded_capacity()
  - send_recv_many_unbounded_capacity()
These ensure that, for buffers that have not reached their
capacity, a call to `recv_many` only fills up to the capacity,
leaving other messages still queued. A subsequent
call on a filled buffer would reserve additional elements.  The test
also looks at pending messages holding the channel open --
compared to the doctest example which showcases non-dropped
senders.
Positive values of `limit` set the maximum number of elements
that can be added to `buffer` in a single call.
A zero value of `limit` sets the maximum number of elements
that can be added to a default value, currently `super::BLOCK_CAP`.
Returning 0 immediately when limit=0 is consistent with
the current take(0) behavior.  Tests were updated.
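
For reference, a small test-style sketch (not taken from the PR's test suite) of the limit semantics these commits describe, using the merged two-argument API:

#[tokio::test]
async fn recv_many_limit_semantics() {
    let (tx, mut rx) = tokio::sync::mpsc::channel::<i32>(8);
    for v in [1, 2, 3, 4, 5] {
        tx.send(v).await.unwrap();
    }

    let mut buffer = Vec::new();

    // limit == 0 is a no-op: nothing is appended and 0 is returned immediately.
    assert_eq!(rx.recv_many(&mut buffer, 0).await, 0);

    // At most `limit` values are appended per call; the rest stay queued.
    assert_eq!(rx.recv_many(&mut buffer, 3).await, 3);
    assert_eq!(buffer, vec![1, 2, 3]);

    // The next call returns whatever is immediately available, up to `limit`.
    assert_eq!(rx.recv_many(&mut buffer, 3).await, 2);
    assert_eq!(buffer, vec![1, 2, 3, 4, 5]);

    // Once the sender is dropped and the queue is drained, 0 is returned.
    drop(tx);
    assert_eq!(rx.recv_many(&mut buffer, 3).await, 0);
}
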
@Darksonn (Contributor) left a comment

Thank you for being patient with my reviews. I'm sorry it took so many rounds. I think we're reaching the end.

@Darksonn (Contributor) left a comment

Thanks! Just a single grammar fix, and I'll merge this.

@aschweig (Contributor, Author)

Thanks for your help on this. I note that there have been changes to master since approval and that GitHub offers the "Update branch" button. Should I rebase this PR or leave it as-is?

@Darksonn merged commit 881b510 into tokio-rs:master on Oct 17, 2023
69 checks passed
@Darksonn (Contributor)

Sorry for the delay. I've merged it now.

kodiakhq bot pushed a commit to pdylanross/fatigue that referenced this pull request Nov 10, 2023
Bumps tokio from 1.33.0 to 1.34.0.

Release notes
Sourced from tokio's releases.

Tokio v1.34.0
Fixed

io: allow clear_readiness after io driver shutdown (#6067)
io: fix integer overflow in take (#6080)
io: fix I/O resource hang (#6134)
sync: fix broadcast::channel link (#6100)

Changed

macros: use ::core qualified imports instead of ::std inside tokio::test macro (#5973)

Added

fs: update cfg attr in fs::read_dir to include aix (#6075)
sync: add mpsc::Receiver::recv_many (#6010)
tokio: added vita target support (#6094)

#5973: tokio-rs/tokio#5973
#6067: tokio-rs/tokio#6067
#6080: tokio-rs/tokio#6080
#6134: tokio-rs/tokio#6134
#6100: tokio-rs/tokio#6100
#6075: tokio-rs/tokio#6075
#6010: tokio-rs/tokio#6010
#6094: tokio-rs/tokio#6094



Commits

49eb26f chore: prepare Tokio v1.34.0 release (#6138)
19d96c0 io: increase ScheduledIo tick resolution (#6135)
30b2eb1 io: fix possible I/O resource hang (#6134)
8ec3e0d metrics: update stats when unparking in multi-thread (#6131)
161ecec stream: fix typo in peekable docs (#6130)
61fcc3b time: remove cached elapsed value from driver state (#6097)
944024e chore: update rust-version to 1.63 in all crates (#6126)
65f861f stream: add StreamExt::peekable (#6095)
4c85801 ci: fix docs on latest nightly (#6120)
ed32cd1 task: add tests for tracing instrumentation of tasks (#6112)
Additional commits viewable in compare view



