Skip to content

Commit

Permalink
use futures-channel, futures-io, futures-task, and futures-util speci…
Browse files Browse the repository at this point in the history
…fically

Signed-off-by: Andrew Whitehead <[email protected]>
  • Loading branch information
andrewwhitehead committed Aug 15, 2022
1 parent 6ca89ef commit a282c7d
Show file tree
Hide file tree
Showing 33 changed files with 128 additions and 95 deletions.
9 changes: 5 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,10 @@ tcp-transport = []

[dependencies]
thiserror = "1"
futures = "0.3"
futures-util = "0.3"
futures-channel = { version = "0.3", features = ["sink"] }
futures-io = "0.3"
futures-task = "0.3"
futures-util = { version = "0.3", features = ["sink"] }
async-trait = "0.1"
parking_lot = "0.12"
rand = "0.8"
Expand All @@ -40,10 +42,9 @@ async-std = { version = "1", features = ["attributes"], optional = true }
chrono = "0.4"
criterion = "0.3"
pretty_env_logger = "0.4"
zmq2 = "0.5.0"
zmq2 = "0.5"
hex = "0.4"


[lib]
bench = false

Expand Down
4 changes: 2 additions & 2 deletions examples/task_worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod async_helpers;

use futures::FutureExt;
use futures_util::{select, FutureExt};
use std::io::Write;
use std::{error::Error, time::Duration};
use zeromq::{Socket, SocketRecv, SocketSend};
Expand All @@ -22,7 +22,7 @@ async fn main() -> Result<(), Box<dyn Error>> {

// Process messages from receiver and controller
loop {
futures::select! {
select! {
message = receiver.recv().fuse() => {
// Process task
let message = message.unwrap();
Expand Down
4 changes: 2 additions & 2 deletions src/async_rt/task/join_handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ use async_std::task as rt_task;
#[cfg(feature = "tokio-runtime")]
use tokio::task as rt_task;

use super::JoinError;

use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use super::JoinError;

pub struct JoinHandle<T>(rt_task::JoinHandle<T>);
impl<T> Future for JoinHandle<T> {
type Output = Result<T, JoinError>;
Expand Down
4 changes: 2 additions & 2 deletions src/async_rt/task/mod.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
mod join_handle;

use std::any::Any;

pub use join_handle::JoinHandle;

use std::any::Any;
use std::future::Future;

pub fn spawn<T>(task: T) -> JoinHandle<T::Output>
Expand Down
6 changes: 4 additions & 2 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,14 @@ use crate::util::PeerIdentity;
use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
};

use async_trait::async_trait;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::SinkExt;
use futures_channel::mpsc;
use futures_util::SinkExt;
use parking_lot::Mutex;

use std::sync::Arc;

pub(crate) struct Peer {
Expand Down
1 change: 1 addition & 0 deletions src/codec/command.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::error::CodecError;
use crate::SocketType;

use bytes::{Buf, BufMut, Bytes, BytesMut};

use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::Display;
Expand Down
10 changes: 6 additions & 4 deletions src/codec/framed.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use crate::codec::ZmqCodec;

use asynchronous_codec::{FramedRead, FramedWrite};
use futures_io::{AsyncRead, AsyncWrite};

// Enables us to have multiple bounds on the dyn trait in `InnerFramed`
pub trait FrameableRead: futures::AsyncRead + Unpin + Send + Sync {}
impl<T> FrameableRead for T where T: futures::AsyncRead + Unpin + Send + Sync {}
pub trait FrameableWrite: futures::AsyncWrite + Unpin + Send + Sync {}
impl<T> FrameableWrite for T where T: futures::AsyncWrite + Unpin + Send + Sync {}
pub trait FrameableRead: AsyncRead + Unpin + Send + Sync {}
impl<T> FrameableRead for T where T: AsyncRead + Unpin + Send + Sync {}
pub trait FrameableWrite: AsyncWrite + Unpin + Send + Sync {}
impl<T> FrameableWrite for T where T: AsyncWrite + Unpin + Send + Sync {}

pub(crate) type ZmqFramedRead = asynchronous_codec::FramedRead<Box<dyn FrameableRead>, ZmqCodec>;
pub(crate) type ZmqFramedWrite = asynchronous_codec::FramedWrite<Box<dyn FrameableWrite>, ZmqCodec>;
Expand Down
1 change: 1 addition & 0 deletions src/codec/greeting.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use super::error::CodecError;
use super::mechanism::ZmqMechanism;

use bytes::{Bytes, BytesMut};

use std::convert::TryFrom;

pub type ZmtpVersion = (u8, u8);
Expand Down
11 changes: 7 additions & 4 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@ pub(crate) use zmq_codec::ZmqCodec;

use crate::message::ZmqMessage;
use crate::{ZmqError, ZmqResult};
use futures::task::Poll;
use futures::Sink;

use futures_task::noop_waker;
use futures_util::Sink;

use std::pin::Pin;
use std::task::{Context, Poll};

#[derive(Debug, Clone)]
pub enum Message {
Expand All @@ -33,8 +36,8 @@ pub(crate) trait TrySend {

impl TrySend for ZmqFramedWrite {
fn try_send(mut self: Pin<&mut Self>, item: Message) -> ZmqResult<()> {
let waker = futures::task::noop_waker();
let mut cx = futures::task::Context::from_waker(&waker);
let waker = noop_waker();
let mut cx = Context::from_waker(&waker);
match self.as_mut().poll_ready(&mut cx) {
Poll::Ready(Ok(())) => {
self.as_mut().start_send(item)?;
Expand Down
1 change: 1 addition & 0 deletions src/codec/zmq_codec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use crate::ZmqMessage;

use asynchronous_codec::{Decoder, Encoder};
use bytes::{Buf, BufMut, Bytes, BytesMut};

use std::convert::TryFrom;

#[derive(Debug, Clone, Copy)]
Expand Down
6 changes: 4 additions & 2 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketRecv, SocketSend, SocketType, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use futures_channel::mpsc;
use futures_util::StreamExt;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down
6 changes: 3 additions & 3 deletions src/endpoint/host.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
use super::EndpointError;
use crate::ZmqError;

use std::convert::TryFrom;
use std::fmt;
use std::net::{IpAddr, Ipv4Addr, Ipv6Addr};
use std::str::FromStr;

use super::EndpointError;
use crate::ZmqError;

/// Represents a host address. Does not include the port, and may be either an
/// ip address or a domain name
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
Expand Down
6 changes: 3 additions & 3 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,17 @@ mod error;
mod host;
mod transport;

pub use host::Host;
pub use transport::Transport;

use once_cell::sync::Lazy;
use regex::Regex;

use std::fmt;
use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;

pub use error::EndpointError;
pub use host::Host;
pub use transport::Transport;

pub type Port = u16;

Expand Down
4 changes: 2 additions & 2 deletions src/endpoint/transport.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use super::EndpointError;

use std::convert::TryFrom;
use std::fmt;
use std::str::FromStr;

use super::EndpointError;

/// The type of transport used by a given endpoint
#[derive(Debug, Clone, Hash, Copy, PartialEq, Eq)]
#[non_exhaustive]
Expand Down
9 changes: 5 additions & 4 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use crate::endpoint::EndpointError;
use crate::task_handle::TaskError;
use crate::ZmqMessage;

use futures_channel::mpsc;
use thiserror::Error;

pub type ZmqResult<T> = Result<T, ZmqError>;
Expand Down Expand Up @@ -48,14 +49,14 @@ pub enum ZmqError {
UnsupportedVersion(ZmtpVersion),
}

impl From<futures::channel::mpsc::TrySendError<Message>> for ZmqError {
fn from(_: futures::channel::mpsc::TrySendError<Message>) -> Self {
impl From<mpsc::TrySendError<Message>> for ZmqError {
fn from(_: mpsc::TrySendError<Message>) -> Self {
ZmqError::BufferFull("Failed to send message. Send queue full/broken")
}
}

impl From<futures::channel::mpsc::SendError> for ZmqError {
fn from(_: futures::channel::mpsc::SendError) -> Self {
impl From<mpsc::SendError> for ZmqError {
fn from(_: mpsc::SendError) -> Self {
ZmqError::BufferFull("Failed to send message. Send queue full/broken")
}
}
22 changes: 12 additions & 10 deletions src/fair_queue.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use futures::task::{ArcWake, Context, Poll, Waker};
use futures::Stream;
use futures_task::{waker_ref, ArcWake};
use futures_util::Stream;
use parking_lot::Mutex;

use std::cmp::Ordering;
use std::collections::{BinaryHeap, HashMap};
use std::hash::Hash;
use std::pin::Pin;
use std::sync::atomic;
use std::sync::Arc;
use std::task::{Context, Poll, Waker};

pub(crate) struct QueueInner<S, K: Clone> {
counter: atomic::AtomicUsize,
Expand Down Expand Up @@ -115,7 +117,7 @@ where
inner: fair_queue.inner.clone(),
event: event.clone(),
});
let waker_ref = futures::task::waker_ref(&waker);
let waker_ref = waker_ref(&waker);
let mut cx = Context::from_waker(&waker_ref);
match io_stream.as_mut().poll_next(&mut cx) {
Poll::Ready(Some(res)) => {
Expand Down Expand Up @@ -162,13 +164,13 @@ impl<S, K: Clone> FairQueue<S, K> {
mod test {
use crate::async_rt;
use crate::fair_queue::FairQueue;
use futures::StreamExt;
use futures_util::{stream, StreamExt};

#[async_rt::test]
async fn test_fair_queue_ready() {
let a = futures::stream::iter(vec!["a1", "a2", "a3"]);
let b = futures::stream::iter(vec!["b1", "b2", "b3"]);
let c = futures::stream::iter(vec!["c1", "c2", "c3"]);
let a = stream::iter(vec!["a1", "a2", "a3"]);
let b = stream::iter(vec!["b1", "b2", "b3"]);
let c = stream::iter(vec!["c1", "c2", "c3"]);

let mut f_queue: FairQueue<_, u64> = FairQueue::new(false);
{
Expand Down Expand Up @@ -201,9 +203,9 @@ mod test {

#[async_rt::test]
async fn test_fair_queue_different_size() {
let a = futures::stream::iter(vec!["a1", "a2", "a3"]);
let b = futures::stream::iter(vec!["b1"]);
let c = futures::stream::iter(vec!["c1", "c2"]);
let a = stream::iter(vec!["a1", "a2", "a3"]);
let b = stream::iter(vec!["b1"]);
let c = stream::iter(vec!["c1", "c2"]);

let mut f_queue: FairQueue<_, u64> = FairQueue::new(false);
{
Expand Down
9 changes: 5 additions & 4 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ pub mod __async_rt {
pub use crate::dealer::*;
pub use crate::endpoint::{Endpoint, Host, Transport, TryIntoEndpoint};
pub use crate::error::{ZmqError, ZmqResult};
pub use crate::message::*;
pub use crate::pull::*;
pub use crate::push::*;
pub use crate::r#pub::*;
pub use crate::rep::*;
pub use crate::req::*;
pub use crate::router::*;
pub use crate::sub::*;
pub use message::*;

use crate::codec::*;
use crate::transport::AcceptStopHandle;
Expand All @@ -46,10 +46,11 @@ extern crate enum_primitive_derive;

use async_trait::async_trait;
use asynchronous_codec::FramedWrite;
use futures::channel::mpsc;
use futures::FutureExt;
use futures_channel::mpsc;
use futures_util::{select, FutureExt};
use num_traits::ToPrimitive;
use parking_lot::Mutex;

use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
Expand Down Expand Up @@ -321,7 +322,7 @@ pub async fn proxy<Frontend: SocketSend + SocketRecv, Backend: SocketSend + Sock
mut capture: Option<Box<dyn CaptureSocket>>,
) -> ZmqResult<()> {
loop {
futures::select! {
select! {
frontend_mess = frontend.recv().fuse() => {
match frontend_mess {
Ok(message) => {
Expand Down
1 change: 1 addition & 0 deletions src/message.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use bytes::Bytes;

use std::collections::vec_deque::{Iter, VecDeque};
use std::convert::{From, TryFrom};
use std::fmt;
Expand Down
8 changes: 4 additions & 4 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ use crate::{

use async_trait::async_trait;
use dashmap::DashMap;
use futures::channel::{mpsc, oneshot};
use futures::FutureExt;
use futures_channel::{mpsc, oneshot};
use futures_util::{select, FutureExt, StreamExt};
use parking_lot::Mutex;

use std::collections::HashMap;
use std::io::ErrorKind;
use std::pin::Pin;
Expand Down Expand Up @@ -116,10 +117,9 @@ impl MultiPeerBackend for PubSocketBackend {
let backend = self;
let peer_id = peer_id.clone();
async_rt::task::spawn(async move {
use futures::StreamExt;
let mut stop_receiver = stop_receiver.fuse();
loop {
futures::select! {
select! {
_ = stop_receiver => {
break;
},
Expand Down
6 changes: 4 additions & 2 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ use crate::{
Endpoint, MultiPeerBackend, Socket, SocketEvent, SocketOptions, SocketRecv, SocketType,
ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
use futures::channel::mpsc;
use futures::StreamExt;
use futures_channel::mpsc;
use futures_util::StreamExt;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down
4 changes: 3 additions & 1 deletion src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@ use crate::{
CaptureSocket, Endpoint, MultiPeerBackend, Socket, SocketBackend, SocketEvent, SocketOptions,
SocketSend, SocketType, ZmqMessage, ZmqResult,
};

use async_trait::async_trait;
use futures::channel::mpsc;
use futures_channel::mpsc;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
use std::sync::Arc;
Expand Down
Loading

0 comments on commit a282c7d

Please sign in to comment.