From 48587f8b48d8ae6258c467bd1ff51043d1ac59a5 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Thu, 15 Feb 2024 12:42:24 +0100 Subject: [PATCH 01/13] Add `is_closed` to `mpsc::Receiver` and `mpsc::UnboundedReceiver` Fixes: #4638 --- tokio/src/sync/mpsc/block.rs | 14 ++++++ tokio/src/sync/mpsc/bounded.rs | 33 ++++++++++++++ tokio/src/sync/mpsc/chan.rs | 18 ++++++++ tokio/src/sync/mpsc/list.rs | 17 ++++++++ tokio/src/sync/mpsc/unbounded.rs | 33 ++++++++++++++ tokio/tests/sync_mpsc.rs | 75 ++++++++++++++++++++++++++++++++ 6 files changed, 190 insertions(+) diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs index befcfd29efa..0639acef4bf 100644 --- a/tokio/src/sync/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -168,6 +168,20 @@ impl Block { Some(Read::Value(value.assume_init())) } + /// Returns true if the sender half of the list is closed and there is no + /// value in the slot to be consumed + /// + /// # Safety + /// + /// To maintain safety, the caller must ensure: + /// + /// * No concurrent access to the slot. + pub(crate) fn is_closed_and_empty(&self, slot_index: usize) -> bool { + let offset = offset(slot_index); + let ready_bits = self.header.ready_slots.load(Acquire); + !is_ready(ready_bits, offset) && is_tx_closed(ready_bits) + } + /// Writes a value to the block at the given offset. /// /// # Safety diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 3cdba3dc237..1494589bb92 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -463,6 +463,39 @@ impl Receiver { self.chan.close(); } + /// Checks if a channel is closed. + /// + /// This method returns `true` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. The channel is closed + /// when all senders have been dropped, or when [`close`] is called. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(10); + /// assert!(!rx.is_closed()); + /// + /// tx.send(100).await.unwrap(); + /// tx.send(200).await.unwrap(); + /// + /// rx.close(); + /// assert!(!rx.is_closed()); + /// assert_eq!(rx.recv().await, Some(100)); + /// + /// assert!(!rx.is_closed()); + /// assert_eq!(rx.recv().await, Some(200)); + /// assert!(rx.recv().await.is_none()); + /// + /// assert!(rx.is_closed()); + /// } + /// ``` + pub fn is_closed(&mut self) -> bool { + self.chan.is_closed_and_empty() + } + /// Polls to receive the next message on this channel. /// /// This method returns: diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index c05a4abb7c0..f6c590f6642 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -241,6 +241,24 @@ impl Rx { self.inner.notify_rx_closed.notify_waiters(); } + pub(crate) fn is_closed_and_empty(&mut self) -> bool { + // There two internal states that can represent a closed channel that has no messages + // + // 1. When `close` is called. + // In this case, the inner semaphore will be closed, and the channel has no + // outstanding messages if the list is at the tail position. + // + // 2. When all senders are dropped. + // In this case, the semaphore remains unclosed, and the `index` in the list won't + // reach the tail position. It is necessary to check the list if the last block is + // `closed` and has no value available. + self.inner.rx_fields.with_mut(|rx_fields_ptr| { + let rx_fields = unsafe { &mut *rx_fields_ptr }; + (self.inner.semaphore.is_closed() && rx_fields.list.is_at_tail_position(&self.inner.tx)) + || rx_fields.list.is_closed_and_empty(&self.inner.tx) + }) + } + /// Receive the next value pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read; diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index a8b48a87574..bc184964cdb 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -230,6 +230,23 @@ impl fmt::Debug for Tx { } impl Rx { + pub(crate) fn is_closed_and_empty(&mut self, tx: &Tx) -> bool { + // Advance `head`, if needed + if self.try_advancing_head() { + self.reclaim_blocks(tx); + } + + unsafe { + let block = self.head.as_ref(); + block.is_closed_and_empty(self.index) + } + } + + pub(crate) fn is_at_tail_position(&mut self, tx: &Tx) -> bool { + let tail_position = tx.tail_position.load(Acquire); + tail_position == self.index + } + /// Pops the next value off the queue. pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { // Advance `head`, if needed diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index b87b07ba653..1c228094891 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -330,6 +330,39 @@ impl UnboundedReceiver { self.chan.close(); } + /// Checks if a channel is closed. + /// + /// This method returns `true` if the channel has been closed and there are + /// no remaining messages in the channel's buffer. The channel is closed + /// when all senders have been dropped, or when [`close`] is called. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// assert!(!rx.is_closed()); + /// + /// let _ = tx.send(100); + /// let _ = tx.send(200); + /// + /// rx.close(); + /// assert!(!rx.is_closed()); + /// assert_eq!(rx.recv().await, Some(100)); + /// + /// assert!(!rx.is_closed()); + /// assert_eq!(rx.recv().await, Some(200)); + /// assert!(rx.recv().await.is_none()); + /// + /// assert!(rx.is_closed()); + /// } + /// ``` + pub fn is_closed(&mut self) -> bool { + self.chan.is_closed_and_empty() + } + /// Polls to receive the next message on this channel. /// /// This method returns: diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 1b581ce98c1..094833e4e86 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1017,4 +1017,79 @@ async fn test_tx_capacity() { assert_eq!(tx.max_capacity(), 10); } +#[tokio::test] +async fn text_rx_is_closed() { + let (tx, mut rx) = mpsc::channel(10); + + assert!(!rx.is_closed()); + + tx.send(100).await.unwrap(); + tx.send(200).await.unwrap(); + + rx.close(); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(100)); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(200)); + assert!(rx.recv().await.is_none()); + assert!(rx.is_closed()); + + let (tx, mut rx) = mpsc::channel(10); + + assert!(!rx.is_closed()); + + tx.send(100).await.unwrap(); + tx.send(200).await.unwrap(); + + drop(tx); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(100)); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(200)); + assert!(rx.recv().await.is_none()); + assert!(rx.is_closed()); +} + +#[tokio::test] +async fn text_rx_is_closed_unbounded() { + let (tx, mut rx) = mpsc::unbounded_channel(); + assert!(!rx.is_closed()); + + tx.send(100).unwrap(); + tx.send(200).unwrap(); + + rx.close(); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(100)); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(200)); + assert!(rx.recv().await.is_none()); + assert!(rx.is_closed()); + + let (tx, mut rx) = mpsc::unbounded_channel(); + assert!(!rx.is_closed()); + + tx.send(100).unwrap(); + tx.send(200).unwrap(); + + drop(tx); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(100)); + assert!(!rx.is_closed()); + + assert_eq!(rx.recv().await, Some(200)); + assert!(rx.recv().await.is_none()); + assert!(rx.is_closed()); + + rx.close(); + assert!(rx.is_closed()); +} + fn is_debug(_: &T) {} From be9bbeb6a963eee04267fb08da4e64fc34493a00 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Thu, 15 Feb 2024 13:35:43 +0100 Subject: [PATCH 02/13] fixes documentation links --- tokio/src/sync/mpsc/bounded.rs | 5 ++++- tokio/src/sync/mpsc/unbounded.rs | 5 ++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 1494589bb92..863b5fc28bf 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -467,7 +467,10 @@ impl Receiver { /// /// This method returns `true` if the channel has been closed and there are /// no remaining messages in the channel's buffer. The channel is closed - /// when all senders have been dropped, or when [`close`] is called. + /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called. + /// + /// [`Sender`]: crate::sync::mpsc::Sender + /// [`Receiver::close`]: crate::sync::mpsc::Receiver::close /// /// # Examples /// ``` diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 1c228094891..2678547c8d3 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -334,7 +334,10 @@ impl UnboundedReceiver { /// /// This method returns `true` if the channel has been closed and there are /// no remaining messages in the channel's buffer. The channel is closed - /// when all senders have been dropped, or when [`close`] is called. + /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called. + /// + /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender + /// [`UnboundedReceiver::close`]: crate::sync::mpsc::UnboundedReceiver::close /// /// # Examples /// ``` From ccecf3537c1e7a192ac0ef589a056e7ca9b49918 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sat, 24 Feb 2024 21:35:09 +0100 Subject: [PATCH 03/13] splits is_closed tests into multiple smaller tests --- tokio/tests/sync_mpsc.rs | 100 +++++++++++++++++----------------- tokio/tests/sync_mpsc_weak.rs | 9 +++ 2 files changed, 60 insertions(+), 49 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 094833e4e86..7e9573e8d0d 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1018,77 +1018,79 @@ async fn test_tx_capacity() { } #[tokio::test] -async fn text_rx_is_closed() { - let (tx, mut rx) = mpsc::channel(10); - - assert!(!rx.is_closed()); - - tx.send(100).await.unwrap(); - tx.send(200).await.unwrap(); - +async fn test_rx_is_closed_when_calling_close_with_sender() { + // is_closed should return true after calling close but still has a sender + let (_tx, mut rx) = mpsc::channel::<()>(10); rx.close(); - assert!(!rx.is_closed()); - assert_eq!(rx.recv().await, Some(100)); - assert!(!rx.is_closed()); - - assert_eq!(rx.recv().await, Some(200)); - assert!(rx.recv().await.is_none()); assert!(rx.is_closed()); +} - let (tx, mut rx) = mpsc::channel(10); - - assert!(!rx.is_closed()); - - tx.send(100).await.unwrap(); - tx.send(200).await.unwrap(); +#[tokio::test] +async fn test_rx_is_closed_when_dropping_all_senders() { + // is_closed should return true after dropping all senders + let (tx, mut rx) = mpsc::channel::<()>(10); + let another_tx = tx.clone(); + let task = tokio::spawn(async move { + drop(another_tx); + }); drop(tx); - assert!(!rx.is_closed()); + let _ = task.await; - assert_eq!(rx.recv().await, Some(100)); - assert!(!rx.is_closed()); - - assert_eq!(rx.recv().await, Some(200)); - assert!(rx.recv().await.is_none()); assert!(rx.is_closed()); } #[tokio::test] -async fn text_rx_is_closed_unbounded() { - let (tx, mut rx) = mpsc::unbounded_channel(); +async fn test_rx_is_not_closed_when_there_are_senders() { + // is_closed should return false when there is a sender + let (_tx, mut rx) = mpsc::channel::<()>(10); assert!(!rx.is_closed()); +} - tx.send(100).unwrap(); - tx.send(200).unwrap(); - - rx.close(); +#[tokio::test] +async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() { + // is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } assert!(!rx.is_closed()); +} - assert_eq!(rx.recv().await, Some(100)); +#[tokio::test] +async fn test_rx_is_not_closed_when_there_are_messages_but_not_senders() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } + drop(tx); assert!(!rx.is_closed()); +} - assert_eq!(rx.recv().await, Some(200)); - assert!(rx.recv().await.is_none()); - assert!(rx.is_closed()); - - let (tx, mut rx) = mpsc::unbounded_channel(); +#[tokio::test] +async fn test_rx_is_not_closed_when_there_are_messages_and_close_is_called() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } + rx.close(); assert!(!rx.is_closed()); +} - tx.send(100).unwrap(); - tx.send(200).unwrap(); - +#[tokio::test] +async fn test_rx_is_closed_after_consuming_messages() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } drop(tx); - assert!(!rx.is_closed()); - assert_eq!(rx.recv().await, Some(100)); assert!(!rx.is_closed()); - - assert_eq!(rx.recv().await, Some(200)); - assert!(rx.recv().await.is_none()); - assert!(rx.is_closed()); - - rx.close(); + while (rx.recv().await).is_some() {} assert!(rx.is_closed()); } diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index fad4c72f799..0c4eaf1bfcd 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -511,3 +511,12 @@ fn test_tx_count_weak_unbounded_sender() { assert!(tx_weak.upgrade().is_none() && tx_weak2.upgrade().is_none()); } + +#[tokio::test] +async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() { + // is_closed should return true after dropping all senders except for a weak sender + let (tx, mut rx) = mpsc::channel::<()>(10); + let _weak_sender = tx.clone().downgrade(); + drop(tx); + assert!(rx.is_closed()); +} From c1c870b9ff7e6cab9d3ed135de056b4114dc427c Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sun, 25 Feb 2024 14:53:24 +0100 Subject: [PATCH 04/13] add is_closed tests for unbounded channel --- tokio/tests/sync_mpsc.rs | 67 +++++++++++++++++++++++++++++++++++ tokio/tests/sync_mpsc_weak.rs | 9 +++++ 2 files changed, 76 insertions(+) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 7e9573e8d0d..a69ee79977d 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1094,4 +1094,71 @@ async fn test_rx_is_closed_after_consuming_messages() { assert!(rx.is_closed()); } +#[tokio::test] +async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() { + // is_closed should return true after calling close but still has a sender + let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + rx.close(); + + assert!(rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_closed_when_dropping_all_senders() { + // is_closed should return true after dropping all senders + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + let another_tx = tx.clone(); + let task = tokio::spawn(async move { + drop(another_tx); + }); + + drop(tx); + let _ = task.await; + + assert!(rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_not_closed_when_there_are_senders() { + // is_closed should return false when there is a sender + let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + assert!(!rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_not_closed_when_there_are_messages_but_not_senders() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::unbounded_channel(); + for i in 0..10 { + assert!(tx.send(i).is_ok()); + } + drop(tx); + assert!(!rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_not_closed_when_there_are_messages_and_close_is_called() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::unbounded_channel(); + for i in 0..10 { + assert!(tx.send(i).is_ok()); + } + rx.close(); + assert!(!rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_closed_after_consuming_messages() { + // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::unbounded_channel(); + for i in 0..10 { + assert!(tx.send(i).is_ok()); + } + drop(tx); + + assert!(!rx.is_closed()); + while (rx.recv().await).is_some() {} + assert!(rx.is_closed()); +} + fn is_debug(_: &T) {} diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index 0c4eaf1bfcd..80b248cbd07 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -520,3 +520,12 @@ async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() { drop(tx); assert!(rx.is_closed()); } + +#[tokio::test] +async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders() { + // is_closed should return true after dropping all senders except for a weak sender + let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + let _weak_sender = tx.clone().downgrade(); + drop(tx); + assert!(rx.is_closed()); +} From e655f9ecd475e55329d84862512c32f227817da7 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Mon, 26 Feb 2024 14:36:24 +0100 Subject: [PATCH 05/13] Add permit test for Receiver is_closed function --- tokio/tests/sync_mpsc.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index a69ee79977d..66620d5a025 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1081,8 +1081,16 @@ async fn test_rx_is_not_closed_when_there_are_messages_and_close_is_called() { } #[tokio::test] -async fn test_rx_is_closed_after_consuming_messages() { +async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() { // is_closed should return false when there is a permit (but no senders) + let (tx, mut rx) = mpsc::channel::<()>(10); + let _permit = tx.reserve_owned().await.expect("Failed to reserve permit"); + assert!(!rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_is_closed_after_consuming_messages() { + // is_closed should return true after consuming messages let (tx, mut rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); From 2d964a24b85190c47cbe31d04a010c743bbc8352 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Mon, 26 Feb 2024 14:37:05 +0100 Subject: [PATCH 06/13] fix wrong permit comments in Receiver is_closed tests --- tokio/tests/sync_mpsc.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 66620d5a025..c7814161de5 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1060,7 +1060,7 @@ async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() { #[tokio::test] async fn test_rx_is_not_closed_when_there_are_messages_but_not_senders() { - // is_closed should return false when there is a permit (but no senders) + // is_closed should return false when there are messages in the buffer, but no senders let (tx, mut rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); @@ -1071,7 +1071,7 @@ async fn test_rx_is_not_closed_when_there_are_messages_but_not_senders() { #[tokio::test] async fn test_rx_is_not_closed_when_there_are_messages_and_close_is_called() { - // is_closed should return false when there is a permit (but no senders) + // is_closed should return false when there are messages in the buffer, and close is called let (tx, mut rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); @@ -1135,7 +1135,7 @@ async fn test_rx_unbounded_is_not_closed_when_there_are_senders() { #[tokio::test] async fn test_rx_unbounded_is_not_closed_when_there_are_messages_but_not_senders() { - // is_closed should return false when there is a permit (but no senders) + // is_closed should return false when there are messages in the buffer, but no senders let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); @@ -1146,7 +1146,7 @@ async fn test_rx_unbounded_is_not_closed_when_there_are_messages_but_not_senders #[tokio::test] async fn test_rx_unbounded_is_not_closed_when_there_are_messages_and_close_is_called() { - // is_closed should return false when there is a permit (but no senders) + // is_closed should return false when there are messages in the buffer, and close is called let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); @@ -1157,7 +1157,7 @@ async fn test_rx_unbounded_is_not_closed_when_there_are_messages_and_close_is_ca #[tokio::test] async fn test_rx_unbounded_is_closed_after_consuming_messages() { - // is_closed should return false when there is a permit (but no senders) + // is_closed should return true after consuming messages let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); From 816aea781af26e518256a10316c9ceef54760a29 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sat, 2 Mar 2024 21:40:42 +0100 Subject: [PATCH 07/13] move empty channel check into is_empty function changes the Receiver is_closed function to have the same behaviour from the Sender. adds is_empty to the receiver to check if the channel is empty --- tokio/src/sync/mpsc/block.rs | 7 +- tokio/src/sync/mpsc/bounded.rs | 40 ++++++--- tokio/src/sync/mpsc/chan.rs | 16 ++-- tokio/src/sync/mpsc/list.rs | 9 +- tokio/src/sync/mpsc/unbounded.rs | 40 ++++++--- tokio/tests/sync_mpsc.rs | 145 ++++++++++++++++++++++++------- tokio/tests/sync_mpsc_weak.rs | 4 +- 7 files changed, 183 insertions(+), 78 deletions(-) diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs index 0639acef4bf..c8fe2066fdd 100644 --- a/tokio/src/sync/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -168,18 +168,17 @@ impl Block { Some(Read::Value(value.assume_init())) } - /// Returns true if the sender half of the list is closed and there is no - /// value in the slot to be consumed + /// Returns true if there is a value in the slot to be consumed /// /// # Safety /// /// To maintain safety, the caller must ensure: /// /// * No concurrent access to the slot. - pub(crate) fn is_closed_and_empty(&self, slot_index: usize) -> bool { + pub(crate) fn has_value(&self, slot_index: usize) -> bool { let offset = offset(slot_index); let ready_bits = self.header.ready_slots.load(Acquire); - !is_ready(ready_bits, offset) && is_tx_closed(ready_bits) + is_ready(ready_bits, offset) } /// Writes a value to the block at the given offset. diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 863b5fc28bf..a98f07f0cfd 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -465,8 +465,7 @@ impl Receiver { /// Checks if a channel is closed. /// - /// This method returns `true` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. The channel is closed + /// This method returns `true` if the channel has been closed. The channel is closed /// when all [`Sender`] have been dropped, or when [`Receiver::close`] is called. /// /// [`Sender`]: crate::sync::mpsc::Sender @@ -478,25 +477,38 @@ impl Receiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = mpsc::channel(10); + /// let (_tx, mut rx) = mpsc::channel::<()>(10); /// assert!(!rx.is_closed()); /// - /// tx.send(100).await.unwrap(); - /// tx.send(200).await.unwrap(); - /// /// rx.close(); - /// assert!(!rx.is_closed()); - /// assert_eq!(rx.recv().await, Some(100)); - /// - /// assert!(!rx.is_closed()); - /// assert_eq!(rx.recv().await, Some(200)); - /// assert!(rx.recv().await.is_none()); /// /// assert!(rx.is_closed()); /// } /// ``` - pub fn is_closed(&mut self) -> bool { - self.chan.is_closed_and_empty() + pub fn is_closed(&self) -> bool { + self.chan.is_closed() + } + + /// Checks if a channel is empty. + /// + /// This method returns `true` if the channel has no messages. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::channel(10); + /// assert!(rx.is_empty()); + /// + /// tx.send(0).await.unwrap(); + /// assert!(!rx.is_empty()); + /// } + /// + /// ``` + pub fn is_empty(&mut self) -> bool { + self.chan.is_empty() } /// Polls to receive the next message on this channel. diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index f6c590f6642..16f00321438 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -241,21 +241,23 @@ impl Rx { self.inner.notify_rx_closed.notify_waiters(); } - pub(crate) fn is_closed_and_empty(&mut self) -> bool { - // There two internal states that can represent a closed channel that has no messages + pub(crate) fn is_closed(&self) -> bool { + // There two internal states that can represent a closed channel // // 1. When `close` is called. - // In this case, the inner semaphore will be closed, and the channel has no - // outstanding messages if the list is at the tail position. + // In this case, the inner semaphore will be closed. // // 2. When all senders are dropped. // In this case, the semaphore remains unclosed, and the `index` in the list won't // reach the tail position. It is necessary to check the list if the last block is - // `closed` and has no value available. + // `closed`. + self.inner.semaphore.is_closed() || self.inner.tx_count.load(Acquire) == 0 + } + + pub(crate) fn is_empty(&mut self) -> bool { self.inner.rx_fields.with_mut(|rx_fields_ptr| { let rx_fields = unsafe { &mut *rx_fields_ptr }; - (self.inner.semaphore.is_closed() && rx_fields.list.is_at_tail_position(&self.inner.tx)) - || rx_fields.list.is_closed_and_empty(&self.inner.tx) + rx_fields.list.is_empty(&self.inner.tx) }) } diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index bc184964cdb..0ba3986c665 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -230,7 +230,7 @@ impl fmt::Debug for Tx { } impl Rx { - pub(crate) fn is_closed_and_empty(&mut self, tx: &Tx) -> bool { + pub(crate) fn is_empty(&mut self, tx: &Tx) -> bool { // Advance `head`, if needed if self.try_advancing_head() { self.reclaim_blocks(tx); @@ -238,15 +238,10 @@ impl Rx { unsafe { let block = self.head.as_ref(); - block.is_closed_and_empty(self.index) + !block.has_value(self.index) } } - pub(crate) fn is_at_tail_position(&mut self, tx: &Tx) -> bool { - let tail_position = tx.tail_position.load(Acquire); - tail_position == self.index - } - /// Pops the next value off the queue. pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { // Advance `head`, if needed diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 2678547c8d3..534b94748bc 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -332,8 +332,7 @@ impl UnboundedReceiver { /// Checks if a channel is closed. /// - /// This method returns `true` if the channel has been closed and there are - /// no remaining messages in the channel's buffer. The channel is closed + /// This method returns `true` if the channel has been closed. The channel is closed /// when all [`UnboundedSender`] have been dropped, or when [`UnboundedReceiver::close`] is called. /// /// [`UnboundedSender`]: crate::sync::mpsc::UnboundedSender @@ -345,25 +344,38 @@ impl UnboundedReceiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); /// assert!(!rx.is_closed()); /// - /// let _ = tx.send(100); - /// let _ = tx.send(200); - /// /// rx.close(); - /// assert!(!rx.is_closed()); - /// assert_eq!(rx.recv().await, Some(100)); - /// - /// assert!(!rx.is_closed()); - /// assert_eq!(rx.recv().await, Some(200)); - /// assert!(rx.recv().await.is_none()); /// /// assert!(rx.is_closed()); /// } /// ``` - pub fn is_closed(&mut self) -> bool { - self.chan.is_closed_and_empty() + pub fn is_closed(&self) -> bool { + self.chan.is_closed() + } + + /// Checks if a channel is empty. + /// + /// This method returns `true` if the channel has no messages. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// assert!(rx.is_empty()); + /// + /// tx.send(0).unwrap(); + /// assert!(!rx.is_empty()); + /// } + /// + /// ``` + pub fn is_empty(&mut self) -> bool { + self.chan.is_empty() } /// Polls to receive the next message on this channel. diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index c7814161de5..83d3745e883 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1029,7 +1029,7 @@ async fn test_rx_is_closed_when_calling_close_with_sender() { #[tokio::test] async fn test_rx_is_closed_when_dropping_all_senders() { // is_closed should return true after dropping all senders - let (tx, mut rx) = mpsc::channel::<()>(10); + let (tx, rx) = mpsc::channel::<()>(10); let another_tx = tx.clone(); let task = tokio::spawn(async move { drop(another_tx); @@ -1044,14 +1044,14 @@ async fn test_rx_is_closed_when_dropping_all_senders() { #[tokio::test] async fn test_rx_is_not_closed_when_there_are_senders() { // is_closed should return false when there is a sender - let (_tx, mut rx) = mpsc::channel::<()>(10); + let (_tx, rx) = mpsc::channel::<()>(10); assert!(!rx.is_closed()); } #[tokio::test] async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() { // is_closed should return false when there is a sender, even if enough messages have been sent to fill the channel - let (tx, mut rx) = mpsc::channel(10); + let (tx, rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); } @@ -1059,47 +1059,94 @@ async fn test_rx_is_not_closed_when_there_are_senders_and_buffer_filled() { } #[tokio::test] -async fn test_rx_is_not_closed_when_there_are_messages_but_not_senders() { - // is_closed should return false when there are messages in the buffer, but no senders - let (tx, mut rx) = mpsc::channel(10); +async fn test_rx_is_closed_when_there_are_no_senders_and_there_are_messages() { + // is_closed should return true when there are messages in the buffer, but no senders + let (tx, rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); } drop(tx); - assert!(!rx.is_closed()); + assert!(rx.is_closed()); } #[tokio::test] -async fn test_rx_is_not_closed_when_there_are_messages_and_close_is_called() { - // is_closed should return false when there are messages in the buffer, and close is called +async fn test_rx_is_closed_when_there_are_messages_and_close_is_called() { + // is_closed should return true when there are messages in the buffer, and close is called let (tx, mut rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); } rx.close(); - assert!(!rx.is_closed()); + assert!(rx.is_closed()); } #[tokio::test] async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() { // is_closed should return false when there is a permit (but no senders) - let (tx, mut rx) = mpsc::channel::<()>(10); + let (tx, rx) = mpsc::channel::<()>(10); let _permit = tx.reserve_owned().await.expect("Failed to reserve permit"); assert!(!rx.is_closed()); } #[tokio::test] -async fn test_rx_is_closed_after_consuming_messages() { - // is_closed should return true after consuming messages +async fn test_rx_is_empty_when_no_messages_were_sent() { + let (_tx, mut rx) = mpsc::channel::<()>(10); + assert!(rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() { + let (tx, mut rx) = mpsc::channel::<()>(10); + assert!(tx.send(()).await.is_ok()); + assert!(!rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_is_not_empty_when_the_buffer_is_full() { + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } + assert!(!rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_is_not_empty_when_all_but_one_messages_are_consumed() { + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } + + for _ in 0..9 { + assert!(rx.recv().await.is_some()); + } + + assert!(!rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_is_empty_when_all_messages_are_consumed() { + let (tx, mut rx) = mpsc::channel(10); + for i in 0..10 { + assert!(tx.send(i).await.is_ok()); + } + while rx.try_recv().is_ok() {} + assert!(rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() { let (tx, mut rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); } drop(tx); - assert!(!rx.is_closed()); - while (rx.recv().await).is_some() {} - assert!(rx.is_closed()); + for _ in 0..10 { + assert!(rx.recv().await.is_some()); + } + + assert!(rx.is_empty()) } #[tokio::test] @@ -1114,7 +1161,7 @@ async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() { #[tokio::test] async fn test_rx_unbounded_is_closed_when_dropping_all_senders() { // is_closed should return true after dropping all senders - let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let another_tx = tx.clone(); let task = tokio::spawn(async move { drop(another_tx); @@ -1129,44 +1176,82 @@ async fn test_rx_unbounded_is_closed_when_dropping_all_senders() { #[tokio::test] async fn test_rx_unbounded_is_not_closed_when_there_are_senders() { // is_closed should return false when there is a sender - let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + let (_tx, rx) = mpsc::unbounded_channel::<()>(); assert!(!rx.is_closed()); } #[tokio::test] -async fn test_rx_unbounded_is_not_closed_when_there_are_messages_but_not_senders() { - // is_closed should return false when there are messages in the buffer, but no senders - let (tx, mut rx) = mpsc::unbounded_channel(); +async fn test_rx_unbounded_is_closed_when_there_are_no_senders_and_there_are_messages() { + // is_closed should return true when there are messages in the buffer, but no senders + let (tx, rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); } drop(tx); - assert!(!rx.is_closed()); + assert!(rx.is_closed()); } #[tokio::test] -async fn test_rx_unbounded_is_not_closed_when_there_are_messages_and_close_is_called() { - // is_closed should return false when there are messages in the buffer, and close is called +async fn test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called() { + // is_closed should return true when there are messages in the buffer, and close is called let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); } rx.close(); - assert!(!rx.is_closed()); + assert!(rx.is_closed()); +} + +#[tokio::test] +async fn test_rx_unbounded_is_empty_when_no_messages_were_sent() { + let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + assert!(rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer() { + let (tx, mut rx) = mpsc::unbounded_channel(); + assert!(tx.send(()).is_ok()); + assert!(!rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_unbounded_is_not_empty_when_all_but_one_messages_are_consumed() { + let (tx, mut rx) = mpsc::unbounded_channel(); + for i in 0..10 { + assert!(tx.send(i).is_ok()); + } + + for _ in 0..9 { + assert!(rx.recv().await.is_some()); + } + + assert!(!rx.is_empty()) } #[tokio::test] -async fn test_rx_unbounded_is_closed_after_consuming_messages() { - // is_closed should return true after consuming messages +async fn test_rx_unbounded_is_empty_when_all_messages_are_consumed() { + let (tx, mut rx) = mpsc::unbounded_channel(); + for i in 0..10 { + assert!(tx.send(i).is_ok()); + } + while rx.try_recv().is_ok() {} + assert!(rx.is_empty()) +} + +#[tokio::test] +async fn test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consumed() { let (tx, mut rx) = mpsc::unbounded_channel(); for i in 0..10 { assert!(tx.send(i).is_ok()); } drop(tx); - assert!(!rx.is_closed()); - while (rx.recv().await).is_some() {} - assert!(rx.is_closed()); + for _ in 0..10 { + assert!(rx.recv().await.is_some()); + } + + assert!(rx.is_empty()) } fn is_debug(_: &T) {} diff --git a/tokio/tests/sync_mpsc_weak.rs b/tokio/tests/sync_mpsc_weak.rs index 80b248cbd07..ec92f29f4b6 100644 --- a/tokio/tests/sync_mpsc_weak.rs +++ b/tokio/tests/sync_mpsc_weak.rs @@ -515,7 +515,7 @@ fn test_tx_count_weak_unbounded_sender() { #[tokio::test] async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() { // is_closed should return true after dropping all senders except for a weak sender - let (tx, mut rx) = mpsc::channel::<()>(10); + let (tx, rx) = mpsc::channel::<()>(10); let _weak_sender = tx.clone().downgrade(); drop(tx); assert!(rx.is_closed()); @@ -524,7 +524,7 @@ async fn test_rx_is_closed_when_dropping_all_senders_except_weak_senders() { #[tokio::test] async fn test_rx_unbounded_is_closed_when_dropping_all_senders_except_weak_senders() { // is_closed should return true after dropping all senders except for a weak sender - let (tx, mut rx) = mpsc::unbounded_channel::<()>(); + let (tx, rx) = mpsc::unbounded_channel::<()>(); let _weak_sender = tx.clone().downgrade(); drop(tx); assert!(rx.is_closed()); From 8a3851abfe72b7702ff1fc4051da0e9bbc25513e Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sun, 3 Mar 2024 13:47:18 +0100 Subject: [PATCH 08/13] remove mut requirement from the is_empty function --- tokio/src/sync/mpsc/bounded.rs | 4 ++-- tokio/src/sync/mpsc/chan.rs | 8 ++++---- tokio/src/sync/mpsc/list.rs | 7 +------ tokio/src/sync/mpsc/unbounded.rs | 4 ++-- tokio/tests/sync_mpsc.rs | 10 +++++----- 5 files changed, 14 insertions(+), 19 deletions(-) diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index a98f07f0cfd..84b7748c6cf 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -499,7 +499,7 @@ impl Receiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = mpsc::channel(10); + /// let (tx, rx) = mpsc::channel(10); /// assert!(rx.is_empty()); /// /// tx.send(0).await.unwrap(); @@ -507,7 +507,7 @@ impl Receiver { /// } /// /// ``` - pub fn is_empty(&mut self) -> bool { + pub fn is_empty(&self) -> bool { self.chan.is_empty() } diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 16f00321438..0c6d2d0448b 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -254,10 +254,10 @@ impl Rx { self.inner.semaphore.is_closed() || self.inner.tx_count.load(Acquire) == 0 } - pub(crate) fn is_empty(&mut self) -> bool { - self.inner.rx_fields.with_mut(|rx_fields_ptr| { - let rx_fields = unsafe { &mut *rx_fields_ptr }; - rx_fields.list.is_empty(&self.inner.tx) + pub(crate) fn is_empty(&self) -> bool { + self.inner.rx_fields.with(|rx_fields_ptr| { + let rx_fields = unsafe { &*rx_fields_ptr }; + rx_fields.list.is_empty() }) } diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 0ba3986c665..ade16e2b349 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -230,12 +230,7 @@ impl fmt::Debug for Tx { } impl Rx { - pub(crate) fn is_empty(&mut self, tx: &Tx) -> bool { - // Advance `head`, if needed - if self.try_advancing_head() { - self.reclaim_blocks(tx); - } - + pub(crate) fn is_empty(&self) -> bool { unsafe { let block = self.head.as_ref(); !block.has_value(self.index) diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 534b94748bc..9f946bc8206 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -366,7 +366,7 @@ impl UnboundedReceiver { /// /// #[tokio::main] /// async fn main() { - /// let (tx, mut rx) = mpsc::unbounded_channel(); + /// let (tx, rx) = mpsc::unbounded_channel(); /// assert!(rx.is_empty()); /// /// tx.send(0).unwrap(); @@ -374,7 +374,7 @@ impl UnboundedReceiver { /// } /// /// ``` - pub fn is_empty(&mut self) -> bool { + pub fn is_empty(&self) -> bool { self.chan.is_empty() } diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 83d3745e883..83420ee4a0f 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1090,20 +1090,20 @@ async fn test_rx_is_not_closed_when_there_are_permits_but_not_senders() { #[tokio::test] async fn test_rx_is_empty_when_no_messages_were_sent() { - let (_tx, mut rx) = mpsc::channel::<()>(10); + let (_tx, rx) = mpsc::channel::<()>(10); assert!(rx.is_empty()) } #[tokio::test] async fn test_rx_is_not_empty_when_there_are_messages_in_the_buffer() { - let (tx, mut rx) = mpsc::channel::<()>(10); + let (tx, rx) = mpsc::channel::<()>(10); assert!(tx.send(()).await.is_ok()); assert!(!rx.is_empty()) } #[tokio::test] async fn test_rx_is_not_empty_when_the_buffer_is_full() { - let (tx, mut rx) = mpsc::channel(10); + let (tx, rx) = mpsc::channel(10); for i in 0..10 { assert!(tx.send(i).await.is_ok()); } @@ -1204,13 +1204,13 @@ async fn test_rx_unbounded_is_closed_when_there_are_messages_and_close_is_called #[tokio::test] async fn test_rx_unbounded_is_empty_when_no_messages_were_sent() { - let (_tx, mut rx) = mpsc::unbounded_channel::<()>(); + let (_tx, rx) = mpsc::unbounded_channel::<()>(); assert!(rx.is_empty()) } #[tokio::test] async fn test_rx_unbounded_is_not_empty_when_there_are_messages_in_the_buffer() { - let (tx, mut rx) = mpsc::unbounded_channel(); + let (tx, rx) = mpsc::unbounded_channel(); assert!(tx.send(()).is_ok()); assert!(!rx.is_empty()) } From ab5278be2baceb7cb84861c1cc967024df4db057 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sun, 3 Mar 2024 13:47:41 +0100 Subject: [PATCH 09/13] add len function to bounded and unbounded receivers --- tokio/src/sync/mpsc/block.rs | 5 ++ tokio/src/sync/mpsc/bounded.rs | 19 ++++++ tokio/src/sync/mpsc/chan.rs | 7 ++ tokio/src/sync/mpsc/list.rs | 15 +++++ tokio/src/sync/mpsc/unbounded.rs | 19 ++++++ tokio/tests/sync_mpsc.rs | 108 +++++++++++++++++++++++++++++++ 6 files changed, 173 insertions(+) diff --git a/tokio/src/sync/mpsc/block.rs b/tokio/src/sync/mpsc/block.rs index c8fe2066fdd..de204de9ad4 100644 --- a/tokio/src/sync/mpsc/block.rs +++ b/tokio/src/sync/mpsc/block.rs @@ -208,6 +208,11 @@ impl Block { self.header.ready_slots.fetch_or(TX_CLOSED, Release); } + pub(crate) unsafe fn is_closed(&self) -> bool { + let ready_bits = self.header.ready_slots.load(Acquire); + is_tx_closed(ready_bits) + } + /// Resets the block to a blank state. This enables reusing blocks in the /// channel. /// diff --git a/tokio/src/sync/mpsc/bounded.rs b/tokio/src/sync/mpsc/bounded.rs index 84b7748c6cf..4e768e4c85b 100644 --- a/tokio/src/sync/mpsc/bounded.rs +++ b/tokio/src/sync/mpsc/bounded.rs @@ -511,6 +511,25 @@ impl Receiver { self.chan.is_empty() } + /// Returns the number of messages in the channel. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::channel(10); + /// assert_eq!(0, rx.len()); + /// + /// tx.send(0).await.unwrap(); + /// assert_eq!(1, rx.len()); + /// } + /// ``` + pub fn len(&self) -> usize { + self.chan.len() + } + /// Polls to receive the next message on this channel. /// /// This method returns: diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index 0c6d2d0448b..e3a4f56b56c 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -261,6 +261,13 @@ impl Rx { }) } + pub(crate) fn len(&self) -> usize { + self.inner.rx_fields.with(|rx_fields_ptr| { + let rx_fields = unsafe { &*rx_fields_ptr }; + rx_fields.list.len(&self.inner.tx) + }) + } + /// Receive the next value pub(crate) fn recv(&mut self, cx: &mut Context<'_>) -> Poll> { use super::block::Read; diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index ade16e2b349..473a6af4edd 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -237,6 +237,21 @@ impl Rx { } } + pub(crate) fn len(&self, tx: &Tx) -> usize { + let tail_position = tx.tail_position.load(Acquire); + let tail = tx.block_tail.load(Acquire); + + unsafe { + let tail_block = &mut *tail; + + if tail_block.is_closed() { + tail_position - self.index - 1 + } else { + tail_position - self.index + } + } + } + /// Pops the next value off the queue. pub(crate) fn pop(&mut self, tx: &Tx) -> Option> { // Advance `head`, if needed diff --git a/tokio/src/sync/mpsc/unbounded.rs b/tokio/src/sync/mpsc/unbounded.rs index 9f946bc8206..48518a4a492 100644 --- a/tokio/src/sync/mpsc/unbounded.rs +++ b/tokio/src/sync/mpsc/unbounded.rs @@ -378,6 +378,25 @@ impl UnboundedReceiver { self.chan.is_empty() } + /// Returns the number of messages in the channel. + /// + /// # Examples + /// ``` + /// use tokio::sync::mpsc; + /// + /// #[tokio::main] + /// async fn main() { + /// let (tx, rx) = mpsc::unbounded_channel(); + /// assert_eq!(0, rx.len()); + /// + /// tx.send(0).unwrap(); + /// assert_eq!(1, rx.len()); + /// } + /// ``` + pub fn len(&self) -> usize { + self.chan.len() + } + /// Polls to receive the next message on this channel. /// /// This method returns: diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index 83420ee4a0f..d234243d130 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1149,6 +1149,60 @@ async fn test_rx_is_empty_all_senders_are_dropped_and_messages_consumed() { assert!(rx.is_empty()) } +#[tokio::test] +async fn test_rx_len_on_empty_channel() { + let (_tx, rx) = mpsc::channel::<()>(100); + assert_eq!(rx.len(), 0); +} + +#[tokio::test] +async fn test_rx_len_on_empty_channel_without_senders() { + // when all senders are dropped, a "closed" value is added to the end of the linked list. + // here we test that the "closed" value does not change the len of the channel. + + let (tx, rx) = mpsc::channel::<()>(100); + drop(tx); + assert_eq!(rx.len(), 0); +} + +#[tokio::test] +async fn test_rx_len_on_filled_channel() { + let (tx, rx) = mpsc::channel(100); + + for i in 0..100 { + assert!(tx.send(i).await.is_ok()); + } + assert_eq!(rx.len(), 100); +} + +#[tokio::test] +async fn test_rx_len_on_filled_channel_without_senders() { + let (tx, rx) = mpsc::channel(100); + + for i in 0..100 { + assert!(tx.send(i).await.is_ok()); + } + drop(tx); + assert_eq!(rx.len(), 100); +} + +#[tokio::test] +async fn test_rx_len_when_consuming_all_messages() { + let (tx, mut rx) = mpsc::channel(100); + + for i in 0..100 { + assert!(tx.send(i).await.is_ok()); + assert_eq!(rx.len(), i + 1); + } + + drop(tx); + + for i in (0..100).rev() { + assert!(rx.recv().await.is_some()); + assert_eq!(rx.len(), i); + } +} + #[tokio::test] async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() { // is_closed should return true after calling close but still has a sender @@ -1254,4 +1308,58 @@ async fn test_rx_unbounded_is_empty_all_senders_are_dropped_and_messages_consume assert!(rx.is_empty()) } +#[tokio::test] +async fn test_rx_unbounded_len_on_empty_channel() { + let (_tx, rx) = mpsc::unbounded_channel::<()>(); + assert_eq!(rx.len(), 0); +} + +#[tokio::test] +async fn test_rx_unbounded_len_on_empty_channel_without_senders() { + // when all senders are dropped, a "closed" value is added to the end of the linked list. + // here we test that the "closed" value does not change the len of the channel. + + let (tx, rx) = mpsc::unbounded_channel::<()>(); + drop(tx); + assert_eq!(rx.len(), 0); +} + +#[tokio::test] +async fn test_rx_unbounded_len_with_multiple_messages() { + let (tx, rx) = mpsc::unbounded_channel(); + + for i in 0..100 { + assert!(tx.send(i).is_ok()); + } + assert_eq!(rx.len(), 100); +} + +#[tokio::test] +async fn test_rx_unbounded_len_with_multiple_messages_and_dropped_senders() { + let (tx, rx) = mpsc::unbounded_channel(); + + for i in 0..100 { + assert!(tx.send(i).is_ok()); + } + drop(tx); + assert_eq!(rx.len(), 100); +} + +#[tokio::test] +async fn test_rx_unbounded_len_when_consuming_all_messages() { + let (tx, mut rx) = mpsc::unbounded_channel(); + + for i in 0..100 { + assert!(tx.send(i).is_ok()); + assert_eq!(rx.len(), i + 1); + } + + drop(tx); + + for i in (0..100).rev() { + assert!(rx.recv().await.is_some()); + assert_eq!(rx.len(), i); + } +} + fn is_debug(_: &T) {} From 8d505ddb33495af644d345a1f4365193e4b29861 Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Fri, 8 Mar 2024 13:07:34 +0100 Subject: [PATCH 10/13] reduces the unsafe scope in the len function --- tokio/src/sync/mpsc/list.rs | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 473a6af4edd..01861adea4d 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -241,14 +241,15 @@ impl Rx { let tail_position = tx.tail_position.load(Acquire); let tail = tx.block_tail.load(Acquire); - unsafe { + let is_closed = unsafe { let tail_block = &mut *tail; + tail_block.is_closed() + }; - if tail_block.is_closed() { - tail_position - self.index - 1 - } else { - tail_position - self.index - } + if is_closed { + tail_position - self.index - 1 + } else { + tail_position - self.index } } From 1e50d9fdd60f6338494c2de25d16f03289a2a91a Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sat, 9 Mar 2024 21:44:37 +0100 Subject: [PATCH 11/13] fixes the is_empty function --- tokio/src/sync/mpsc/chan.rs | 2 +- tokio/src/sync/mpsc/list.rs | 28 +++++++++++++++++++++---- tokio/src/sync/tests/loom_mpsc.rs | 34 +++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 5 deletions(-) diff --git a/tokio/src/sync/mpsc/chan.rs b/tokio/src/sync/mpsc/chan.rs index e3a4f56b56c..bd1c17533a5 100644 --- a/tokio/src/sync/mpsc/chan.rs +++ b/tokio/src/sync/mpsc/chan.rs @@ -257,7 +257,7 @@ impl Rx { pub(crate) fn is_empty(&self) -> bool { self.inner.rx_fields.with(|rx_fields_ptr| { let rx_fields = unsafe { &*rx_fields_ptr }; - rx_fields.list.is_empty() + rx_fields.list.is_empty(&self.inner.tx) }) } diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 01861adea4d..8f53f680780 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -230,10 +230,30 @@ impl fmt::Debug for Tx { } impl Rx { - pub(crate) fn is_empty(&self) -> bool { - unsafe { - let block = self.head.as_ref(); - !block.has_value(self.index) + pub(crate) fn is_empty(&self, tx: &Tx) -> bool { + let block = unsafe { self.head.as_ref() }; + if block.has_value(self.index) { + return false; + } + + // It is possible that a block has no value "now" but the list is still not empty. + // To be sure, it is necessary to check the tail position against the current index. + // + // One edge case is when all the senders are dropped, there will be a last block in the + // tail position, but it will be closed + + let tail_position = tx.tail_position.load(Acquire); + if self.index == tail_position { + true + } else if tail_position - self.index == 1 { + let tail = tx.block_tail.load(Acquire); + + unsafe { + let tail_block = &mut *tail; + tail_block.is_closed() + } + } else { + false } } diff --git a/tokio/src/sync/tests/loom_mpsc.rs b/tokio/src/sync/tests/loom_mpsc.rs index f165e7076e7..1dbe5ea419c 100644 --- a/tokio/src/sync/tests/loom_mpsc.rs +++ b/tokio/src/sync/tests/loom_mpsc.rs @@ -188,3 +188,37 @@ fn try_recv() { } }); } + +#[test] +fn len_nonzero_after_send() { + loom::model(|| { + let (send, recv) = mpsc::channel(10); + let send2 = send.clone(); + + let join = thread::spawn(move || { + block_on(send2.send("message2")).unwrap(); + }); + + block_on(send.send("message1")).unwrap(); + assert!(recv.len() != 0); + + join.join().unwrap(); + }); +} + +#[test] +fn nonempty_after_send() { + loom::model(|| { + let (send, recv) = mpsc::channel(10); + let send2 = send.clone(); + + let join = thread::spawn(move || { + block_on(send2.send("message2")).unwrap(); + }); + + block_on(send.send("message1")).unwrap(); + assert!(!recv.is_empty()); + + join.join().unwrap(); + }); +} From fbd1794269e4bfea2f305c04df3820545dc7dd6f Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Wed, 20 Mar 2024 21:05:35 +0100 Subject: [PATCH 12/13] refactor mpsc list len and is_empty functions --- tokio/src/sync/mpsc/list.rs | 44 ++++++++++++------------------------- 1 file changed, 14 insertions(+), 30 deletions(-) diff --git a/tokio/src/sync/mpsc/list.rs b/tokio/src/sync/mpsc/list.rs index 8f53f680780..90d9b828c8e 100644 --- a/tokio/src/sync/mpsc/list.rs +++ b/tokio/src/sync/mpsc/list.rs @@ -218,6 +218,15 @@ impl Tx { let _ = Box::from_raw(block.as_ptr()); } } + + pub(crate) fn is_closed(&self) -> bool { + let tail = self.block_tail.load(Acquire); + + unsafe { + let tail_block = &*tail; + tail_block.is_closed() + } + } } impl fmt::Debug for Tx { @@ -237,40 +246,15 @@ impl Rx { } // It is possible that a block has no value "now" but the list is still not empty. - // To be sure, it is necessary to check the tail position against the current index. - // - // One edge case is when all the senders are dropped, there will be a last block in the - // tail position, but it will be closed - - let tail_position = tx.tail_position.load(Acquire); - if self.index == tail_position { - true - } else if tail_position - self.index == 1 { - let tail = tx.block_tail.load(Acquire); - - unsafe { - let tail_block = &mut *tail; - tail_block.is_closed() - } - } else { - false - } + // To be sure, it is necessary to check the length of the list. + self.len(tx) == 0 } pub(crate) fn len(&self, tx: &Tx) -> usize { + // When all the senders are dropped, there will be a last block in the tail position, + // but it will be closed let tail_position = tx.tail_position.load(Acquire); - let tail = tx.block_tail.load(Acquire); - - let is_closed = unsafe { - let tail_block = &mut *tail; - tail_block.is_closed() - }; - - if is_closed { - tail_position - self.index - 1 - } else { - tail_position - self.index - } + tail_position - self.index - (tx.is_closed() as usize) } /// Pops the next value off the queue. From 6537a807a522f6ac2aa5a626f2bf4eafcae2ff7a Mon Sep 17 00:00:00 2001 From: Ilson Roberto Balliego Junior Date: Sat, 23 Mar 2024 21:12:22 +0100 Subject: [PATCH 13/13] add missing rx len test scenarios --- tokio/tests/sync_mpsc.rs | 58 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/tokio/tests/sync_mpsc.rs b/tokio/tests/sync_mpsc.rs index d234243d130..4a7eced13ee 100644 --- a/tokio/tests/sync_mpsc.rs +++ b/tokio/tests/sync_mpsc.rs @@ -1203,6 +1203,35 @@ async fn test_rx_len_when_consuming_all_messages() { } } +#[tokio::test] +async fn test_rx_len_when_close_is_called() { + let (tx, mut rx) = mpsc::channel(100); + tx.send(()).await.unwrap(); + rx.close(); + + assert_eq!(rx.len(), 1); +} + +#[tokio::test] +async fn test_rx_len_when_close_is_called_before_dropping_sender() { + let (tx, mut rx) = mpsc::channel(100); + tx.send(()).await.unwrap(); + rx.close(); + drop(tx); + + assert_eq!(rx.len(), 1); +} + +#[tokio::test] +async fn test_rx_len_when_close_is_called_after_dropping_sender() { + let (tx, mut rx) = mpsc::channel(100); + tx.send(()).await.unwrap(); + drop(tx); + rx.close(); + + assert_eq!(rx.len(), 1); +} + #[tokio::test] async fn test_rx_unbounded_is_closed_when_calling_close_with_sender() { // is_closed should return true after calling close but still has a sender @@ -1362,4 +1391,33 @@ async fn test_rx_unbounded_len_when_consuming_all_messages() { } } +#[tokio::test] +async fn test_rx_unbounded_len_when_close_is_called() { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(()).unwrap(); + rx.close(); + + assert_eq!(rx.len(), 1); +} + +#[tokio::test] +async fn test_rx_unbounded_len_when_close_is_called_before_dropping_sender() { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(()).unwrap(); + rx.close(); + drop(tx); + + assert_eq!(rx.len(), 1); +} + +#[tokio::test] +async fn test_rx_unbounded_len_when_close_is_called_after_dropping_sender() { + let (tx, mut rx) = mpsc::unbounded_channel(); + tx.send(()).unwrap(); + drop(tx); + rx.close(); + + assert_eq!(rx.len(), 1); +} + fn is_debug(_: &T) {}