Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

stream: add StreamExt::skip_while #2205

Merged
merged 4 commits into from
Feb 2, 2020

Conversation

torepettersen
Copy link
Contributor

This PR introduces StreamExt::skip_while as described in #2104. skip_while is an async version of Iterator::skip_while and will ignore the elements from the underlying stream until the predicate resolves false.

async version of Iterator::skip_while

Refs: tokio-rs#2104
Copy link
Contributor

@Darksonn Darksonn left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it looks good, although I have a suggestion: How about wrapping the predicate in an option instead of using a boolean? This would run the destructor of any resources moved into the predicate when they are no longer needed.

tokio/src/stream/mod.rs Outdated Show resolved Hide resolved
@carllerche
Copy link
Member

@Darksonn I'm not following the suggestion. Where would the Option be?

I think the proposed impl is intended to match the std version: https://doc.rust-lang.org/std/iter/trait.Iterator.html#method.skip_while

@Darksonn
Copy link
Contributor

Darksonn commented Feb 1, 2020

I'm not following the suggestion. Where would the Option be?

It would wrap the predicate field in the SkipWhile struct?

@carllerche
Copy link
Member

@Darksonn OH! yes, that is good 👍 I thought you were talking about the signature of the predicate.

So, the suggestion is to change the struct to:

pin_project! {
    /// Stream for the [`skip_while`](super::StreamExt::skip_while) method.
    #[must_use = "streams do nothing unless polled"]
    pub struct SkipWhile<St, F> {
        #[pin]
        stream: St,
        predicate: Option<F>,
    }
}

@carllerche
Copy link
Member

This also has the other advantage of being able to have a smaller struct size if the predicate includes any NonNull.

@torepettersen
Copy link
Contributor Author

Nice suggestion 👍
I will take care of it.

@Darksonn
Copy link
Contributor

Darksonn commented Feb 2, 2020

The only other thing I can think of is that you might end up looping forever if given a stream that is always ready with non-matching items, e.g.

stream::iter(std::iter::repeat(1)).skip_while(|x| *x == 1)

See also #2160.

@torepettersen
Copy link
Contributor Author

The only other thing I can think of is that you might end up looping forever if given a stream that is always ready with non-matching items, e.g.

stream::iter(std::iter::repeat(1)).skip_while(|x| *x == 1)

Definitely a good point.
But seems like we still have to wait for a way to yield back to the executor. Or am I missing something?

@Darksonn
Copy link
Contributor

Darksonn commented Feb 2, 2020

But seems like we still have to wait for a way to yield back to the executor. Or am I missing something?

I mean, you can just call cx.waker().wake_by_ref() before returning Poll::Pending.

@carllerche
Copy link
Member

I would not worry about the "loop forever" problem in this PR. It is a more general issue that all other stream combinators have today and IMO we should continue investigation in #2160 and avoid one off handling in combinators

Copy link
Member

@carllerche carllerche left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM thanks 👍

@carllerche carllerche merged commit 513671f into tokio-rs:master Feb 2, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

3 participants