Skip to content

Commit

Permalink
Rust benchmark client: clarify internals, expose parameters through CLI
Browse files Browse the repository at this point in the history
  • Loading branch information
De117 committed Nov 27, 2023
1 parent 0852d99 commit f6e3f1f
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 48 deletions.
7 changes: 7 additions & 0 deletions benchmark/client-rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions benchmark/client-rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@ edition = "2021"
tokio = { version = "1", features = ["full"] }
reqwest = {version = "0.11.22"}
futures = {version = "0.3"}
argparse = {version = "0.2.2"}
152 changes: 104 additions & 48 deletions benchmark/client-rust/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,28 @@ use tokio::io::AsyncWriteExt;
//use tokio::task::{JoinHandle, JoinError};
use std::io::{Read, Write};
use std::time::{Instant, Duration};
use argparse::{ArgumentParser, Store};
use std::fmt::{Display, Formatter};

// const HTTP_HOST: &'static str = "localhost:8080";
const TARGET_IP: &'static str = "127.0.0.1";
const TARGET_PORT: u16 = 2222;
const PROXY_IP: &'static str = "127.0.0.1";
const PROXY_PORT: u16 = 8080;
#[derive(Clone)]
struct Host {
hostname: String,
port: u16,
}
impl Display for Host {
fn fmt(&self, f: &mut Formatter) -> std::fmt::Result {
write!(f, "{}:{}", self.hostname, self.port)
}
}

fn parse_host(s: &str) -> Option<Host> {
if let Some((hostname, port_string)) = s.split_once(':') {
let port: u16 = port_string.parse().unwrap();
let hostname = String::from(hostname);
return Some(Host {hostname, port})
}
None
}
const BUFFER_SIZE: usize = 1000;

#[derive(Debug, Clone, Copy)]
Expand Down Expand Up @@ -52,9 +68,9 @@ fn spin_until(t: Instant) -> Duration {
}
}

fn make_request(request_id: &str, _ip_address: &str, _tcp_port: u16) -> Sample {
let proxy_request = format!("CONNECT localhost:{TARGET_PORT} HTTP/1.1\r\nHost: {PROXY_IP}:{PROXY_PORT}\r\n\r\n").into_bytes();
let http_request = format!("GET / HTTP/1.1\r\nHost: {TARGET_IP}:{TARGET_PORT}\r\n\r\n").into_bytes();
fn make_request(request_id: &str, proxy: &Host, target: &Host) -> Sample {
let proxy_request = format!("CONNECT {} HTTP/1.1\r\nHost: {}\r\n\r\n", target, proxy).into_bytes();
let http_request = format!("GET / HTTP/1.1\r\nHost: {}\r\n\r\n", target).into_bytes();

let mut buffer = [0u8; BUFFER_SIZE];

Expand All @@ -67,7 +83,7 @@ fn make_request(request_id: &str, _ip_address: &str, _tcp_port: u16) -> Sample {
let mut t_read = None;
let ok: bool;

match std::net::TcpStream::connect((PROXY_IP, PROXY_PORT)) {
match std::net::TcpStream::connect((&proxy.hostname as &str, proxy.port)) {
Err(_) => {ok = false; log("Could not connect")}
Ok(mut stream) => {
t_connect = Some(Instant::now());
Expand Down Expand Up @@ -227,7 +243,7 @@ fn print_latencies(durations: &Vec<Duration>) {

//fn print_type_of<T>(_: &T) {println!("{}", std::any::type_name::<T>())}

fn run_thread(initial_request_id: u64, num_requests: u64, frequency: u32, target_ip: &str, target_port: u16) -> (Vec<Sample>, Vec<Duration>) {
fn run_thread(initial_request_id: u64, num_requests: u64, frequency: u32, proxy: &Host, target: &Host) -> (Vec<Sample>, Vec<Duration>) {
println!("Starting thread, requests {initial_request_id}-{} at {frequency}/s", initial_request_id + num_requests);

let period = Duration::from_nanos(1_000_000_000 / frequency as u64);
Expand All @@ -236,7 +252,7 @@ fn run_thread(initial_request_id: u64, num_requests: u64, frequency: u32, target

let mut scheduled_at = Instant::now();
for i in 0..num_requests {
let sample = make_request(&format!("{}", initial_request_id + i), target_ip, target_port);
let sample = make_request(&format!("{}", initial_request_id + i), proxy, target);
actual_times.push(Instant::now() - scheduled_at);
samples.push(sample);
scheduled_at += period;
Expand All @@ -245,62 +261,102 @@ fn run_thread(initial_request_id: u64, num_requests: u64, frequency: u32, target
return (samples, actual_times);
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> () {
const NUM_REQUESTS: u64 = 100_000;

let frequency = 1_000; // Hz
println!("Running at frequency of {frequency} Hz");

/// The load of a single OS thread.
#[derive(Copy, Clone)]
struct Chunk {
frequency: u32,
num_requests: u64,
}

// At low rates & low latencies, we can do everything from a single thread.
// At high rates or high latencies, our thread would get more and more delayed,
// so we need to use multiple threads.
// For _our_ testing, on localhost, 10kHz should be doable, but not much more.
const FREQUENCY_TRESHOLD: u32 = 10_000;
/// At low rates & low latencies, we can do everything from a single thread.
/// At high rates or high latencies, one thread would get more and more delayed,
/// so we need to use multiple threads.
///
/// This function calculates the division of work over n threads, such that all
/// threads except maybe the last one run at maximum frequency, and a thread's
/// number of requests proportional to its frequency.
///
/// Returns: (n, normal_chunk, last_chunk)
fn calculate_chunks(max_frequency_per_thread: u32, frequency: u32, num_requests: u64) -> (u32, Chunk, Chunk) {

// Divide the work over threads as follows:
// 1. frequency first, by filling (n-1) full blocks and 1 potentially empty block
// 2. requests ∝ frequency, with factor total_requests / total_frequency
let n = frequency as f64 / FREQUENCY_TRESHOLD as f64;
let n = frequency as f64 / max_frequency_per_thread as f64;

let frequency_per_chunk = FREQUENCY_TRESHOLD;
let requests_per_chunk = (NUM_REQUESTS as f64 / n).floor() as u64;
let frequency_per_chunk = max_frequency_per_thread;
let requests_per_chunk = (num_requests as f64 / n).floor() as u64;

let frequency_in_last_chunk: u32;
let requests_in_last_chunk: u64;
if frequency % FREQUENCY_TRESHOLD != 0 { // frequency doesn't cleanly divide into chunks
if frequency % max_frequency_per_thread != 0 {
// frequency doesn't cleanly divide into chunks
frequency_in_last_chunk = frequency - n.floor() as u32 * frequency_per_chunk;
requests_in_last_chunk = NUM_REQUESTS - n.floor() as u64 * requests_per_chunk;
requests_in_last_chunk = num_requests - n.floor() as u64 * requests_per_chunk;
} else {
frequency_in_last_chunk = frequency_per_chunk;
requests_in_last_chunk = requests_per_chunk;
}
let num_threads = n.ceil() as u32;

return (
num_threads,
Chunk{frequency: frequency_per_chunk, num_requests: requests_per_chunk},
Chunk{frequency: frequency_in_last_chunk, num_requests: requests_in_last_chunk},
);
}

#[tokio::main(flavor = "current_thread")]
async fn main() -> () {

let mut num_requests: u64 = 100_000;
let mut frequency: u32 = 1_000; // Hz
let mut time: u32 = 10; // seconds
let mut proxy_string: String = "localhost:8080".to_owned();
let mut target_string: String = "localhost:2222".to_owned();

{
let mut parser = ArgumentParser::new();
parser.refer(&mut num_requests).add_option(&["-n", "--num-requests"], Store, "Number of requests to send");
parser.refer(&mut frequency).add_option(&["-f", "--frequency"], Store, "Desired send rate (in Hz)");
parser.refer(&mut time).add_option(&["--time"], Store, "How long to send requests (in s)");
parser.refer(&mut proxy_string).add_option(&["--proxy"], Store,
"Which HTTP CONNECT proxy to use, if any; e.g. localhost:12345");
parser.refer(&mut target_string).add_option(&["--target"], Store,
"Which target to hit; e.g. localhost:12345");
parser.parse_args_or_exit();
}

let proxy = parse_host(&proxy_string).expect("Malformed proxy");
let target = parse_host(&target_string).expect("Malformed target");
println!("Proxy: {}", proxy);
println!("Target: {}", target);

println!("Running at frequency of {frequency} Hz");

// For _our_ testing, on localhost, 10kHz should be doable, but not much more.
let (num_threads, normal_chunk, last_chunk) =
calculate_chunks(10_000, frequency, num_requests);

println!("num_threads: {num_threads}");

let mut join_handles = Vec::<std::thread::JoinHandle<_>>::new();
let t0 = Instant::now();
for i in 0..num_threads {
if i < num_threads - 1 {
join_handles.push(std::thread::spawn(move ||
run_thread(
i as u64 * requests_per_chunk,
requests_per_chunk,
frequency_per_chunk,
TARGET_IP,
TARGET_PORT
)));
} else {
join_handles.push(std::thread::spawn(move ||
run_thread(
i as u64 * requests_per_chunk,
requests_in_last_chunk,
frequency_in_last_chunk,
TARGET_IP,
TARGET_PORT
)));
}
let chunk = if i < num_threads - 1 {normal_chunk} else {last_chunk};

// copies to move into thread
let proxy = proxy.clone();
let target = target.clone();

join_handles.push(std::thread::spawn(move ||
run_thread(
i as u64 * normal_chunk.num_requests,
chunk.num_requests,
chunk.frequency,
&proxy,
&target,
)));
}
let results = join_handles.into_iter().map(|h| h.join()).collect::<Vec<_>>();
let t1 = Instant::now();
Expand Down Expand Up @@ -355,5 +411,5 @@ async fn main() -> () {
println!("Send: max {:10.3} μs, min {:10.3} μs", 1e6 * t_send_max , 1e6 * t_send_min );
println!("Total: max {:10.3} μs, min {:10.3} μs", 1e6 * t_total_max , 1e6 * t_total_min );

println!("Total time elapsed: {:?}, {} μs per request", t1 - t0, 1e6 * (t1 - t0).as_secs_f64() / NUM_REQUESTS as f64);
println!("Total time elapsed: {:?}, {} μs per request", t1 - t0, 1e6 * (t1 - t0).as_secs_f64() / num_requests as f64);
}

0 comments on commit f6e3f1f

Please sign in to comment.