From 889dc0e97893bc1142b0d48587e5a3272cfabb11 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Wed, 26 Jan 2022 15:54:24 +0100 Subject: [PATCH 1/3] client-libs/mixnode: add trait for client sending --- .../client-libs/mixnet-client/src/client.rs | 17 ++++++++-- .../mixnet-client/src/forwarder.rs | 2 +- common/client-libs/mixnet-client/src/lib.rs | 2 +- mixnode/src/node/mod.rs | 6 +++- mixnode/src/node/packet_delayforwarder.rs | 33 ++++++++----------- 5 files changed, 34 insertions(+), 26 deletions(-) diff --git a/common/client-libs/mixnet-client/src/client.rs b/common/client-libs/mixnet-client/src/client.rs index 8d3d66820c..13fcfb595a 100644 --- a/common/client-libs/mixnet-client/src/client.rs +++ b/common/client-libs/mixnet-client/src/client.rs @@ -41,6 +41,17 @@ impl Config { } } +pub trait SendWithoutResponse { + // Without response in this context means we will not listen for anything we might get back (not + // that we should get anything), including any possible io errors + fn send_without_response( + &mut self, + address: NymNodeRoutingAddress, + packet: SphinxPacket, + packet_mode: PacketMode, + ) -> io::Result<()>; +} + pub struct Client { conn_new: HashMap, config: Config, @@ -186,10 +197,10 @@ impl Client { .await }); } +} - // without response in this context means we will not listen for anything we might get back - // (not that we should get anything), including any possible io errors - pub fn send_without_response( +impl SendWithoutResponse for Client { + fn send_without_response( &mut self, address: NymNodeRoutingAddress, packet: SphinxPacket, diff --git a/common/client-libs/mixnet-client/src/forwarder.rs b/common/client-libs/mixnet-client/src/forwarder.rs index 350af150b5..ead3b25a41 100644 --- a/common/client-libs/mixnet-client/src/forwarder.rs +++ b/common/client-libs/mixnet-client/src/forwarder.rs @@ -1,7 +1,7 @@ // Copyright 2021 - Nym Technologies SA // SPDX-License-Identifier: Apache-2.0 -use crate::client::{Client, Config}; +use crate::client::{Client, Config, SendWithoutResponse}; use futures::channel::mpsc; use futures::StreamExt; use log::*; diff --git a/common/client-libs/mixnet-client/src/lib.rs b/common/client-libs/mixnet-client/src/lib.rs index 6836a46f12..a63eb5ca03 100644 --- a/common/client-libs/mixnet-client/src/lib.rs +++ b/common/client-libs/mixnet-client/src/lib.rs @@ -4,4 +4,4 @@ pub mod client; pub mod forwarder; -pub use client::{Client, Config}; +pub use client::{Client, Config, SendWithoutResponse}; diff --git a/mixnode/src/node/mod.rs b/mixnode/src/node/mod.rs index 1b7dff0f2d..cf2ff9bfed 100644 --- a/mixnode/src/node/mod.rs +++ b/mixnode/src/node/mod.rs @@ -188,11 +188,15 @@ impl MixNode { ) -> PacketDelayForwardSender { info!("Starting packet delay-forwarder..."); - let mut packet_forwarder = DelayForwarder::new( + let client_config = mixnet_client::Config::new( self.config.get_packet_forwarding_initial_backoff(), self.config.get_packet_forwarding_maximum_backoff(), self.config.get_initial_connection_timeout(), self.config.get_maximum_connection_buffer_size(), + ); + + let mut packet_forwarder = DelayForwarder::new( + mixnet_client::Client::new(client_config), node_stats_update_sender, ); diff --git a/mixnode/src/node/packet_delayforwarder.rs b/mixnode/src/node/packet_delayforwarder.rs index ee469f3eb8..548bb7ff70 100644 --- a/mixnode/src/node/packet_delayforwarder.rs +++ b/mixnode/src/node/packet_delayforwarder.rs @@ -7,7 +7,7 @@ use futures::StreamExt; use nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, TimerError}; use nymsphinx::forwarding::packet::MixPacket; use std::io; -use tokio::time::{Duration, Instant}; +use tokio::time::Instant; // Delay + MixPacket vs Instant + MixPacket @@ -17,34 +17,27 @@ pub(crate) type PacketDelayForwardSender = mpsc::UnboundedSender<(MixPacket, Opt type PacketDelayForwardReceiver = mpsc::UnboundedReceiver<(MixPacket, Option)>; /// Entity responsible for delaying received sphinx packet and forwarding it to next node. -pub(crate) struct DelayForwarder { +pub(crate) struct DelayForwarder +where + C: mixnet_client::SendWithoutResponse, +{ delay_queue: NonExhaustiveDelayQueue, - mixnet_client: mixnet_client::Client, + mixnet_client: C, packet_sender: PacketDelayForwardSender, packet_receiver: PacketDelayForwardReceiver, node_stats_update_sender: UpdateSender, } -impl DelayForwarder { - pub(crate) fn new( - initial_reconnection_backoff: Duration, - maximum_reconnection_backoff: Duration, - initial_connection_timeout: Duration, - maximum_connection_buffer_size: usize, - node_stats_update_sender: UpdateSender, - ) -> Self { - let client_config = mixnet_client::Config::new( - initial_reconnection_backoff, - maximum_reconnection_backoff, - initial_connection_timeout, - maximum_connection_buffer_size, - ); - +impl DelayForwarder +where + C: mixnet_client::SendWithoutResponse, +{ + pub(crate) fn new(client: C, node_stats_update_sender: UpdateSender) -> DelayForwarder { let (packet_sender, packet_receiver) = mpsc::unbounded(); - DelayForwarder { + DelayForwarder:: { delay_queue: NonExhaustiveDelayQueue::new(), - mixnet_client: mixnet_client::Client::new(client_config), + mixnet_client: client, packet_sender, packet_receiver, node_stats_update_sender, From 79f99130d6b628baecfd29ae917fa8ec76817ea5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Wed, 26 Jan 2022 16:08:40 +0100 Subject: [PATCH 2/3] mixnode: instantiate and test DelayForwarder --- Cargo.lock | 2 + mixnode/Cargo.toml | 3 + mixnode/src/node/packet_delayforwarder.rs | 113 ++++++++++++++++++++++ 3 files changed, 118 insertions(+) diff --git a/Cargo.lock b/Cargo.lock index 977c916db8..e030c6e018 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3904,6 +3904,8 @@ dependencies = [ "mixnode-common", "nonexhaustive-delayqueue", "nymsphinx", + "nymsphinx-params", + "nymsphinx-types", "pemstore", "pretty_env_logger", "rand 0.7.3", diff --git a/mixnode/Cargo.toml b/mixnode/Cargo.toml index 56ec535e56..46914dabe9 100644 --- a/mixnode/Cargo.toml +++ b/mixnode/Cargo.toml @@ -50,5 +50,8 @@ version-checker = { path="../common/version-checker" } [dev-dependencies] serial_test = "0.5" +nymsphinx-types = { path = "../common/nymsphinx/types" } +nymsphinx-params = { path = "../common/nymsphinx/params" } + [build-dependencies] vergen = { version = "5", default-features = false, features = ["build", "git", "rustc", "cargo"] } diff --git a/mixnode/src/node/packet_delayforwarder.rs b/mixnode/src/node/packet_delayforwarder.rs index 548bb7ff70..64a4012c04 100644 --- a/mixnode/src/node/packet_delayforwarder.rs +++ b/mixnode/src/node/packet_delayforwarder.rs @@ -116,3 +116,116 @@ where } } } + +#[cfg(test)] +mod tests { + use super::*; + + use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + use std::sync::{Arc, Mutex}; + use std::time::Duration; + + use nymsphinx::addressing::nodes::NymNodeRoutingAddress; + use nymsphinx_params::packet_sizes::PacketSize; + use nymsphinx_params::PacketMode; + use nymsphinx_types::builder::SphinxPacketBuilder; + use nymsphinx_types::{ + crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes, + SphinxPacket, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH, + }; + // use tokio::sync::Mutex; + + #[derive(Default)] + struct TestClient { + pub packets_sent: Arc>>, + } + + impl mixnet_client::SendWithoutResponse for TestClient { + fn send_without_response( + &mut self, + address: NymNodeRoutingAddress, + packet: SphinxPacket, + packet_mode: PacketMode, + ) -> io::Result<()> { + self.packets_sent + .lock() + .unwrap() + .push((address, packet, packet_mode)); + Ok(()) + } + } + + fn make_valid_sphinx_packet(size: PacketSize) -> SphinxPacket { + let (_, node1_pk) = crypto::keygen(); + let node1 = Node::new( + NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]), + node1_pk, + ); + let (_, node2_pk) = crypto::keygen(); + let node2 = Node::new( + NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]), + node2_pk, + ); + let (_, node3_pk) = crypto::keygen(); + let node3 = Node::new( + NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]), + node3_pk, + ); + + let route = [node1, node2, node3]; + let destination = Destination::new( + DestinationAddressBytes::from_bytes([3u8; DESTINATION_ADDRESS_LENGTH]), + [4u8; IDENTIFIER_LENGTH], + ); + let delays = vec![ + SphinxDelay::new_from_nanos(42), + SphinxDelay::new_from_nanos(42), + SphinxDelay::new_from_nanos(42), + ]; + SphinxPacketBuilder::new() + .with_payload_size(size.payload_size()) + .build_packet(b"foomp".to_vec(), &route, &destination, &delays) + .unwrap() + } + + #[tokio::test] + async fn packets_received_are_forwarded() { + // Wire up the DelayForwarder + let (stats_sender, _stats_receiver) = mpsc::unbounded(); + let node_stats_update_sender = UpdateSender::new(stats_sender); + let client = TestClient::default(); + let client_packets_sent = client.packets_sent.clone(); + let mut delay_forwarder = DelayForwarder::new(client, node_stats_update_sender); + let packet_sender = delay_forwarder.sender(); + + // Spawn the worker, listening on packet_sender channel + tokio::spawn(async move { delay_forwarder.run().await }); + + // Send a `MixPacket` down the channel without any delay attached. + let next_hop = + NymNodeRoutingAddress::from(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 42)); + let mix_packet = MixPacket::new( + next_hop, + make_valid_sphinx_packet(PacketSize::default()), + PacketMode::default(), + ); + let forward_instant = None; + packet_sender + .unbounded_send((mix_packet, forward_instant)) + .unwrap(); + + // Give the the worker a chance to act + tokio::time::sleep(Duration::from_millis(10)).await; + + // The client should have forwarded the packet straight away + assert_eq!( + client_packets_sent + .lock() + .unwrap() + .iter() + .map(|(a, _, _)| *a) + .collect::>(), + vec![next_hop] + ); + } +} From 6b2783ccfc0b0ef741f4c44ebddb5153aadaa468 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jon=20H=C3=A4ggblad?= Date: Wed, 26 Jan 2022 16:19:20 +0100 Subject: [PATCH 3/3] mixnode: remove commented out line --- mixnode/src/node/packet_delayforwarder.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/mixnode/src/node/packet_delayforwarder.rs b/mixnode/src/node/packet_delayforwarder.rs index 64a4012c04..d2f1ced206 100644 --- a/mixnode/src/node/packet_delayforwarder.rs +++ b/mixnode/src/node/packet_delayforwarder.rs @@ -133,7 +133,6 @@ mod tests { crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes, SphinxPacket, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH, }; - // use tokio::sync::Mutex; #[derive(Default)] struct TestClient {