Skip to content

Commit

Permalink
Merge branch 'master' into adj-socket-type
Browse files Browse the repository at this point in the history
  • Loading branch information
poyea committed Aug 14, 2022
2 parents 2cd972a + 6ca89ef commit ebd5da5
Show file tree
Hide file tree
Showing 5 changed files with 30 additions and 33 deletions.
14 changes: 7 additions & 7 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,16 @@ thiserror = "1"
futures = "0.3"
futures-util = "0.3"
async-trait = "0.1"
parking_lot = "0.11"
parking_lot = "0.12"
rand = "0.8"
bytes = "1"
tokio = { version = "1", features = ["full"], optional = true }
tokio-util = { version = "0.6", features = ["compat"], optional = true }
dashmap = "3.11"
crossbeam = "0.7"
uuid = { version = "0.8", features = ["v4"] }
tokio-util = { version = "0.7", features = ["compat"], optional = true }
dashmap = "5"
crossbeam-queue = "0.3"
uuid = { version = "1", features = ["v4"] }
regex = "1"
lazy_static = "1"
once_cell = "1"
log = "0.4"
asynchronous-codec = "0.6"
async-std = { version = "1", features = ["attributes"], optional = true }
Expand All @@ -39,7 +39,7 @@ chrono = "0.4"
criterion = "0.3"
pretty_env_logger = "0.4"
zmq2 = "0.5.0"
hex = "0.4.3"
hex = "0.4"


[lib]
Expand Down
6 changes: 3 additions & 3 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::{
MultiPeerBackend, SocketBackend, SocketEvent, SocketOptions, SocketType, ZmqError, ZmqResult,
};
use async_trait::async_trait;
use crossbeam::queue::SegQueue;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::SinkExt;
Expand Down Expand Up @@ -49,8 +49,8 @@ impl GenericSocketBackend {
// we don't have a matching peer in peers map
loop {
let next_peer_id = match self.round_robin.pop() {
Ok(peer) => peer,
Err(_) => match message {
Some(peer) => peer,
None => match message {
Message::Greeting(_) => panic!("Sending greeting is not supported"),
Message::Command(_) => panic!("Sending commands is not supported"),
Message::Message(m) => {
Expand Down
35 changes: 16 additions & 19 deletions src/endpoint/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ mod transport;
pub use host::Host;
pub use transport::Transport;

use lazy_static::lazy_static;
use once_cell::sync::Lazy;
use regex::Regex;
use std::fmt;
use std::net::SocketAddr;
Expand All @@ -16,6 +16,9 @@ pub use error::EndpointError;

pub type Port = u16;

static TRANSPORT_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^([[:lower:]]+):https://(.+)$").unwrap());
static HOST_PORT_REGEX: Lazy<Regex> = Lazy::new(|| Regex::new(r"^(.+):(\d+)$").unwrap());

/// Represents a ZMQ Endpoint.
///
/// # Examples
Expand Down Expand Up @@ -58,11 +61,6 @@ impl FromStr for Endpoint {
type Err = EndpointError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
lazy_static! {
static ref TRANSPORT_REGEX: Regex = Regex::new(r"^([[:lower:]]+):https://(.+)$").unwrap();
static ref HOST_PORT_REGEX: Regex = Regex::new(r"^(.+):(\d+)$").unwrap();
}

let caps = TRANSPORT_REGEX
.captures(s)
.ok_or(EndpointError::Syntax("Could not parse transport"))?;
Expand Down Expand Up @@ -149,21 +147,20 @@ mod private {
#[cfg(test)]
mod tests {
use super::*;
use lazy_static::lazy_static;

lazy_static! {
static ref PAIRS: Vec<(Endpoint, &'static str)> = vec![
static PAIRS: Lazy<Vec<(Endpoint, &'static str)>> = Lazy::new(|| {
vec![
(
Endpoint::Ipc(Some(PathBuf::from("/tmp/asdf"))),
"ipc:https:///tmp/asdf"
"ipc:https:///tmp/asdf",
),
(
Endpoint::Ipc(Some(PathBuf::from("my/dir_1/dir-2"))),
"ipc:https://my/dir_1/dir-2"
"ipc:https://my/dir_1/dir-2",
),
(
Endpoint::Ipc(Some(PathBuf::from("@abstract/namespace"))),
"ipc:https://@abstract/namespace"
"ipc:https://@abstract/namespace",
),
(
Endpoint::Tcp(Host::Domain("www.example.com".to_string()), 1234),
Expand All @@ -183,22 +180,22 @@ mod tests {
),
(
Endpoint::Tcp(Host::Domain("i❤.ws".to_string()), 80),
"tcp:https://i❤.ws:80"
"tcp:https://i❤.ws:80",
),
(
Endpoint::Tcp(Host::Domain("xn--i-7iq.ws".to_string()), 80),
"tcp:https://xn--i-7iq.ws:80"
"tcp:https://xn--i-7iq.ws:80",
),
(
Endpoint::Tcp(Host::Ipv4("127.0.0.1".parse().unwrap()), 65535),
"tcp:https://127.0.0.1:65535"
"tcp:https://127.0.0.1:65535",
),
(
Endpoint::Tcp(Host::Ipv4("127.0.0.1".parse().unwrap()), 0),
"tcp:https://127.0.0.1:0"
)
];
}
"tcp:https://127.0.0.1:0",
),
]
});

#[test]
fn test_endpoint_display() {
Expand Down
6 changes: 3 additions & 3 deletions src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{SocketType, ZmqResult};

use async_trait::async_trait;
use bytes::Bytes;
use crossbeam::queue::SegQueue;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::{SinkExt, StreamExt};
use std::collections::HashMap;
Expand Down Expand Up @@ -49,8 +49,8 @@ impl SocketSend for ReqSocket {
// we don't have a matching peer in peers map
loop {
let next_peer_id = match self.backend.round_robin.pop() {
Ok(peer) => peer,
Err(_) => {
Some(peer) => peer,
None => {
return Err(ZmqError::ReturnToSender {
reason: "Not connected to peers. Unable to send messages",
message,
Expand Down
2 changes: 1 addition & 1 deletion src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use crate::fair_queue::FairQueue;
use crate::fair_queue::QueueInner;
use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use crossbeam::queue::SegQueue;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
Expand Down

0 comments on commit ebd5da5

Please sign in to comment.