Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add is_closed, is_empty and len to mpsc::Receiver and mpsc::UnboundedReceiver #6348

Merged
merged 15 commits into from
Mar 24, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
move empty channel check into is_empty function
changes the Receiver is_closed function to have the same behaviour from the Sender.
adds is_empty to the receiver to check if the channel is empty
  • Loading branch information
balliegojr committed Mar 2, 2024
commit 816aea781af26e518256a10316c9ceef54760a29
7 changes: 3 additions & 4 deletions tokio/src/sync/mpsc/block.rs
Original file line number Diff line number Diff line change
Expand Up @@ -168,18 +168,17 @@ impl<T> Block<T> {
Some(Read::Value(value.assume_init()))
}

/// Returns true if the sender half of the list is closed and there is no
/// value in the slot to be consumed
/// Returns true if there is a value in the slot to be consumed
///
/// # Safety
///
/// To maintain safety, the caller must ensure:
///
/// * No concurrent access to the slot.
pub(crate) fn is_closed_and_empty(&self, slot_index: usize) -> bool {
pub(crate) fn has_value(&self, slot_index: usize) -> bool {
let offset = offset(slot_index);
let ready_bits = self.header.ready_slots.load(Acquire);
!is_ready(ready_bits, offset) && is_tx_closed(ready_bits)
is_ready(ready_bits, offset)
}

/// Writes a value to the block at the given offset.
Expand Down
40 changes: 26 additions & 14 deletions tokio/src/sync/mpsc/bounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -465,8 +465,7 @@ impl<T> Receiver<T> {

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. The channel is closed
/// This method returns `true` if the channel has been closed. The channel is closed
/// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called.
///
/// [`Sender`]: crate::sync::mpsc::Sender
Expand All @@ -478,25 +477,38 @@ impl<T> Receiver<T> {
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(10);
/// let (_tx, mut rx) = mpsc::channel::<()>(10);
/// assert!(!rx.is_closed());
///
/// tx.send(100).await.unwrap();
/// tx.send(200).await.unwrap();
///
/// rx.close();
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(100));
///
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(200));
/// assert!(rx.recv().await.is_none());
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&mut self) -> bool {
self.chan.is_closed_and_empty()
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::channel(10);
/// assert!(rx.is_empty());
///
/// tx.send(0).await.unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&mut self) -> bool {
self.chan.is_empty()
}

/// Polls to receive the next message on this channel.
Expand Down
16 changes: 9 additions & 7 deletions tokio/src/sync/mpsc/chan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,21 +241,23 @@ impl<T, S: Semaphore> Rx<T, S> {
self.inner.notify_rx_closed.notify_waiters();
}

pub(crate) fn is_closed_and_empty(&mut self) -> bool {
// There two internal states that can represent a closed channel that has no messages
pub(crate) fn is_closed(&self) -> bool {
// There two internal states that can represent a closed channel
//
// 1. When `close` is called.
// In this case, the inner semaphore will be closed, and the channel has no
// outstanding messages if the list is at the tail position.
// In this case, the inner semaphore will be closed.
//
// 2. When all senders are dropped.
// In this case, the semaphore remains unclosed, and the `index` in the list won't
// reach the tail position. It is necessary to check the list if the last block is
// `closed` and has no value available.
// `closed`.
self.inner.semaphore.is_closed() || self.inner.tx_count.load(Acquire) == 0
}

pub(crate) fn is_empty(&mut self) -> bool {
self.inner.rx_fields.with_mut(|rx_fields_ptr| {
let rx_fields = unsafe { &mut *rx_fields_ptr };
(self.inner.semaphore.is_closed() && rx_fields.list.is_at_tail_position(&self.inner.tx))
|| rx_fields.list.is_closed_and_empty(&self.inner.tx)
rx_fields.list.is_empty(&self.inner.tx)
})
}

Expand Down
9 changes: 2 additions & 7 deletions tokio/src/sync/mpsc/list.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,23 +230,18 @@ impl<T> fmt::Debug for Tx<T> {
}

impl<T> Rx<T> {
pub(crate) fn is_closed_and_empty(&mut self, tx: &Tx<T>) -> bool {
pub(crate) fn is_empty(&mut self, tx: &Tx<T>) -> bool {
// Advance `head`, if needed
if self.try_advancing_head() {
self.reclaim_blocks(tx);
}

unsafe {
let block = self.head.as_ref();
block.is_closed_and_empty(self.index)
!block.has_value(self.index)
}
}

pub(crate) fn is_at_tail_position(&mut self, tx: &Tx<T>) -> bool {
let tail_position = tx.tail_position.load(Acquire);
tail_position == self.index
}

/// Pops the next value off the queue.
pub(crate) fn pop(&mut self, tx: &Tx<T>) -> Option<block::Read<T>> {
// Advance `head`, if needed
Expand Down
40 changes: 26 additions & 14 deletions tokio/src/sync/mpsc/unbounded.rs
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,7 @@ impl<T> UnboundedReceiver<T> {

/// Checks if a channel is closed.
///
/// This method returns `true` if the channel has been closed and there are
/// no remaining messages in the channel's buffer. The channel is closed
/// This method returns `true` if the channel has been closed. The channel is closed
/// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called.
///
/// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender
Expand All @@ -345,25 +344,38 @@ impl<T> UnboundedReceiver<T> {
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::unbounded_channel();
/// let (_tx, mut rx) = mpsc::unbounded_channel::<()>();
/// assert!(!rx.is_closed());
///
/// let _ = tx.send(100);
/// let _ = tx.send(200);
///
/// rx.close();
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(100));
///
/// assert!(!rx.is_closed());
/// assert_eq!(rx.recv().await, Some(200));
/// assert!(rx.recv().await.is_none());
///
/// assert!(rx.is_closed());
/// }
/// ```
pub fn is_closed(&mut self) -> bool {
self.chan.is_closed_and_empty()
pub fn is_closed(&self) -> bool {
self.chan.is_closed()
}

/// Checks if a channel is empty.
///
/// This method returns `true` if the channel has no messages.
///
/// # Examples
/// ```
/// use tokio::sync::mpsc;
///
/// #[tokio::main]
/// async fn main() {
/// let (tx, mut rx) = mpsc::unbounded_channel();
/// assert!(rx.is_empty());
///
/// tx.send(0).unwrap();
/// assert!(!rx.is_empty());
/// }
///
/// ```
pub fn is_empty(&mut self) -> bool {
self.chan.is_empty()
}

/// Polls to receive the next message on this channel.
Expand Down
Loading
Loading