Skip to content
This repository has been archived by the owner on May 11, 2023. It is now read-only.

Commit

Permalink
zb: Make use of futures_core::ready! marco
Browse files Browse the repository at this point in the history
This makes writing manual async impls easier and will hopefully be part
of the [std](rust-lang/rust#81050) (maybe under
a different name though).
  • Loading branch information
zeenix committed Jul 4, 2021
1 parent 3a0d099 commit cf0ea81
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 30 deletions.
20 changes: 8 additions & 12 deletions zbus/src/azync/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ use std::{
};
use zvariant::ObjectPath;

use futures_core::{stream, Future};
use futures_core::{ready, stream, Future};
use futures_sink::Sink;
use futures_util::{
future::{select, Either},
Expand Down Expand Up @@ -857,10 +857,9 @@ impl Sink<Message> for Connection {

fn poll_close(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<()>> {
let mut raw_out_conn = self.inner.raw_out_conn.lock().expect("poisened lock");
match raw_out_conn.flush(cx) {
Poll::Ready(Ok(_)) => (),
Poll::Ready(Err(e)) => return Poll::Ready(Err(e)),
Poll::Pending => return Poll::Pending,
match ready!(raw_out_conn.flush(cx)) {
Ok(_) => (),
Err(e) => return Poll::Ready(Err(e)),
}

Poll::Ready(raw_out_conn.close())
Expand All @@ -876,7 +875,7 @@ impl stream::Stream for Connection {
let err_fut = stream.error_receiver.next();
let mut select_fut = select(msg_fut, err_fut);

match futures_core::ready!(Pin::new(&mut select_fut).poll(cx)) {
match ready!(Pin::new(&mut select_fut).poll(cx)) {
Either::Left((msg, _)) => Poll::Ready(msg.map(Ok)),
Either::Right((error, _)) => Poll::Ready(error.map(Err)),
}
Expand All @@ -896,13 +895,10 @@ impl<'r, 's> Future for ReceiveMessage<'r, 's> {
loop {
match stream.raw_conn.try_receive_message() {
Err(Error::Io(e)) if e.kind() == ErrorKind::WouldBlock => {
let poll = stream.raw_conn.socket().poll_readable(cx);

match poll {
Poll::Pending => return Poll::Pending,
match ready!(stream.raw_conn.socket().poll_readable(cx)) {
// Guess socket became ready already so let's try it again.
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Ok(_) => continue,
Err(e) => return Poll::Ready(Err(e.into())),
}
}
m => return Poll::Ready(m),
Expand Down
14 changes: 7 additions & 7 deletions zbus/src/azync/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use async_io::Async;
use futures_core::ready;

use std::{
fmt::Debug,
Expand Down Expand Up @@ -119,16 +120,15 @@ where
}
Err(Error::Io(e)) => {
if e.kind() == std::io::ErrorKind::WouldBlock {
let poll = match handshake.next_io_operation() {
IoOperation::Read => handshake.socket().poll_readable(cx),
IoOperation::Write => handshake.socket().poll_writable(cx),
let res = match handshake.next_io_operation() {
IoOperation::Read => ready!(handshake.socket().poll_readable(cx)),
IoOperation::Write => ready!(handshake.socket().poll_writable(cx)),
IoOperation::None => panic!("Invalid handshake state"),
};
match poll {
Poll::Pending => return Poll::Pending,
match res {
// Guess socket became ready already so let's try it again.
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Ok(_) => continue,
Err(e) => return Poll::Ready(Err(e.into())),
}
} else {
return Poll::Ready(Err(Error::Io(e)));
Expand Down
8 changes: 3 additions & 5 deletions zbus/src/azync/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -164,9 +164,8 @@ where
let m = self.get_mut();
let (name, stream) = (m.name, m.stream.as_mut());
// there must be a way to simplify the following code..
let p = stream::Stream::poll_next(stream, cx);
match p {
Poll::Ready(Some(item)) => {
match futures_core::ready!(stream::Stream::poll_next(stream, cx)) {
Some(item) => {
if item.0 == name {
if let Some(Ok(v)) = item.1.clone().map(T::try_from) {
Poll::Ready(Some(Some(v)))
Expand All @@ -177,8 +176,7 @@ where
Poll::Pending
}
}
Poll::Ready(None) => Poll::Ready(None),
Poll::Pending => Poll::Pending,
None => Poll::Ready(None),
}
}
}
Expand Down
9 changes: 3 additions & 6 deletions zbus/src/raw/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,10 @@ impl Connection<Async<Box<dyn Socket>>> {
Ok(()) => return Poll::Ready(Ok(())),
Err(e) => {
if e.kind() == ErrorKind::WouldBlock {
let poll = self.socket().poll_writable(cx);

match poll {
Poll::Pending => return Poll::Pending,
match futures_core::ready!(self.socket().poll_writable(cx)) {
// Guess socket became ready already so let's try it again.
Poll::Ready(Ok(_)) => continue,
Poll::Ready(Err(e)) => return Poll::Ready(Err(e.into())),
Ok(_) => continue,
Err(e) => return Poll::Ready(Err(e.into())),
}
} else {
return Poll::Ready(Err(crate::Error::Io(e)));
Expand Down

0 comments on commit cf0ea81

Please sign in to comment.