Tentative fix client setup

This commit is contained in:
Pascal Engélibert 2026-02-03 11:33:24 +01:00
commit 66d8aeb4a5
2 changed files with 280 additions and 254 deletions

View file

@ -29,7 +29,7 @@ use tokio_rustls::{
}; };
use tokio_util::codec::Framed; use tokio_util::codec::Framed;
const TIMEOUT: Duration = Duration::from_secs(60); const TIMEOUT: Duration = Duration::from_secs(30);
#[derive(Debug)] #[derive(Debug)]
struct DummyCertVerifier; struct DummyCertVerifier;
@ -98,13 +98,31 @@ pub async fn play(
sync_receiver.await.unwrap(); sync_receiver.await.unwrap();
// Semaphore used to limit the number of concurrent clients. // Semaphore used to limit the number of concurrent clients.
// Its handle is released when the task panics. // Its handle is released when the task panics.
let limiter = Arc::new(Semaphore::new(32)); let limiter = Arc::new(Semaphore::new(16));
let counter = Arc::new(AtomicU32::new(0)); let counter = Arc::new(AtomicU32::new(0));
let running = Arc::new(Mutex::new(HashSet::new())); let running = Arc::new(Mutex::new(HashSet::new()));
let total = records.len() * repeat as usize; let total = records.len() * repeat as usize;
let mut handles = Vec::new();
let connect_to = connect_to.to_socket_addrs().unwrap().next().unwrap(); let connect_to = connect_to.to_socket_addrs().unwrap().next().unwrap();
let debug_mutex = Arc::new(Mutex::new(())); let debug_mutex = Arc::new(Mutex::new(()));
tokio::spawn({
let running = running.clone();
let counter = counter.clone();
async move {
let mut last_count = 0;
loop {
tokio::time::sleep(TIMEOUT).await;
println!("Running: {:?}", running.lock().await);
let new_count = counter.load(std::sync::atomic::Ordering::Relaxed);
if new_count == last_count {
println!("Stalled at {} / {}, stopping", new_count, total);
std::process::exit(0);
}
last_count = new_count;
}
}
});
match tls_mode { match tls_mode {
TlsMode::Both | TlsMode::Client => { TlsMode::Both | TlsMode::Client => {
let mut config = tokio_rustls::rustls::ClientConfig::builder() let mut config = tokio_rustls::rustls::ClientConfig::builder()
@ -125,285 +143,282 @@ pub async fn play(
} }
config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new());
let config = Arc::new(config); let config = Arc::new(config);
for (id, (server_name, records)) in records.iter() { for _i in 0..repeat {
let connector = TlsConnector::from(config.clone()); let mut handles = Vec::new();
let counter = counter.clone(); for (id, (server_name, records)) in records.iter() {
let limiter = limiter.clone(); let connector = TlsConnector::from(config.clone());
let running = running.clone(); let counter = counter.clone();
handles.push(tokio::spawn(async move { let limiter = limiter.clone();
let mut running_guard = running.lock().await; let running = running.clone();
running_guard.insert(*id); handles.push(tokio::spawn(async move {
drop(running_guard); let mut running_guard = running.lock().await;
let limiter = limiter.acquire().await.unwrap(); running_guard.insert(*id);
let server_name = drop(running_guard);
ServerName::try_from(String::from_utf8(server_name.clone()).unwrap()) let limiter = limiter.acquire().await.unwrap();
.unwrap(); let server_name =
'repeat: for _i in 0..repeat { ServerName::try_from(String::from_utf8(server_name.clone()).unwrap())
let stream = TcpStream::connect(connect_to).await.unwrap(); .unwrap();
let stream = connector 'repeat: for _i in 0..1 {
.connect(server_name.clone(), stream) let stream = TcpStream::connect(connect_to).await.unwrap();
.await let stream = connector
.unwrap(); .connect(server_name.clone(), stream)
let mut stream = Framed::new(stream, crate::http::HttpClientCodec::new()); .await
for (direction, data_list) in ResponseStreamer::new(records.iter()) { .unwrap();
match direction { let mut stream =
Direction::ClientToServer => { Framed::new(stream, crate::http::HttpClientCodec::new());
for data in data_list { for (direction, data_list) in ResponseStreamer::new(records.iter()) {
//println!("[CLT] ({id}) >> {}", data.len()); match direction {
//stream.get_mut().write_all(data).await.unwrap(); Direction::ClientToServer => {
match tokio::time::timeout( for data in data_list {
TIMEOUT, //println!("[CLT] ({id}) >> {}", data.len());
stream.get_mut().write_all(data), //stream.get_mut().write_all(data).await.unwrap();
) match tokio::time::timeout(
.await TIMEOUT,
{ stream.get_mut().write_all(data),
Ok(v) => v.unwrap(), )
Err(_e) => { .await
println!("client timeout {id} (sending)"); {
continue 'repeat; Ok(v) => v.unwrap(),
Err(_e) => {
println!("client timeout {id} (sending)");
continue 'repeat;
}
} }
} }
} }
} Direction::ServerToClient => {
Direction::ServerToClient => { let total_len: usize =
let total_len: usize = data_list.iter().map(|data| data.len()).sum::<usize>();
data_list.iter().map(|data| data.len()).sum::<usize>(); let reduced_len =
let reduced_len = total_len.saturating_sub(160 * data_list.len()).max(1);
total_len.saturating_sub(160 * data_list.len()).max(1); let mut total_recv = 0;
let mut total_recv = 0; //println!("[CLT] ({id}) << {}", data.len());
//println!("[CLT] ({id}) << {}", data.len()); // let mut buf = Vec::new();
// let mut buf = Vec::new(); // stream.read_buf(&mut buf).await.ok();
// stream.read_buf(&mut buf).await.ok(); //let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; //let resp = stream.next().await.unwrap().unwrap();
//let resp = stream.next().await.unwrap().unwrap(); while total_recv < reduced_len {
while total_recv < reduced_len { let resp =
let resp = match tokio::time::timeout( match tokio::time::timeout(TIMEOUT, stream.next())
TIMEOUT, .await
stream.next(), {
) Ok(v) => v.unwrap().unwrap(),
.await Err(_e) => {
{ // TODO fix
Ok(v) => v.unwrap().unwrap(), println!(
Err(_e) => { "client timeout {}: {} / {}",
// TODO fix id, total_recv, total_len
println!( );
"client timeout {}: {} / {}", //print_bin(data);
id, total_recv, total_len break 'repeat;
); }
//print_bin(data); };
break 'repeat; total_recv += resp.len();
} //dbg!(resp.len());
}; //crate::http::decode_http(&mut buf, &mut stream).await;
total_recv += resp.len(); }
//dbg!(resp.len()); /*if total_recv > total_len {
//crate::http::decode_http(&mut buf, &mut stream).await; println!("received too much {}: {} / {}", id, total_recv, total_len);
}*/
} }
/*if total_recv > total_len {
println!("received too much {}: {} / {}", id, total_recv, total_len);
}*/
} }
} }
//stream.get_mut().shutdown().await.unwrap();
tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown())
.await
.unwrap()
.unwrap();
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("Client: {} / {}", cnt + 1, total);
} }
//stream.get_mut().shutdown().await.unwrap(); drop(limiter);
tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) let mut running_guard = running.lock().await;
.await running_guard.remove(id);
.unwrap() drop(running_guard);
.unwrap(); }));
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); //tokio::time::sleep(std::time::Duration::from_millis(500)).await;
println!("Client: {} / {}", cnt + 1, total); }
}
drop(limiter); for handle in handles {
let mut running_guard = running.lock().await; handle.await.unwrap();
running_guard.remove(id); }
drop(running_guard);
}));
//tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
} }
TlsMode::None | TlsMode::Server => { TlsMode::None | TlsMode::Server => {
for (id, (_server_name, records)) in records.iter() { for _i in 0..repeat {
/*if *id != 33 { let mut handles = Vec::new();
continue for (id, (_server_name, records)) in records.iter() {
}*/ /*if *id != 33 {
let counter = counter.clone(); continue
let limiter = limiter.clone(); }*/
let running = running.clone(); let counter = counter.clone();
let debug_mutex = debug_mutex.clone(); let limiter = limiter.clone();
handles.push(tokio::spawn(async move { let running = running.clone();
let mut running_guard = running.lock().await; let debug_mutex = debug_mutex.clone();
running_guard.insert(*id); handles.push(tokio::spawn(async move {
drop(running_guard); let mut running_guard = running.lock().await;
let limiter = limiter.acquire().await.unwrap(); running_guard.insert(*id);
//let mut buf = Vec::new(); drop(running_guard);
'repeat: for _i in 0..repeat { let limiter = limiter.acquire().await.unwrap();
let stream = TcpStream::connect(connect_to).await.unwrap(); //let mut buf = Vec::new();
let mut stream = Framed::new(stream, crate::http::HttpClientCodec::new()); 'repeat: for _i in 0..1 {
/*let mut skip_recv = false; let stream = TcpStream::connect(connect_to).await.unwrap();
for (direction, data) in records { let mut stream =
match direction { Framed::new(stream, crate::http::HttpClientCodec::new());
Direction::ClientToServer => { /*let mut skip_recv = false;
skip_recv = false; for (direction, data) in records {
println!("[CLT] ({id}) >> {}", data.len()); match direction {
stream.write_all(data).await.unwrap(); Direction::ClientToServer => {
} skip_recv = false;
Direction::ServerToClient => { println!("[CLT] ({id}) >> {}", data.len());
if skip_recv { stream.write_all(data).await.unwrap();
continue;
} }
println!("[CLT] ({id}) << {}", data.len()); Direction::ServerToClient => {
//let mut buf = Vec::new(); if skip_recv {
//stream.read_buf(&mut buf).await.ok(); continue;
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
let mut buf = vec![0; data.len()];
match tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.readable(),
)
.await
{
Ok(r) => {
r.unwrap();
} }
Err(_) => { println!("[CLT] ({id}) << {}", data.len());
println!("[CLT] timeout recv ({id})"); //let mut buf = Vec::new();
break; //stream.read_buf(&mut buf).await.ok();
} //let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
} let mut buf = vec![0; data.len()];
// TODO utiliser crate::http ici
match tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.read_exact(&mut buf),
)
.await
{
Ok(r) => {
r.unwrap();
}
Err(_) => {
println!("[CLT] skip recv ({id})");
skip_recv = true;
}
}
}
}
}*/
for (direction, data_list) in ResponseStreamer::new(records.iter()) {
match direction {
Direction::ClientToServer => {
for data in data_list.into_iter() {
if debug {
//println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap());
println!("[CLT] ({id}) >> {}", data.len());
}
//stream.get_mut().write_all(data).await.unwrap();
match tokio::time::timeout( match tokio::time::timeout(
TIMEOUT, std::time::Duration::from_millis(500),
stream.get_mut().write_all(data), stream.readable(),
) )
.await .await
{ {
Ok(v) => v.unwrap(), Ok(r) => {
Err(_e) => { r.unwrap();
println!("client timeout {id} (sending)"); }
continue 'repeat; Err(_) => {
println!("[CLT] timeout recv ({id})");
break;
}
}
// TODO utiliser crate::http ici
match tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.read_exact(&mut buf),
)
.await
{
Ok(r) => {
r.unwrap();
}
Err(_) => {
println!("[CLT] skip recv ({id})");
skip_recv = true;
} }
} }
} }
} }
Direction::ServerToClient => { }*/
let total_len: usize = for (direction, data_list) in ResponseStreamer::new(records.iter()) {
data_list.iter().map(|data| data.len()).sum::<usize>(); match direction {
let reduced_len = Direction::ClientToServer => {
total_len.saturating_sub(160 * data_list.len()).max(1); for data in data_list.into_iter() {
let mut total_recv = 0; if debug {
if debug { //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap());
println!("[CLT] ({id}) << {total_len}"); println!("[CLT] ({id}) >> {}", data.len());
}
//let mut buf = Vec::new();
//stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
while total_recv < reduced_len {
let resp = match tokio::time::timeout(
TIMEOUT,
stream.next(),
)
.await
{
Ok(None) => break,
Ok(Some(v)) => v.unwrap(),
Err(_e) => {
// TODO fix
println!(
"client timeout {}: {} / {}",
id, total_recv, total_len
);
//print_bin(data);
break 'repeat;
} }
}; //stream.get_mut().write_all(data).await.unwrap();
total_recv += resp.len(); match tokio::time::timeout(
/*if resp.len() != data.len() { TIMEOUT,
let guard = debug_mutex.lock().await; stream.get_mut().write_all(data),
println!("RECV NOT ENOUGH {} / {}", resp.len(), data.len()); )
if resp.len() < 1000 && data.len() < 1000 { .await
//print_bin(&resp); {
//println!("WANTED"); Ok(v) => v.unwrap(),
//print_bin(data); Err(_e) => {
println!("client timeout {id} (sending)");
continue 'repeat;
}
} }
std::mem::drop(guard); }
}*/
//print_bin(&resp);
//let resp = stream.next().await.unwrap().unwrap();
//dbg!(resp.len());
//crate::http::decode_http(&mut buf, &mut stream).await;
//buf.clear();
} }
if total_recv < reduced_len { Direction::ServerToClient => {
println!( let total_len: usize =
"({}) RECV NOT ENOUGH {} / {}", data_list.iter().map(|data| data.len()).sum::<usize>();
id, total_recv, total_len let reduced_len =
); total_len.saturating_sub(160 * data_list.len()).max(1);
} else if debug { let mut total_recv = 0;
println!("[CLT] ({id}) << {total_len} OK"); if debug {
println!("[CLT] ({id}) << {total_len}");
}
//let mut buf = Vec::new();
//stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
let mut resp = Vec::new();
while total_recv < reduced_len {
resp =
match tokio::time::timeout(TIMEOUT, stream.next())
.await
{
Ok(None) => break,
Ok(Some(v)) => v.unwrap(),
Err(_e) => {
// TODO fix
println!(
"client timeout {}: {} / {}",
id, total_recv, total_len
);
//print_bin(data);
break 'repeat;
}
};
total_recv += resp.len();
/*if resp.len() != data.len() {
let guard = debug_mutex.lock().await;
println!("RECV NOT ENOUGH {} / {}", resp.len(), data.len());
if resp.len() < 1000 && data.len() < 1000 {
//print_bin(&resp);
//println!("WANTED");
//print_bin(data);
}
std::mem::drop(guard);
}*/
//print_bin(&resp);
//let resp = stream.next().await.unwrap().unwrap();
//dbg!(resp.len());
//crate::http::decode_http(&mut buf, &mut stream).await;
//buf.clear();
}
if total_recv < reduced_len {
println!(
"({}) RECV NOT ENOUGH {} / {}",
id, total_recv, total_len
);
if resp.len() < 1024 {
print_bin(&resp);
}
} else if debug {
println!("[CLT] ({id}) << {total_len} OK");
}
} }
} }
} }
//stream.get_mut().shutdown().await.unwrap();
tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown())
.await
.unwrap()
.unwrap();
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("Client: {} / {}", cnt + 1, total);
} }
//stream.get_mut().shutdown().await.unwrap(); drop(limiter);
tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) let mut running_guard = running.lock().await;
.await running_guard.remove(id);
.unwrap() drop(running_guard);
.unwrap(); }));
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); //tokio::time::sleep(std::time::Duration::from_millis(500)).await;
println!("Client: {} / {}", cnt + 1, total); }
}
drop(limiter); for handle in handles {
let mut running_guard = running.lock().await; handle.await.unwrap();
running_guard.remove(id);
drop(running_guard);
}));
//tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
tokio::spawn({
let running = running.clone();
async move {
let mut last_count = 0;
loop {
tokio::time::sleep(TIMEOUT).await;
println!("Running: {:?}", running.lock().await);
let new_count = counter.load(std::sync::atomic::Ordering::Relaxed);
if new_count == last_count {
println!("Stalled at {} / {}, stopping", new_count, total);
std::process::exit(0);
} }
last_count = new_count;
} }
} }
});
for handle in handles {
handle.await.unwrap();
} }
println!("Unfinished: {:?}", running.lock().await); println!("Unfinished: {:?}", running.lock().await);
std::process::exit(0); std::process::exit(0);

View file

@ -212,8 +212,19 @@ pub async fn play(
.map_err(|e| panic!("{e:?} with name `{server_name}`")) .map_err(|e| panic!("{e:?} with name `{server_name}`"))
.unwrap(); .unwrap();
let mut stream = Framed::new(stream, crate::http::HttpServerCodec::new()); let mut stream = Framed::new(stream, crate::http::HttpServerCodec::new());
let mut break_next = false;
//let mut previous = Vec::new(); //let mut previous = Vec::new();
while let Some(req) = stream.next().await { loop {
let Ok(req) = tokio::time::timeout(tokio::time::Duration::from_secs(1), stream.next()).await else {
if break_next {
break;
} else {
continue;
}
};
let Some(req) = req else {
break;
};
let req = req.unwrap(); let req = req.unwrap();
//println!("REQUEST"); //println!("REQUEST");
//print_bin(&req); //print_bin(&req);
@ -265,7 +276,7 @@ pub async fn play(
stream.flush().await.unwrap(); stream.flush().await.unwrap();
} }
if *last { if *last {
break; break_next = true;
} }
} else { } else {
println!("No response found for SNI=`{server_name}`"); println!("No response found for SNI=`{server_name}`");
@ -385,7 +396,7 @@ pub async fn play(
} }
} }
if *last { if *last {
break; //break;
} }
} else { } else {
println!("[SRV] No response found"); println!("[SRV] No response found");