From 2cd972a8600e9fe7de89096e32a82589b64ec0df Mon Sep 17 00:00:00 2001 From: Andrew Whitehead Date: Fri, 12 Aug 2022 13:19:26 -0700 Subject: [PATCH] move sockets_compatible to a method and remove dependencies Signed-off-by: Andrew Whitehead --- Cargo.toml | 2 -- src/lib.rs | 75 +++++++++++++++++++++++++++++++++++++++-------------- src/util.rs | 46 +++++--------------------------- 3 files changed, 62 insertions(+), 61 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 0e4709e..4e3e6d7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -25,8 +25,6 @@ rand = "0.8" bytes = "1" tokio = { version = "1", features = ["full"], optional = true } tokio-util = { version = "0.6", features = ["compat"], optional = true } -num-traits = "0.2" -enum-primitive-derive = "0.2" dashmap = "3.11" crossbeam = "0.7" uuid = { version = "0.8", features = ["v4"] } diff --git a/src/lib.rs b/src/lib.rs index 2a3ad35..2e2d77d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -41,22 +41,36 @@ use crate::codec::*; use crate::transport::AcceptStopHandle; use util::PeerIdentity; -#[macro_use] -extern crate enum_primitive_derive; - use async_trait::async_trait; use asynchronous_codec::FramedWrite; use futures::channel::mpsc; use futures::FutureExt; -use num_traits::ToPrimitive; use parking_lot::Mutex; + use std::collections::HashMap; use std::convert::TryFrom; use std::fmt::{Debug, Display}; +use std::str::FromStr; use std::sync::Arc; +const COMPATIBILITY_MATRIX: [u8; 121] = [ + // PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB + 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // PUB + 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SUB + 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, // REQ + 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, // REP + 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, // DEALER + 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, // ROUTER + 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // PULL + 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // PUSH + 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // XPUB + 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // XSUB +]; + #[allow(clippy::upper_case_acronyms)] -#[derive(Clone, Copy, Debug, PartialEq, Eq, Primitive)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)] +#[repr(usize)] pub enum SocketType { PAIR = 0, PUB = 1, @@ -89,25 +103,48 @@ impl SocketType { SocketType::STREAM => "STREAM", } } + + /// Checks if two sockets are compatible with each other + /// ``` + /// use zeromq::SocketType; + /// assert!(SocketType::PUB.compatible(SocketType::SUB)); + /// assert!(SocketType::REQ.compatible(SocketType::REP)); + /// assert!(SocketType::DEALER.compatible(SocketType::ROUTER)); + /// assert!(!SocketType::PUB.compatible(SocketType::REP)); + /// ``` + pub fn compatible(&self, other: SocketType) -> bool { + let row_index = *self as usize; + let col_index = other as usize; + COMPATIBILITY_MATRIX[row_index * 11 + col_index] != 0 + } +} + +impl FromStr for SocketType { + type Err = ZmqError; + + #[inline] + fn from_str(s: &str) -> Result { + Self::try_from(s.as_bytes()) + } } -impl TryFrom<&str> for SocketType { +impl TryFrom<&[u8]> for SocketType { type Error = ZmqError; - fn try_from(s: &str) -> Result { + fn try_from(s: &[u8]) -> Result { Ok(match s { - "PAIR" => SocketType::PAIR, - "PUB" => SocketType::PUB, - "SUB" => SocketType::SUB, - "REQ" => SocketType::REQ, - "REP" => SocketType::REP, - "DEALER" => SocketType::DEALER, - "ROUTER" => SocketType::ROUTER, - "PULL" => SocketType::PULL, - "PUSH" => SocketType::PUSH, - "XPUB" => SocketType::XPUB, - "XSUB" => SocketType::XSUB, - "STREAM" => SocketType::STREAM, + b"PAIR" => SocketType::PAIR, + b"PUB" => SocketType::PUB, + b"SUB" => SocketType::SUB, + b"REQ" => SocketType::REQ, + b"REP" => SocketType::REP, + b"DEALER" => SocketType::DEALER, + b"ROUTER" => SocketType::ROUTER, + b"PULL" => SocketType::PULL, + b"PUSH" => SocketType::PUSH, + b"XPUB" => SocketType::XPUB, + b"XSUB" => SocketType::XSUB, + b"STREAM" => SocketType::STREAM, _ => return Err(ZmqError::Other("Unknown socket type")), }) } diff --git a/src/util.rs b/src/util.rs index 0a9b90f..88f23e9 100644 --- a/src/util.rs +++ b/src/util.rs @@ -5,7 +5,6 @@ use asynchronous_codec::FramedRead; use bytes::Bytes; use futures::stream::StreamExt; use futures::SinkExt; -use num_traits::Pow; use rand::Rng; use std::convert::{TryFrom, TryInto}; @@ -102,36 +101,6 @@ pub(crate) struct Peer { pub(crate) recv_queue: FramedRead, ZmqCodec>, } -const COMPATIBILITY_MATRIX: [u8; 121] = [ - // PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB - 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR - 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // PUB - 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SUB - 0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, // REQ - 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, // REP - 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, // DEALER - 0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, // ROUTER - 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // PULL - 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // PUSH - 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // XPUB - 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // XSUB -]; - -/// Checks if two sokets are compatible with each other -/// ``` -/// use zeromq::util::sockets_compatible; -/// use zeromq::SocketType; -/// assert!(sockets_compatible(SocketType::PUB, SocketType::SUB)); -/// assert!(sockets_compatible(SocketType::REQ, SocketType::REP)); -/// assert!(sockets_compatible(SocketType::DEALER, SocketType::ROUTER)); -/// assert!(!sockets_compatible(SocketType::PUB, SocketType::REP)); -/// ``` -pub fn sockets_compatible(one: SocketType, another: SocketType) -> bool { - let row_index = one.to_usize().unwrap(); - let col_index = another.to_usize().unwrap(); - COMPATIBILITY_MATRIX[row_index * 11 + col_index] != 0 -} - /// Given the result of the greetings exchange, determines the version of the /// ZMTP protocol that should be used for communication with the peer according /// to https://rfc.zeromq.org/spec/23/#version-negotiation. @@ -190,13 +159,10 @@ pub(crate) async fn ready_exchange( match ready_repl { Some(Ok(Message::Command(command))) => match command.name { ZmqCommandName::READY => { - let other_sock_type = command - .properties - .get("Socket-Type") - .map(|x| { - SocketType::try_from(std::str::from_utf8(x).expect("Invalid socket type")) - }) - .unwrap_or(Err(ZmqError::Other("Failed to parse other socket type")))?; + let other_sock_type = match command.properties.get("Socket-Type") { + Some(s) => SocketType::try_from(&s[..])?, + None => Err(ZmqError::Other("Failed to parse other socket type"))?, + }; let peer_id = command .properties @@ -205,7 +171,7 @@ pub(crate) async fn ready_exchange( .transpose()? .unwrap_or_default(); - if sockets_compatible(socket_type, other_sock_type) { + if socket_type.compatible(other_sock_type) { Ok(peer_id) } else { Err(ZmqError::Other( @@ -247,7 +213,7 @@ pub(crate) async fn connect_forever(endpoint: Endpoint) -> ZmqResult<(FramedIo, } let delay = { let mut rng = rand::thread_rng(); - std::f64::consts::E.pow(try_num as f64 / 3.0) + rng.gen_range(0.0f64..0.1f64) + std::f64::consts::E.powf(try_num as f64 / 3.0) + rng.gen_range(0.0f64..0.1f64) }; async_rt::task::sleep(std::time::Duration::from_secs_f64(delay)).await; continue;