Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add trait to mock client dependency in DelayForwarder #1073

Merged
merged 3 commits into from
Jan 27, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

17 changes: 14 additions & 3 deletions common/client-libs/mixnet-client/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,17 @@ impl Config {
}
}

pub trait SendWithoutResponse {
// Without response in this context means we will not listen for anything we might get back (not
// that we should get anything), including any possible io errors
fn send_without_response(
&mut self,
address: NymNodeRoutingAddress,
packet: SphinxPacket,
packet_mode: PacketMode,
) -> io::Result<()>;
}

pub struct Client {
conn_new: HashMap<NymNodeRoutingAddress, ConnectionSender>,
config: Config,
Expand Down Expand Up @@ -186,10 +197,10 @@ impl Client {
.await
});
}
}

// without response in this context means we will not listen for anything we might get back
// (not that we should get anything), including any possible io errors
pub fn send_without_response(
impl SendWithoutResponse for Client {
fn send_without_response(
&mut self,
address: NymNodeRoutingAddress,
packet: SphinxPacket,
Expand Down
2 changes: 1 addition & 1 deletion common/client-libs/mixnet-client/src/forwarder.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
// Copyright 2021 - Nym Technologies SA <[email protected]>
// SPDX-License-Identifier: Apache-2.0

use crate::client::{Client, Config};
use crate::client::{Client, Config, SendWithoutResponse};
use futures::channel::mpsc;
use futures::StreamExt;
use log::*;
Expand Down
2 changes: 1 addition & 1 deletion common/client-libs/mixnet-client/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
pub mod client;
pub mod forwarder;

pub use client::{Client, Config};
pub use client::{Client, Config, SendWithoutResponse};
3 changes: 3 additions & 0 deletions mixnode/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,8 @@ version-checker = { path="../common/version-checker" }
[dev-dependencies]
serial_test = "0.5"

nymsphinx-types = { path = "../common/nymsphinx/types" }
nymsphinx-params = { path = "../common/nymsphinx/params" }

[build-dependencies]
vergen = { version = "5", default-features = false, features = ["build", "git", "rustc", "cargo"] }
6 changes: 5 additions & 1 deletion mixnode/src/node/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,11 +188,15 @@ impl MixNode {
) -> PacketDelayForwardSender {
info!("Starting packet delay-forwarder...");

let mut packet_forwarder = DelayForwarder::new(
let client_config = mixnet_client::Config::new(
self.config.get_packet_forwarding_initial_backoff(),
self.config.get_packet_forwarding_maximum_backoff(),
self.config.get_initial_connection_timeout(),
self.config.get_maximum_connection_buffer_size(),
);

let mut packet_forwarder = DelayForwarder::new(
mixnet_client::Client::new(client_config),
node_stats_update_sender,
);

Expand Down
145 changes: 125 additions & 20 deletions mixnode/src/node/packet_delayforwarder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use futures::StreamExt;
use nonexhaustive_delayqueue::{Expired, NonExhaustiveDelayQueue, TimerError};
use nymsphinx::forwarding::packet::MixPacket;
use std::io;
use tokio::time::{Duration, Instant};
use tokio::time::Instant;

// Delay + MixPacket vs Instant + MixPacket

Expand All @@ -17,34 +17,27 @@ pub(crate) type PacketDelayForwardSender = mpsc::UnboundedSender<(MixPacket, Opt
type PacketDelayForwardReceiver = mpsc::UnboundedReceiver<(MixPacket, Option<Instant>)>;

/// Entity responsible for delaying received sphinx packet and forwarding it to next node.
pub(crate) struct DelayForwarder {
pub(crate) struct DelayForwarder<C>
where
C: mixnet_client::SendWithoutResponse,
{
delay_queue: NonExhaustiveDelayQueue<MixPacket>,
mixnet_client: mixnet_client::Client,
mixnet_client: C,
packet_sender: PacketDelayForwardSender,
packet_receiver: PacketDelayForwardReceiver,
node_stats_update_sender: UpdateSender,
}

impl DelayForwarder {
pub(crate) fn new(
initial_reconnection_backoff: Duration,
maximum_reconnection_backoff: Duration,
initial_connection_timeout: Duration,
maximum_connection_buffer_size: usize,
node_stats_update_sender: UpdateSender,
) -> Self {
let client_config = mixnet_client::Config::new(
initial_reconnection_backoff,
maximum_reconnection_backoff,
initial_connection_timeout,
maximum_connection_buffer_size,
);

impl<C> DelayForwarder<C>
where
C: mixnet_client::SendWithoutResponse,
{
pub(crate) fn new(client: C, node_stats_update_sender: UpdateSender) -> DelayForwarder<C> {
let (packet_sender, packet_receiver) = mpsc::unbounded();

DelayForwarder {
DelayForwarder::<C> {
delay_queue: NonExhaustiveDelayQueue::new(),
mixnet_client: mixnet_client::Client::new(client_config),
mixnet_client: client,
packet_sender,
packet_receiver,
node_stats_update_sender,
Expand Down Expand Up @@ -123,3 +116,115 @@ impl DelayForwarder {
}
}
}

#[cfg(test)]
mod tests {
use super::*;

use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::sync::{Arc, Mutex};
use std::time::Duration;

use nymsphinx::addressing::nodes::NymNodeRoutingAddress;
use nymsphinx_params::packet_sizes::PacketSize;
use nymsphinx_params::PacketMode;
use nymsphinx_types::builder::SphinxPacketBuilder;
use nymsphinx_types::{
crypto, Delay as SphinxDelay, Destination, DestinationAddressBytes, Node, NodeAddressBytes,
SphinxPacket, DESTINATION_ADDRESS_LENGTH, IDENTIFIER_LENGTH, NODE_ADDRESS_LENGTH,
};

#[derive(Default)]
struct TestClient {
pub packets_sent: Arc<Mutex<Vec<(NymNodeRoutingAddress, SphinxPacket, PacketMode)>>>,
}

impl mixnet_client::SendWithoutResponse for TestClient {
fn send_without_response(
&mut self,
address: NymNodeRoutingAddress,
packet: SphinxPacket,
packet_mode: PacketMode,
) -> io::Result<()> {
self.packets_sent
.lock()
.unwrap()
.push((address, packet, packet_mode));
Ok(())
}
}

fn make_valid_sphinx_packet(size: PacketSize) -> SphinxPacket {
let (_, node1_pk) = crypto::keygen();
let node1 = Node::new(
NodeAddressBytes::from_bytes([5u8; NODE_ADDRESS_LENGTH]),
node1_pk,
);
let (_, node2_pk) = crypto::keygen();
let node2 = Node::new(
NodeAddressBytes::from_bytes([4u8; NODE_ADDRESS_LENGTH]),
node2_pk,
);
let (_, node3_pk) = crypto::keygen();
let node3 = Node::new(
NodeAddressBytes::from_bytes([2u8; NODE_ADDRESS_LENGTH]),
node3_pk,
);

let route = [node1, node2, node3];
let destination = Destination::new(
DestinationAddressBytes::from_bytes([3u8; DESTINATION_ADDRESS_LENGTH]),
[4u8; IDENTIFIER_LENGTH],
);
let delays = vec![
SphinxDelay::new_from_nanos(42),
SphinxDelay::new_from_nanos(42),
SphinxDelay::new_from_nanos(42),
];
SphinxPacketBuilder::new()
.with_payload_size(size.payload_size())
.build_packet(b"foomp".to_vec(), &route, &destination, &delays)
.unwrap()
}

#[tokio::test]
async fn packets_received_are_forwarded() {
// Wire up the DelayForwarder
let (stats_sender, _stats_receiver) = mpsc::unbounded();
let node_stats_update_sender = UpdateSender::new(stats_sender);
let client = TestClient::default();
let client_packets_sent = client.packets_sent.clone();
let mut delay_forwarder = DelayForwarder::new(client, node_stats_update_sender);
let packet_sender = delay_forwarder.sender();

// Spawn the worker, listening on packet_sender channel
tokio::spawn(async move { delay_forwarder.run().await });

// Send a `MixPacket` down the channel without any delay attached.
let next_hop =
NymNodeRoutingAddress::from(SocketAddr::new(IpAddr::V4(Ipv4Addr::new(1, 2, 3, 4)), 42));
let mix_packet = MixPacket::new(
next_hop,
make_valid_sphinx_packet(PacketSize::default()),
PacketMode::default(),
);
let forward_instant = None;
packet_sender
.unbounded_send((mix_packet, forward_instant))
.unwrap();

// Give the the worker a chance to act
tokio::time::sleep(Duration::from_millis(10)).await;

// The client should have forwarded the packet straight away
assert_eq!(
client_packets_sent
.lock()
.unwrap()
.iter()
.map(|(a, _, _)| *a)
.collect::<Vec<_>>(),
vec![next_hop]
);
}
}