From 66d8aeb4a567a2f0c10158274bb0d5176b59d248 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Eng=C3=A9libert?= Date: Tue, 3 Feb 2026 11:33:24 +0100 Subject: [PATCH] Tentative fix client setup --- src/client.rs | 519 ++++++++++++++++++++++++++------------------------ src/server.rs | 17 +- 2 files changed, 281 insertions(+), 255 deletions(-) diff --git a/src/client.rs b/src/client.rs index b47b50a..404c523 100644 --- a/src/client.rs +++ b/src/client.rs @@ -29,7 +29,7 @@ use tokio_rustls::{ }; use tokio_util::codec::Framed; -const TIMEOUT: Duration = Duration::from_secs(60); +const TIMEOUT: Duration = Duration::from_secs(30); #[derive(Debug)] struct DummyCertVerifier; @@ -98,13 +98,31 @@ pub async fn play( sync_receiver.await.unwrap(); // Semaphore used to limit the number of concurrent clients. // Its handle is released when the task panics. - let limiter = Arc::new(Semaphore::new(32)); + let limiter = Arc::new(Semaphore::new(16)); let counter = Arc::new(AtomicU32::new(0)); let running = Arc::new(Mutex::new(HashSet::new())); let total = records.len() * repeat as usize; - let mut handles = Vec::new(); let connect_to = connect_to.to_socket_addrs().unwrap().next().unwrap(); let debug_mutex = Arc::new(Mutex::new(())); + + tokio::spawn({ + let running = running.clone(); + let counter = counter.clone(); + async move { + let mut last_count = 0; + loop { + tokio::time::sleep(TIMEOUT).await; + println!("Running: {:?}", running.lock().await); + let new_count = counter.load(std::sync::atomic::Ordering::Relaxed); + if new_count == last_count { + println!("Stalled at {} / {}, stopping", new_count, total); + std::process::exit(0); + } + last_count = new_count; + } + } + }); + match tls_mode { TlsMode::Both | TlsMode::Client => { let mut config = tokio_rustls::rustls::ClientConfig::builder() @@ -125,285 +143,282 @@ pub async fn play( } config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); let config = Arc::new(config); - for (id, (server_name, records)) in records.iter() { - let connector = TlsConnector::from(config.clone()); - let counter = counter.clone(); - let limiter = limiter.clone(); - let running = running.clone(); - handles.push(tokio::spawn(async move { - let mut running_guard = running.lock().await; - running_guard.insert(*id); - drop(running_guard); - let limiter = limiter.acquire().await.unwrap(); - let server_name = - ServerName::try_from(String::from_utf8(server_name.clone()).unwrap()) - .unwrap(); - 'repeat: for _i in 0..repeat { - let stream = TcpStream::connect(connect_to).await.unwrap(); - let stream = connector - .connect(server_name.clone(), stream) - .await - .unwrap(); - let mut stream = Framed::new(stream, crate::http::HttpClientCodec::new()); - for (direction, data_list) in ResponseStreamer::new(records.iter()) { - match direction { - Direction::ClientToServer => { - for data in data_list { - //println!("[CLT] ({id}) >> {}", data.len()); - //stream.get_mut().write_all(data).await.unwrap(); - match tokio::time::timeout( - TIMEOUT, - stream.get_mut().write_all(data), - ) - .await - { - Ok(v) => v.unwrap(), - Err(_e) => { - println!("client timeout {id} (sending)"); - continue 'repeat; + for _i in 0..repeat { + let mut handles = Vec::new(); + for (id, (server_name, records)) in records.iter() { + let connector = TlsConnector::from(config.clone()); + let counter = counter.clone(); + let limiter = limiter.clone(); + let running = running.clone(); + handles.push(tokio::spawn(async move { + let mut running_guard = running.lock().await; + running_guard.insert(*id); + drop(running_guard); + let limiter = limiter.acquire().await.unwrap(); + let server_name = + ServerName::try_from(String::from_utf8(server_name.clone()).unwrap()) + .unwrap(); + 'repeat: for _i in 0..1 { + let stream = TcpStream::connect(connect_to).await.unwrap(); + let stream = connector + .connect(server_name.clone(), stream) + .await + .unwrap(); + let mut stream = + Framed::new(stream, crate::http::HttpClientCodec::new()); + for (direction, data_list) in ResponseStreamer::new(records.iter()) { + match direction { + Direction::ClientToServer => { + for data in data_list { + //println!("[CLT] ({id}) >> {}", data.len()); + //stream.get_mut().write_all(data).await.unwrap(); + match tokio::time::timeout( + TIMEOUT, + stream.get_mut().write_all(data), + ) + .await + { + Ok(v) => v.unwrap(), + Err(_e) => { + println!("client timeout {id} (sending)"); + continue 'repeat; + } } } } - } - Direction::ServerToClient => { - let total_len: usize = - data_list.iter().map(|data| data.len()).sum::(); - let reduced_len = - total_len.saturating_sub(160 * data_list.len()).max(1); - let mut total_recv = 0; - //println!("[CLT] ({id}) << {}", data.len()); - // let mut buf = Vec::new(); - // stream.read_buf(&mut buf).await.ok(); - //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; - //let resp = stream.next().await.unwrap().unwrap(); - while total_recv < reduced_len { - let resp = match tokio::time::timeout( - TIMEOUT, - stream.next(), - ) - .await - { - Ok(v) => v.unwrap().unwrap(), - Err(_e) => { - // TODO fix - println!( - "client timeout {}: {} / {}", - id, total_recv, total_len - ); - //print_bin(data); - break 'repeat; - } - }; - total_recv += resp.len(); - //dbg!(resp.len()); - //crate::http::decode_http(&mut buf, &mut stream).await; + Direction::ServerToClient => { + let total_len: usize = + data_list.iter().map(|data| data.len()).sum::(); + let reduced_len = + total_len.saturating_sub(160 * data_list.len()).max(1); + let mut total_recv = 0; + //println!("[CLT] ({id}) << {}", data.len()); + // let mut buf = Vec::new(); + // stream.read_buf(&mut buf).await.ok(); + //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; + //let resp = stream.next().await.unwrap().unwrap(); + while total_recv < reduced_len { + let resp = + match tokio::time::timeout(TIMEOUT, stream.next()) + .await + { + Ok(v) => v.unwrap().unwrap(), + Err(_e) => { + // TODO fix + println!( + "client timeout {}: {} / {}", + id, total_recv, total_len + ); + //print_bin(data); + break 'repeat; + } + }; + total_recv += resp.len(); + //dbg!(resp.len()); + //crate::http::decode_http(&mut buf, &mut stream).await; + } + /*if total_recv > total_len { + println!("received too much {}: {} / {}", id, total_recv, total_len); + }*/ } - /*if total_recv > total_len { - println!("received too much {}: {} / {}", id, total_recv, total_len); - }*/ } } + //stream.get_mut().shutdown().await.unwrap(); + tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) + .await + .unwrap() + .unwrap(); + let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + println!("Client: {} / {}", cnt + 1, total); } - //stream.get_mut().shutdown().await.unwrap(); - tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) - .await - .unwrap() - .unwrap(); - let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - println!("Client: {} / {}", cnt + 1, total); - } - drop(limiter); - let mut running_guard = running.lock().await; - running_guard.remove(id); - drop(running_guard); - })); - //tokio::time::sleep(std::time::Duration::from_millis(500)).await; + drop(limiter); + let mut running_guard = running.lock().await; + running_guard.remove(id); + drop(running_guard); + })); + //tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + for handle in handles { + handle.await.unwrap(); + } } } TlsMode::None | TlsMode::Server => { - for (id, (_server_name, records)) in records.iter() { - /*if *id != 33 { - continue - }*/ - let counter = counter.clone(); - let limiter = limiter.clone(); - let running = running.clone(); - let debug_mutex = debug_mutex.clone(); - handles.push(tokio::spawn(async move { - let mut running_guard = running.lock().await; - running_guard.insert(*id); - drop(running_guard); - let limiter = limiter.acquire().await.unwrap(); - //let mut buf = Vec::new(); - 'repeat: for _i in 0..repeat { - let stream = TcpStream::connect(connect_to).await.unwrap(); - let mut stream = Framed::new(stream, crate::http::HttpClientCodec::new()); - /*let mut skip_recv = false; - for (direction, data) in records { - match direction { - Direction::ClientToServer => { - skip_recv = false; - println!("[CLT] ({id}) >> {}", data.len()); - stream.write_all(data).await.unwrap(); - } - Direction::ServerToClient => { - if skip_recv { - continue; + for _i in 0..repeat { + let mut handles = Vec::new(); + for (id, (_server_name, records)) in records.iter() { + /*if *id != 33 { + continue + }*/ + let counter = counter.clone(); + let limiter = limiter.clone(); + let running = running.clone(); + let debug_mutex = debug_mutex.clone(); + handles.push(tokio::spawn(async move { + let mut running_guard = running.lock().await; + running_guard.insert(*id); + drop(running_guard); + let limiter = limiter.acquire().await.unwrap(); + //let mut buf = Vec::new(); + 'repeat: for _i in 0..1 { + let stream = TcpStream::connect(connect_to).await.unwrap(); + let mut stream = + Framed::new(stream, crate::http::HttpClientCodec::new()); + /*let mut skip_recv = false; + for (direction, data) in records { + match direction { + Direction::ClientToServer => { + skip_recv = false; + println!("[CLT] ({id}) >> {}", data.len()); + stream.write_all(data).await.unwrap(); } - println!("[CLT] ({id}) << {}", data.len()); - //let mut buf = Vec::new(); - //stream.read_buf(&mut buf).await.ok(); - //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; - let mut buf = vec![0; data.len()]; - match tokio::time::timeout( - std::time::Duration::from_millis(500), - stream.readable(), - ) - .await - { - Ok(r) => { - r.unwrap(); + Direction::ServerToClient => { + if skip_recv { + continue; } - Err(_) => { - println!("[CLT] timeout recv ({id})"); - break; - } - } - // TODO utiliser crate::http ici - match tokio::time::timeout( - std::time::Duration::from_millis(500), - stream.read_exact(&mut buf), - ) - .await - { - Ok(r) => { - r.unwrap(); - } - Err(_) => { - println!("[CLT] skip recv ({id})"); - skip_recv = true; - } - } - } - } - }*/ - for (direction, data_list) in ResponseStreamer::new(records.iter()) { - match direction { - Direction::ClientToServer => { - for data in data_list.into_iter() { - if debug { - //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap()); - println!("[CLT] ({id}) >> {}", data.len()); - } - //stream.get_mut().write_all(data).await.unwrap(); + println!("[CLT] ({id}) << {}", data.len()); + //let mut buf = Vec::new(); + //stream.read_buf(&mut buf).await.ok(); + //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; + let mut buf = vec![0; data.len()]; match tokio::time::timeout( - TIMEOUT, - stream.get_mut().write_all(data), + std::time::Duration::from_millis(500), + stream.readable(), ) .await { - Ok(v) => v.unwrap(), - Err(_e) => { - println!("client timeout {id} (sending)"); - continue 'repeat; + Ok(r) => { + r.unwrap(); + } + Err(_) => { + println!("[CLT] timeout recv ({id})"); + break; + } + } + // TODO utiliser crate::http ici + match tokio::time::timeout( + std::time::Duration::from_millis(500), + stream.read_exact(&mut buf), + ) + .await + { + Ok(r) => { + r.unwrap(); + } + Err(_) => { + println!("[CLT] skip recv ({id})"); + skip_recv = true; } } } } - Direction::ServerToClient => { - let total_len: usize = - data_list.iter().map(|data| data.len()).sum::(); - let reduced_len = - total_len.saturating_sub(160 * data_list.len()).max(1); - let mut total_recv = 0; - if debug { - println!("[CLT] ({id}) << {total_len}"); - } - //let mut buf = Vec::new(); - //stream.read_buf(&mut buf).await.ok(); - //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; - while total_recv < reduced_len { - let resp = match tokio::time::timeout( - TIMEOUT, - stream.next(), - ) - .await - { - Ok(None) => break, - Ok(Some(v)) => v.unwrap(), - Err(_e) => { - // TODO fix - println!( - "client timeout {}: {} / {}", - id, total_recv, total_len - ); - //print_bin(data); - break 'repeat; + }*/ + for (direction, data_list) in ResponseStreamer::new(records.iter()) { + match direction { + Direction::ClientToServer => { + for data in data_list.into_iter() { + if debug { + //println!("[CLT] ({id}) >> {}", str::from_utf8(&data[..data.len().min(255)]).unwrap()); + println!("[CLT] ({id}) >> {}", data.len()); } - }; - total_recv += resp.len(); - /*if resp.len() != data.len() { - let guard = debug_mutex.lock().await; - println!("RECV NOT ENOUGH {} / {}", resp.len(), data.len()); - if resp.len() < 1000 && data.len() < 1000 { - //print_bin(&resp); - //println!("WANTED"); - //print_bin(data); + //stream.get_mut().write_all(data).await.unwrap(); + match tokio::time::timeout( + TIMEOUT, + stream.get_mut().write_all(data), + ) + .await + { + Ok(v) => v.unwrap(), + Err(_e) => { + println!("client timeout {id} (sending)"); + continue 'repeat; + } } - std::mem::drop(guard); - }*/ - //print_bin(&resp); - //let resp = stream.next().await.unwrap().unwrap(); - //dbg!(resp.len()); - //crate::http::decode_http(&mut buf, &mut stream).await; - //buf.clear(); + } } - if total_recv < reduced_len { - println!( - "({}) RECV NOT ENOUGH {} / {}", - id, total_recv, total_len - ); - } else if debug { - println!("[CLT] ({id}) << {total_len} OK"); + Direction::ServerToClient => { + let total_len: usize = + data_list.iter().map(|data| data.len()).sum::(); + let reduced_len = + total_len.saturating_sub(160 * data_list.len()).max(1); + let mut total_recv = 0; + if debug { + println!("[CLT] ({id}) << {total_len}"); + } + //let mut buf = Vec::new(); + //stream.read_buf(&mut buf).await.ok(); + //let mut buf = vec![0; data.len().saturating_sub(50).max(1)]; + let mut resp = Vec::new(); + while total_recv < reduced_len { + resp = + match tokio::time::timeout(TIMEOUT, stream.next()) + .await + { + Ok(None) => break, + Ok(Some(v)) => v.unwrap(), + Err(_e) => { + // TODO fix + println!( + "client timeout {}: {} / {}", + id, total_recv, total_len + ); + //print_bin(data); + break 'repeat; + } + }; + total_recv += resp.len(); + /*if resp.len() != data.len() { + let guard = debug_mutex.lock().await; + println!("RECV NOT ENOUGH {} / {}", resp.len(), data.len()); + if resp.len() < 1000 && data.len() < 1000 { + //print_bin(&resp); + //println!("WANTED"); + //print_bin(data); + } + std::mem::drop(guard); + }*/ + //print_bin(&resp); + //let resp = stream.next().await.unwrap().unwrap(); + //dbg!(resp.len()); + //crate::http::decode_http(&mut buf, &mut stream).await; + //buf.clear(); + } + if total_recv < reduced_len { + println!( + "({}) RECV NOT ENOUGH {} / {}", + id, total_recv, total_len + ); + if resp.len() < 1024 { + print_bin(&resp); + } + } else if debug { + println!("[CLT] ({id}) << {total_len} OK"); + } } } } + //stream.get_mut().shutdown().await.unwrap(); + tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) + .await + .unwrap() + .unwrap(); + let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + println!("Client: {} / {}", cnt + 1, total); } - //stream.get_mut().shutdown().await.unwrap(); - tokio::time::timeout(TIMEOUT, stream.get_mut().shutdown()) - .await - .unwrap() - .unwrap(); - let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); - println!("Client: {} / {}", cnt + 1, total); - } - drop(limiter); - let mut running_guard = running.lock().await; - running_guard.remove(id); - drop(running_guard); - })); - //tokio::time::sleep(std::time::Duration::from_millis(500)).await; - } - } - } - tokio::spawn({ - let running = running.clone(); - async move { - let mut last_count = 0; - loop { - tokio::time::sleep(TIMEOUT).await; - println!("Running: {:?}", running.lock().await); - let new_count = counter.load(std::sync::atomic::Ordering::Relaxed); - if new_count == last_count { - println!("Stalled at {} / {}, stopping", new_count, total); - std::process::exit(0); + drop(limiter); + let mut running_guard = running.lock().await; + running_guard.remove(id); + drop(running_guard); + })); + //tokio::time::sleep(std::time::Duration::from_millis(500)).await; + } + + for handle in handles { + handle.await.unwrap(); } - last_count = new_count; } } - }); - for handle in handles { - handle.await.unwrap(); } println!("Unfinished: {:?}", running.lock().await); std::process::exit(0); diff --git a/src/server.rs b/src/server.rs index 0a5f31a..9a7467f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -212,8 +212,19 @@ pub async fn play( .map_err(|e| panic!("{e:?} with name `{server_name}`")) .unwrap(); let mut stream = Framed::new(stream, crate::http::HttpServerCodec::new()); + let mut break_next = false; //let mut previous = Vec::new(); - while let Some(req) = stream.next().await { + loop { + let Ok(req) = tokio::time::timeout(tokio::time::Duration::from_secs(1), stream.next()).await else { + if break_next { + break; + } else { + continue; + } + }; + let Some(req) = req else { + break; + }; let req = req.unwrap(); //println!("REQUEST"); //print_bin(&req); @@ -265,7 +276,7 @@ pub async fn play( stream.flush().await.unwrap(); } if *last { - break; + break_next = true; } } else { println!("No response found for SNI=`{server_name}`"); @@ -385,7 +396,7 @@ pub async fn play( } } if *last { - break; + //break; } } else { println!("[SRV] No response found");