Skip to content

Commit

Permalink
Filled buffer handling now fail-safe, not fail-fast
Browse files Browse the repository at this point in the history
Implements approach of Darksonn, where the buffer is extended
when the capacity is equal to the length at the time of the call
to recv_many.
  • Loading branch information
Aaron Schweiger authored and Aaron Schweiger committed Sep 28, 2023
1 parent ac1b2bc commit 0a1c510
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 25 deletions.
11 changes: 6 additions & 5 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -239,8 +239,8 @@ impl<T> Receiver<T> {
/// not yet been closed, this method will sleep until a message is sent or
/// the channel is closed.
///
/// Asserts that the passed-in buffer has capacity greater than its
/// length.
/// If at the time of the call the buffer has no unused capacity,
/// `BLOCK_CAP` additional elements are reserved.
///
/// # Example
/// ```
Expand All @@ -256,15 +256,16 @@ impl<T> Receiver<T> {
///
/// // At most 2 at a time
/// let mut buffer : Vec<&str> = Vec::with_capacity(2);
/// assert_eq!(2, rx.recv_many(&mut buffer).await, "did not fill to capacity");
/// assert_eq!(2, rx.recv_many(&mut buffer).await);
/// assert_eq!(vec!["first","second"], buffer);
/// buffer.clear(); // Reuse the buffer
/// assert_eq!(1, rx.recv_many(&mut buffer).await);
/// assert_eq!(vec!["third"], buffer);
/// tx.send("fourth").await.unwrap();
/// tx.send("fifth").await.unwrap();
/// assert_eq!(1, rx.recv_many(&mut buffer).await);
/// assert_eq!(vec!["third","fourth"], buffer);
/// // assert_eq!(2, rx.recv_many(&mut buffer).await); // 'buffer must have non-zero unused capacity'
/// assert_eq!(1, rx.recv_many(&mut buffer).await);
/// assert_eq!(vec!["third","fourth","fifth"], buffer);
/// }
/// ```
pub async fn recv_many(&mut self, buffer: &mut Vec<T>) -> usize {
Expand Down
9 changes: 4 additions & 5 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,14 +297,13 @@ impl<T, S: Semaphore> Rx<T, S> {

/// Receives values into `buffer` up to its capacity
///
/// If the buffer initially has 0 capacity, reserves `super::BLOCK_CAP` elements
/// If the buffer initially has 0 remaining capacity, reserves `super::BLOCK_CAP` elements
pub(crate) fn recv_many(&mut self, cx: &mut Context<'_>, buffer: &mut Vec<T>) -> Poll<usize> {
use super::block::Read;

assert!(
buffer.capacity() > buffer.len(),
"buffer must have non-zero unused capacity"
);
if buffer.len() == buffer.capacity() {
buffer.reserve(super::BLOCK_CAP);
}

let initial_length = buffer.len();

Expand Down
5 changes: 2 additions & 3 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,8 +181,8 @@ impl<T> UnboundedReceiver<T> {
/// not yet been closed, this method will sleep until a message is sent or
/// the channel is closed.
///
/// Asserts that the passed-in buffer has capacity greater than its
/// length.
/// If at the time of the call the buffer has no unused capacity,
/// `BLOCK_CAP` additional elements are reserved.
///
/// # Example:
///
Expand All @@ -201,7 +201,6 @@ impl<T> UnboundedReceiver<T> {
/// assert_eq!(1, rx.recv_many(&mut buffer).await);
/// assert_eq!(vec!["hello"], buffer);
/// assert_eq!(0, rx.recv_many(&mut buffer).await);
/// // assert_eq!(vec!["hello"], buffer);
/// }
pub async fn recv_many(&mut self, buffer: &mut Vec<T>) -> usize {
use crate::future::poll_fn;
Expand Down
13 changes: 1 addition & 12 deletions tokio/tests/sync_mpsc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,17 +120,6 @@ async fn async_send_recv_with_buffer() {
assert_eq!(None, rx.recv().await);
}

#[tokio::test]
#[cfg(feature = "full")]
#[should_panic(expected = "buffer must have non-zero unused capacity")]
async fn async_send_recv_many_zero() {
let (tx, mut rx) = mpsc::channel(2);
assert_ok!(tx.send(7).await);
let mut buffer = vec![0; 0];
assert_eq!(0, buffer.capacity());
assert_eq!(1, rx.recv_many(&mut buffer).await);
}

#[tokio::test]
#[cfg(feature = "full")]
async fn async_send_recv_many_with_buffer() {
Expand Down Expand Up @@ -222,7 +211,7 @@ async fn send_recv_many_unbounded() {
assert_ok!(tx.send(100));
assert_ok!(tx.send(1002));

let mut buffer: Vec<i32> = Vec::with_capacity(4);
let mut buffer: Vec<i32> = Vec::with_capacity(0);
let mut count = 0;
while count < 4 {
count += rx.recv_many(&mut buffer).await;
Expand Down

0 comments on commit 0a1c510

Please sign in to comment.