Skip to content

Commit

Permalink
Merge pull request #165 from andrewwhitehead/adj-socket-type
Browse files Browse the repository at this point in the history
SocketType cleanups
  • Loading branch information
poyea committed Sep 12, 2022
2 parents 6ca89ef + ebd5da5 commit f79c4dc
Show file tree
Hide file tree
Showing 3 changed files with 62 additions and 61 deletions.
2 changes: 0 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ rand = "0.8"
bytes = "1"
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
num-traits = "0.2"
enum-primitive-derive = "0.2"
dashmap = "5"
crossbeam-queue = "0.3"
uuid = { version = "1", features = ["v4"] }
Expand Down
75 changes: 56 additions & 19 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,36 @@ use crate::codec::*;
use crate::transport::AcceptStopHandle;
use util::PeerIdentity;

#[macro_use]
extern crate enum_primitive_derive;

use async_trait::async_trait;
use asynchronous_codec::FramedWrite;
use futures::channel::mpsc;
use futures::FutureExt;
use num_traits::ToPrimitive;
use parking_lot::Mutex;

use std::collections::HashMap;
use std::convert::TryFrom;
use std::fmt::{Debug, Display};
use std::str::FromStr;
use std::sync::Arc;

const COMPATIBILITY_MATRIX: [u8; 121] = [
// PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // PUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SUB
0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, // REQ
0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, // REP
0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, // DEALER
0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, // ROUTER
0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // PULL
0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // PUSH
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // XPUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // XSUB
];

#[allow(clippy::upper_case_acronyms)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Primitive)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, PartialOrd, Ord)]
#[repr(usize)]
pub enum SocketType {
PAIR = 0,
PUB = 1,
Expand Down Expand Up @@ -89,25 +103,48 @@ impl SocketType {
SocketType::STREAM => "STREAM",
}
}

/// Checks if two sockets are compatible with each other
/// ```
/// use zeromq::SocketType;
/// assert!(SocketType::PUB.compatible(SocketType::SUB));
/// assert!(SocketType::REQ.compatible(SocketType::REP));
/// assert!(SocketType::DEALER.compatible(SocketType::ROUTER));
/// assert!(!SocketType::PUB.compatible(SocketType::REP));
/// ```
pub fn compatible(&self, other: SocketType) -> bool {
let row_index = *self as usize;
let col_index = other as usize;
COMPATIBILITY_MATRIX[row_index * 11 + col_index] != 0
}
}

impl FromStr for SocketType {
type Err = ZmqError;

#[inline]
fn from_str(s: &str) -> Result<Self, ZmqError> {
Self::try_from(s.as_bytes())
}
}

impl TryFrom<&str> for SocketType {
impl TryFrom<&[u8]> for SocketType {
type Error = ZmqError;

fn try_from(s: &str) -> Result<Self, ZmqError> {
fn try_from(s: &[u8]) -> Result<Self, ZmqError> {
Ok(match s {
"PAIR" => SocketType::PAIR,
"PUB" => SocketType::PUB,
"SUB" => SocketType::SUB,
"REQ" => SocketType::REQ,
"REP" => SocketType::REP,
"DEALER" => SocketType::DEALER,
"ROUTER" => SocketType::ROUTER,
"PULL" => SocketType::PULL,
"PUSH" => SocketType::PUSH,
"XPUB" => SocketType::XPUB,
"XSUB" => SocketType::XSUB,
"STREAM" => SocketType::STREAM,
b"PAIR" => SocketType::PAIR,
b"PUB" => SocketType::PUB,
b"SUB" => SocketType::SUB,
b"REQ" => SocketType::REQ,
b"REP" => SocketType::REP,
b"DEALER" => SocketType::DEALER,
b"ROUTER" => SocketType::ROUTER,
b"PULL" => SocketType::PULL,
b"PUSH" => SocketType::PUSH,
b"XPUB" => SocketType::XPUB,
b"XSUB" => SocketType::XSUB,
b"STREAM" => SocketType::STREAM,
_ => return Err(ZmqError::Other("Unknown socket type")),
})
}
Expand Down
46 changes: 6 additions & 40 deletions src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use asynchronous_codec::FramedRead;
use bytes::Bytes;
use futures::stream::StreamExt;
use futures::SinkExt;
use num_traits::Pow;
use rand::Rng;

use std::convert::{TryFrom, TryInto};
Expand Down Expand Up @@ -102,36 +101,6 @@ pub(crate) struct Peer {
pub(crate) recv_queue: FramedRead<Box<dyn FrameableRead>, ZmqCodec>,
}

const COMPATIBILITY_MATRIX: [u8; 121] = [
// PAIR, PUB, SUB, REQ, REP, DEALER, ROUTER, PULL, PUSH, XPUB, XSUB
1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // PAIR
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // PUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // SUB
0, 0, 0, 0, 1, 0, 1, 0, 0, 0, 0, // REQ
0, 0, 0, 1, 0, 1, 0, 0, 0, 0, 0, // REP
0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, // DEALER
0, 0, 0, 1, 0, 1, 1, 0, 0, 0, 0, // ROUTER
0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, // PULL
0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, // PUSH
0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 1, // XPUB
0, 1, 0, 0, 0, 0, 0, 0, 0, 1, 0, // XSUB
];

/// Checks if two sokets are compatible with each other
/// ```
/// use zeromq::util::sockets_compatible;
/// use zeromq::SocketType;
/// assert!(sockets_compatible(SocketType::PUB, SocketType::SUB));
/// assert!(sockets_compatible(SocketType::REQ, SocketType::REP));
/// assert!(sockets_compatible(SocketType::DEALER, SocketType::ROUTER));
/// assert!(!sockets_compatible(SocketType::PUB, SocketType::REP));
/// ```
pub fn sockets_compatible(one: SocketType, another: SocketType) -> bool {
let row_index = one.to_usize().unwrap();
let col_index = another.to_usize().unwrap();
COMPATIBILITY_MATRIX[row_index * 11 + col_index] != 0
}

/// Given the result of the greetings exchange, determines the version of the
/// ZMTP protocol that should be used for communication with the peer according
/// to https://rfc.zeromq.org/spec/23/#version-negotiation.
Expand Down Expand Up @@ -190,13 +159,10 @@ pub(crate) async fn ready_exchange(
match ready_repl {
Some(Ok(Message::Command(command))) => match command.name {
ZmqCommandName::READY => {
let other_sock_type = command
.properties
.get("Socket-Type")
.map(|x| {
SocketType::try_from(std::str::from_utf8(x).expect("Invalid socket type"))
})
.unwrap_or(Err(ZmqError::Other("Failed to parse other socket type")))?;
let other_sock_type = match command.properties.get("Socket-Type") {
Some(s) => SocketType::try_from(&s[..])?,
None => Err(ZmqError::Other("Failed to parse other socket type"))?,
};

let peer_id = command
.properties
Expand All @@ -205,7 +171,7 @@ pub(crate) async fn ready_exchange(
.transpose()?
.unwrap_or_default();

if sockets_compatible(socket_type, other_sock_type) {
if socket_type.compatible(other_sock_type) {
Ok(peer_id)
} else {
Err(ZmqError::Other(
Expand Down Expand Up @@ -247,7 +213,7 @@ pub(crate) async fn connect_forever(endpoint: Endpoint) -> ZmqResult<(FramedIo,
}
let delay = {
let mut rng = rand::thread_rng();
std::f64::consts::E.pow(try_num as f64 / 3.0) + rng.gen_range(0.0f64..0.1f64)
std::f64::consts::E.powf(try_num as f64 / 3.0) + rng.gen_range(0.0f64..0.1f64)
};
async_rt::task::sleep(std::time::Duration::from_secs_f64(delay)).await;
continue;
Expand Down

0 comments on commit f79c4dc

Please sign in to comment.