Skip to content

Commit

Permalink
stream: make stream adapters public (#6658)
Browse files Browse the repository at this point in the history
  • Loading branch information
sharpened-nacho committed Jul 2, 2024
1 parent b2e4c5f commit c8f3539
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 18 deletions.
16 changes: 15 additions & 1 deletion tokio-stream/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,22 @@ pub mod wrappers;

mod stream_ext;
pub use stream_ext::{collect::FromStream, StreamExt};
/// Adapters for [`Stream`]s created by methods in [`StreamExt`].
pub mod adapters {
pub use crate::stream_ext::{
Chain, Filter, FilterMap, Fuse, Map, MapWhile, Merge, Peekable, Skip, SkipWhile, Take,
TakeWhile, Then,
};
cfg_time! {
pub use crate::stream_ext::{ChunksTimeout, Timeout, TimeoutRepeating};
}
}

cfg_time! {
pub use stream_ext::timeout::{Elapsed, Timeout};
#[deprecated = "Import those symbols from adapters instead"]
#[doc(hidden)]
pub use stream_ext::timeout::Timeout;
pub use stream_ext::timeout::Elapsed;
}

mod empty;
Expand Down
32 changes: 16 additions & 16 deletions tokio-stream/src/stream_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,66 +8,66 @@ mod any;
use any::AnyFuture;

mod chain;
use chain::Chain;
pub use chain::Chain;

pub(crate) mod collect;
use collect::{Collect, FromStream};

mod filter;
use filter::Filter;
pub use filter::Filter;

mod filter_map;
use filter_map::FilterMap;
pub use filter_map::FilterMap;

mod fold;
use fold::FoldFuture;

mod fuse;
use fuse::Fuse;
pub use fuse::Fuse;

mod map;
use map::Map;
pub use map::Map;

mod map_while;
use map_while::MapWhile;
pub use map_while::MapWhile;

mod merge;
use merge::Merge;
pub use merge::Merge;

mod next;
use next::Next;

mod skip;
use skip::Skip;
pub use skip::Skip;

mod skip_while;
use skip_while::SkipWhile;
pub use skip_while::SkipWhile;

mod take;
use take::Take;
pub use take::Take;

mod take_while;
use take_while::TakeWhile;
pub use take_while::TakeWhile;

mod then;
use then::Then;
pub use then::Then;

mod try_next;
use try_next::TryNext;

mod peekable;
use peekable::Peekable;
pub use peekable::Peekable;

cfg_time! {
pub(crate) mod timeout;
pub(crate) mod timeout_repeating;
use timeout::Timeout;
use timeout_repeating::TimeoutRepeating;
pub use timeout::Timeout;
pub use timeout_repeating::TimeoutRepeating;
use tokio::time::{Duration, Interval};
mod throttle;
use throttle::{throttle, Throttle};
mod chunks_timeout;
use chunks_timeout::ChunksTimeout;
pub use chunks_timeout::ChunksTimeout;
}

/// An extension trait for the [`Stream`] trait that provides a variety of
Expand Down
11 changes: 10 additions & 1 deletion tokio-stream/tests/stream_chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,14 @@ mod support {
}

use support::mpsc;
use tokio_stream::adapters::Chain;

#[tokio::test]
async fn basic_usage() {
let one = stream::iter(vec![1, 2, 3]);
let two = stream::iter(vec![4, 5, 6]);

let mut stream = one.chain(two);
let mut stream = visibility_test(one, two);

assert_eq!(stream.size_hint(), (6, Some(6)));
assert_eq!(stream.next().await, Some(1));
Expand All @@ -39,6 +40,14 @@ async fn basic_usage() {
assert_eq!(stream.next().await, None);
}

fn visibility_test<I, S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
where
S1: Stream<Item = I>,
S2: Stream<Item = I>,
{
s1.chain(s2)
}

#[tokio::test]
async fn pending_first() {
let (tx1, rx1) = mpsc::unbounded_channel_stream();
Expand Down

0 comments on commit c8f3539

Please sign in to comment.