diff --git a/src/client.rs b/src/client.rs index 01733ee..b47b50a 100644 --- a/src/client.rs +++ b/src/client.rs @@ -93,6 +93,7 @@ pub async fn play( connect_to: (String, u16), sync_receiver: oneshot::Receiver<()>, repeat: u32, + debug: bool, ) { sync_receiver.await.unwrap(); // Semaphore used to limit the number of concurrent clients. @@ -289,8 +290,11 @@ pub async fn play( for (direction, data_list) in ResponseStreamer::new(records.iter()) { match direction { Direction::ClientToServer => { - for data in data_list { - //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap()); + for data in data_list.into_iter() { + if debug { + //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap()); + println!("[CLT] ({id}) >> {}", data.len()); + } //stream.get_mut().write_all(data).await.unwrap(); match tokio::time::timeout( TIMEOUT, @@ -312,7 +316,9 @@ pub async fn play( let reduced_len = total_len.saturating_sub(160 * data_list.len()).max(1); let mut total_recv = 0; - //println!("[CLT] ({id}) << {}", data.len()); + if debug { + println!("[CLT] ({id}) << {total_len}"); + } //let mut buf = Vec::new(); //stream.read_buf(&mut buf).await.ok(); //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; @@ -357,6 +363,8 @@ pub async fn play( "({}) RECV NOT ENOUGH {} / {}", id, total_recv, total_len ); + } else if debug { + println!("[CLT] ({id}) << {total_len} OK"); } } } diff --git a/src/main.rs b/src/main.rs index 55c1766..c331794 100644 --- a/src/main.rs +++ b/src/main.rs @@ -32,6 +32,8 @@ enum Subcommand { Print(OptPrint), /// Record traffic Record(OptRecord), + /// Remove record + Remove(OptRemove), /// Write test record Test(OptTest), } @@ -64,6 +66,9 @@ struct OptPlay { /// Only run these parts #[argp(option, default = "String::from(\"both\")")] run: String, + /// Print debug info + #[argp(switch, short = 'd')] + debug: bool, } /// Print records @@ -88,6 +93,21 @@ struct OptRecord {} #[argp(subcommand, name = "test")] struct OptTest {} +/// Copy record but removing one connection id +#[derive(FromArgs)] +#[argp(subcommand, name = "remove")] +struct OptRemove { + /// Output path + #[argp(positional)] + output: String, + /// Record number to remove + #[argp(positional)] + record_number: u64, + /// Packet number to remove + #[argp(positional)] + packet_number: usize, +} + #[derive(Clone, Copy, Debug, Eq, PartialEq)] enum RunMode { Client, @@ -220,6 +240,7 @@ async fn main() { (subopt.forward_addr, subopt.forward_port), sync_receiver, subopt.repeat, + subopt.debug, ) .await; } else { @@ -234,6 +255,7 @@ async fn main() { &subopt.certs, ("0.0.0.0", subopt.listen_port), sync_sender, + subopt.debug, ) .await; } @@ -246,6 +268,14 @@ async fn main() { Subcommand::Record(_subopt) => { record::make_record(&opt.record_file); } + Subcommand::Remove(subopt) => { + record::remove_record( + &opt.record_file, + &subopt.output, + subopt.record_number, + subopt.packet_number, + ); + } Subcommand::Test(_subopt) => { record::make_test_record(&opt.record_file); } diff --git a/src/record.rs b/src/record.rs index 38fc7bd..a5c4a3f 100644 --- a/src/record.rs +++ b/src/record.rs @@ -4,7 +4,7 @@ use std::{ sync::mpsc::{Receiver, Sender, channel}, }; -use crate::util::print_bin; +use crate::util::{ResponseStreamer, print_bin}; const CLIENT_TO_SERVER: u8 = b'C'; const SERVER_TO_CLIENT: u8 = b'S'; @@ -295,3 +295,26 @@ pub fn make_test_record(path: &str) { write_record(&mut file, *direction, *conn_id, *server_name, *data); } } + +pub fn remove_record( + input_path: &str, + output_path: &str, + record_to_remove: u64, + packet_to_remove: usize, +) { + let records = read_record_file(input_path); + let mut output_file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(output_path) + .unwrap(); + for (conn_id, (server_name, packets)) in records.into_iter() { + let server_name = String::from_utf8(server_name).unwrap(); + for (packet_id, (direction, data)) in packets.into_iter().enumerate() { + if conn_id != record_to_remove || packet_id != packet_to_remove { + write_record(&mut output_file, direction, conn_id, &server_name, &data); + } + } + } +} diff --git a/src/server.rs b/src/server.rs index 6f85ebc..0a5f31a 100644 --- a/src/server.rs +++ b/src/server.rs @@ -24,6 +24,7 @@ pub async fn play( cert_path: &str, listen_addr: (&str, u16), sync_sender: oneshot::Sender<()>, + debug: bool, ) { let mut response_map = HashMap::new(); for (id, (server_name, records)) in records.iter() { @@ -35,7 +36,7 @@ pub async fn play( if let Some(hash) = hash && !responses.is_empty() { - response_map.insert((server_name.to_vec(), hash), (id, responses)); + response_map.insert((server_name.to_vec(), hash), (id, responses, false)); responses = Vec::new(); } let mut slashes = data @@ -57,10 +58,15 @@ pub async fn play( } } } - if let Some(hash) = hash - && !responses.is_empty() - { - response_map.insert((server_name.to_vec(), hash), (id, responses)); + if let Some(hash) = hash { + if !responses.is_empty() { + response_map.insert((server_name.to_vec(), hash), (id, responses, true)); + } else { + response_map + .get_mut(&(server_name.to_vec(), hash)) + .unwrap() + .2 = true; + } } } @@ -249,7 +255,7 @@ pub async fn play( } let stream = stream.get_mut(); if let Some((hash, _diff)) = best { - let (id, responses) = response_map + let (id, responses, last) = response_map .get(&(server_name.as_bytes().to_vec(), hash.clone())) .unwrap(); //dbg!(id); @@ -258,6 +264,9 @@ pub async fn play( stream.write_all(res).await.unwrap(); stream.flush().await.unwrap(); } + if *last { + break; + } } else { println!("No response found for SNI=`{server_name}`"); } @@ -330,7 +339,13 @@ pub async fn play( let s1 = slashes.next(); let s2 = slashes.next(); if let (Some(s1), Some(s2)) = (s1, s2) { - req[s1 + 1..s2].to_vec() + let uniq_id = req[s1 + 1..s2].to_vec(); + if debug { + if let Ok(uniq_id) = str::from_utf8(&uniq_id) { + println!("[SRV] ({uniq_id}) << {}", req.len()); + } + } + uniq_id } else { //println!("Previous: {:?}", &previous); println!("Did not find URL: {:?}", &req[0..req.len().min(255)]); @@ -354,14 +369,23 @@ pub async fn play( } let stream = stream.get_mut(); if let Some((server_name, hash, _diff)) = best { - let (id, responses) = response_map + let (id, responses, last) = response_map .get(&(server_name.clone(), hash.clone())) .unwrap(); //dbg!(id); for &res in responses { - //println!("[SRV] response for ({}): {} bytes", id, res.len()); + if debug { + println!("[SRV] ({id}) >> {}", res.len()); + //println!("[SRV] response for ({}): {} bytes", id, res.len()); + } stream.write_all(res).await.unwrap(); stream.flush().await.unwrap(); + if debug { + println!("[SRV] ({id}) >> {} OK", res.len()); + } + } + if *last { + break; } } else { println!("[SRV] No response found");