Debug, cmd remove, close connection at last response

This commit is contained in:
Pascal Engélibert 2026-02-02 16:06:04 +01:00
commit c38b24a8ed
4 changed files with 98 additions and 13 deletions

View file

@ -93,6 +93,7 @@ pub async fn play(
connect_to: (String, u16), connect_to: (String, u16),
sync_receiver: oneshot::Receiver<()>, sync_receiver: oneshot::Receiver<()>,
repeat: u32, repeat: u32,
debug: bool,
) { ) {
sync_receiver.await.unwrap(); sync_receiver.await.unwrap();
// Semaphore used to limit the number of concurrent clients. // Semaphore used to limit the number of concurrent clients.
@ -289,8 +290,11 @@ pub async fn play(
for (direction, data_list) in ResponseStreamer::new(records.iter()) { for (direction, data_list) in ResponseStreamer::new(records.iter()) {
match direction { match direction {
Direction::ClientToServer => { Direction::ClientToServer => {
for data in data_list { for data in data_list.into_iter() {
if debug {
//println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap()); //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap());
println!("[CLT] ({id}) >> {}", data.len());
}
//stream.get_mut().write_all(data).await.unwrap(); //stream.get_mut().write_all(data).await.unwrap();
match tokio::time::timeout( match tokio::time::timeout(
TIMEOUT, TIMEOUT,
@ -312,7 +316,9 @@ pub async fn play(
let reduced_len = let reduced_len =
total_len.saturating_sub(160 * data_list.len()).max(1); total_len.saturating_sub(160 * data_list.len()).max(1);
let mut total_recv = 0; let mut total_recv = 0;
//println!("[CLT] ({id}) << {}", data.len()); if debug {
println!("[CLT] ({id}) << {total_len}");
}
//let mut buf = Vec::new(); //let mut buf = Vec::new();
//stream.read_buf(&mut buf).await.ok(); //stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; //let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
@ -357,6 +363,8 @@ pub async fn play(
"({}) RECV NOT ENOUGH {} / {}", "({}) RECV NOT ENOUGH {} / {}",
id, total_recv, total_len id, total_recv, total_len
); );
} else if debug {
println!("[CLT] ({id}) << {total_len} OK");
} }
} }
} }

View file

@ -32,6 +32,8 @@ enum Subcommand {
Print(OptPrint), Print(OptPrint),
/// Record traffic /// Record traffic
Record(OptRecord), Record(OptRecord),
/// Remove record
Remove(OptRemove),
/// Write test record /// Write test record
Test(OptTest), Test(OptTest),
} }
@ -64,6 +66,9 @@ struct OptPlay {
/// Only run these parts /// Only run these parts
#[argp(option, default = "String::from(\"both\")")] #[argp(option, default = "String::from(\"both\")")]
run: String, run: String,
/// Print debug info
#[argp(switch, short = 'd')]
debug: bool,
} }
/// Print records /// Print records
@ -88,6 +93,21 @@ struct OptRecord {}
#[argp(subcommand, name = "test")] #[argp(subcommand, name = "test")]
struct OptTest {} struct OptTest {}
/// Copy record but removing one connection id
#[derive(FromArgs)]
#[argp(subcommand, name = "remove")]
struct OptRemove {
/// Output path
#[argp(positional)]
output: String,
/// Record number to remove
#[argp(positional)]
record_number: u64,
/// Packet number to remove
#[argp(positional)]
packet_number: usize,
}
#[derive(Clone, Copy, Debug, Eq, PartialEq)] #[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum RunMode { enum RunMode {
Client, Client,
@ -220,6 +240,7 @@ async fn main() {
(subopt.forward_addr, subopt.forward_port), (subopt.forward_addr, subopt.forward_port),
sync_receiver, sync_receiver,
subopt.repeat, subopt.repeat,
subopt.debug,
) )
.await; .await;
} else { } else {
@ -234,6 +255,7 @@ async fn main() {
&subopt.certs, &subopt.certs,
("0.0.0.0", subopt.listen_port), ("0.0.0.0", subopt.listen_port),
sync_sender, sync_sender,
subopt.debug,
) )
.await; .await;
} }
@ -246,6 +268,14 @@ async fn main() {
Subcommand::Record(_subopt) => { Subcommand::Record(_subopt) => {
record::make_record(&opt.record_file); record::make_record(&opt.record_file);
} }
Subcommand::Remove(subopt) => {
record::remove_record(
&opt.record_file,
&subopt.output,
subopt.record_number,
subopt.packet_number,
);
}
Subcommand::Test(_subopt) => { Subcommand::Test(_subopt) => {
record::make_test_record(&opt.record_file); record::make_test_record(&opt.record_file);
} }

View file

@ -4,7 +4,7 @@ use std::{
sync::mpsc::{Receiver, Sender, channel}, sync::mpsc::{Receiver, Sender, channel},
}; };
use crate::util::print_bin; use crate::util::{ResponseStreamer, print_bin};
const CLIENT_TO_SERVER: u8 = b'C'; const CLIENT_TO_SERVER: u8 = b'C';
const SERVER_TO_CLIENT: u8 = b'S'; const SERVER_TO_CLIENT: u8 = b'S';
@ -295,3 +295,26 @@ pub fn make_test_record(path: &str) {
write_record(&mut file, *direction, *conn_id, *server_name, *data); write_record(&mut file, *direction, *conn_id, *server_name, *data);
} }
} }
pub fn remove_record(
input_path: &str,
output_path: &str,
record_to_remove: u64,
packet_to_remove: usize,
) {
let records = read_record_file(input_path);
let mut output_file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(output_path)
.unwrap();
for (conn_id, (server_name, packets)) in records.into_iter() {
let server_name = String::from_utf8(server_name).unwrap();
for (packet_id, (direction, data)) in packets.into_iter().enumerate() {
if conn_id != record_to_remove || packet_id != packet_to_remove {
write_record(&mut output_file, direction, conn_id, &server_name, &data);
}
}
}
}

View file

@ -24,6 +24,7 @@ pub async fn play(
cert_path: &str, cert_path: &str,
listen_addr: (&str, u16), listen_addr: (&str, u16),
sync_sender: oneshot::Sender<()>, sync_sender: oneshot::Sender<()>,
debug: bool,
) { ) {
let mut response_map = HashMap::new(); let mut response_map = HashMap::new();
for (id, (server_name, records)) in records.iter() { for (id, (server_name, records)) in records.iter() {
@ -35,7 +36,7 @@ pub async fn play(
if let Some(hash) = hash if let Some(hash) = hash
&& !responses.is_empty() && !responses.is_empty()
{ {
response_map.insert((server_name.to_vec(), hash), (id, responses)); response_map.insert((server_name.to_vec(), hash), (id, responses, false));
responses = Vec::new(); responses = Vec::new();
} }
let mut slashes = data let mut slashes = data
@ -57,10 +58,15 @@ pub async fn play(
} }
} }
} }
if let Some(hash) = hash if let Some(hash) = hash {
&& !responses.is_empty() if !responses.is_empty() {
{ response_map.insert((server_name.to_vec(), hash), (id, responses, true));
response_map.insert((server_name.to_vec(), hash), (id, responses)); } else {
response_map
.get_mut(&(server_name.to_vec(), hash))
.unwrap()
.2 = true;
}
} }
} }
@ -249,7 +255,7 @@ pub async fn play(
} }
let stream = stream.get_mut(); let stream = stream.get_mut();
if let Some((hash, _diff)) = best { if let Some((hash, _diff)) = best {
let (id, responses) = response_map let (id, responses, last) = response_map
.get(&(server_name.as_bytes().to_vec(), hash.clone())) .get(&(server_name.as_bytes().to_vec(), hash.clone()))
.unwrap(); .unwrap();
//dbg!(id); //dbg!(id);
@ -258,6 +264,9 @@ pub async fn play(
stream.write_all(res).await.unwrap(); stream.write_all(res).await.unwrap();
stream.flush().await.unwrap(); stream.flush().await.unwrap();
} }
if *last {
break;
}
} else { } else {
println!("No response found for SNI=`{server_name}`"); println!("No response found for SNI=`{server_name}`");
} }
@ -330,7 +339,13 @@ pub async fn play(
let s1 = slashes.next(); let s1 = slashes.next();
let s2 = slashes.next(); let s2 = slashes.next();
if let (Some(s1), Some(s2)) = (s1, s2) { if let (Some(s1), Some(s2)) = (s1, s2) {
req[s1 + 1..s2].to_vec() let uniq_id = req[s1 + 1..s2].to_vec();
if debug {
if let Ok(uniq_id) = str::from_utf8(&uniq_id) {
println!("[SRV] ({uniq_id}) << {}", req.len());
}
}
uniq_id
} else { } else {
//println!("Previous: {:?}", &previous); //println!("Previous: {:?}", &previous);
println!("Did not find URL: {:?}", &req[0..req.len().min(255)]); println!("Did not find URL: {:?}", &req[0..req.len().min(255)]);
@ -354,14 +369,23 @@ pub async fn play(
} }
let stream = stream.get_mut(); let stream = stream.get_mut();
if let Some((server_name, hash, _diff)) = best { if let Some((server_name, hash, _diff)) = best {
let (id, responses) = response_map let (id, responses, last) = response_map
.get(&(server_name.clone(), hash.clone())) .get(&(server_name.clone(), hash.clone()))
.unwrap(); .unwrap();
//dbg!(id); //dbg!(id);
for &res in responses { for &res in responses {
if debug {
println!("[SRV] ({id}) >> {}", res.len());
//println!("[SRV] response for ({}): {} bytes", id, res.len()); //println!("[SRV] response for ({}): {} bytes", id, res.len());
}
stream.write_all(res).await.unwrap(); stream.write_all(res).await.unwrap();
stream.flush().await.unwrap(); stream.flush().await.unwrap();
if debug {
println!("[SRV] ({id}) >> {} OK", res.len());
}
}
if *last {
break;
} }
} else { } else {
println!("[SRV] No response found"); println!("[SRV] No response found");