This commit is contained in:
Pascal Engélibert 2025-11-05 14:17:38 +01:00
commit 90a9196a9d
10 changed files with 1124 additions and 52 deletions

View file

@ -3,11 +3,15 @@ use crate::{
record::{Direction, Records},
};
use std::{net::ToSocketAddrs, sync::Arc};
use futures_util::StreamExt;
use std::{
net::ToSocketAddrs,
sync::{Arc, atomic::AtomicU32},
};
use tokio::{
io::{AsyncReadExt, AsyncWriteExt},
io::AsyncWriteExt,
net::TcpStream,
sync::oneshot,
sync::{Semaphore, oneshot},
};
use tokio_rustls::{
TlsConnector,
@ -17,6 +21,7 @@ use tokio_rustls::{
pki_types::ServerName,
},
};
use tokio_util::codec::Framed;
#[derive(Debug)]
struct DummyCertVerifier;
@ -82,6 +87,11 @@ pub async fn play(
repeat: u32,
) {
sync_receiver.await.unwrap();
// Semaphore used to limit the number of concurrent clients.
// Its handle is released when the task panics.
let limiter = Arc::new(Semaphore::new(32));
let counter = Arc::new(AtomicU32::new(0));
let total = records.len() * repeat as usize;
let mut handles = Vec::new();
let connect_to = connect_to.to_socket_addrs().unwrap().next().unwrap();
match tls_mode {
@ -92,58 +102,159 @@ pub async fn play(
.with_custom_certificate_verifier(Arc::new(DummyCertVerifier))
.with_no_client_auth(),
);
for (_id, (server_name, records)) in records.iter() {
for (id, (server_name, records)) in records.iter() {
let connector = TlsConnector::from(config.clone());
let counter = counter.clone();
let limiter = limiter.clone();
handles.push(tokio::spawn(async move {
let limiter = limiter.acquire().await.unwrap();
let server_name =
ServerName::try_from(String::from_utf8(server_name.clone()).unwrap())
.unwrap();
for _i in 0..repeat {
let stream = TcpStream::connect(connect_to).await.unwrap();
let mut stream = connector
let stream = connector
.connect(server_name.clone(), stream)
.await
.unwrap();
let mut stream = Framed::new(stream, crate::http::HttpCodec {});
for (direction, data) in records {
match direction {
Direction::ClientToServer => {
stream.write_all(data).await.unwrap();
println!("[CLT] ({id}) >> {}", data.len());
stream.get_mut().write_all(data).await.unwrap();
}
Direction::ServerToClient => {
let mut buf = Vec::new();
stream.read_buf(&mut buf).await.ok();
println!("[CLT] ({id}) << {}", data.len());
// let mut buf = Vec::new();
// stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
//let resp = stream.next().await.unwrap().unwrap();
let resp = tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.next(),
)
.await
.unwrap()
.unwrap()
.unwrap();
dbg!(resp.len());
//crate::http::decode_http(&mut buf, &mut stream).await;
}
}
}
stream.shutdown().await.unwrap();
stream.get_mut().shutdown().await.unwrap();
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("Client: {} / {}", cnt + 1, total);
}
drop(limiter);
}));
//tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
TlsMode::None | TlsMode::Server => {
for (_id, (_server_name, records)) in records.iter() {
for (id, (_server_name, records)) in records.iter() {
/*if *id != 33 {
continue
}*/
let counter = counter.clone();
let limiter = limiter.clone();
handles.push(tokio::spawn(async move {
dbg!(limiter.available_permits());
let limiter = limiter.acquire().await.unwrap();
//let mut buf = Vec::new();
for _i in 0..repeat {
let mut stream = TcpStream::connect(connect_to).await.unwrap();
dbg!();
let stream = TcpStream::connect(connect_to).await.unwrap();
let mut stream = Framed::new(stream, crate::http::HttpCodec {});
/*let mut skip_recv = false;
for (direction, data) in records {
match direction {
Direction::ClientToServer => {
skip_recv = false;
println!("[CLT] ({id}) >> {}", data.len());
stream.write_all(data).await.unwrap();
}
Direction::ServerToClient => {
let mut buf = Vec::new();
stream.read_buf(&mut buf).await.ok();
if skip_recv {
continue;
}
println!("[CLT] ({id}) << {}", data.len());
//let mut buf = Vec::new();
//stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
let mut buf = vec![0; data.len()];
match tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.readable(),
)
.await
{
Ok(r) => {
r.unwrap();
}
Err(_) => {
println!("[CLT] timeout recv ({id})");
break;
}
}
// TODO utiliser crate::http ici
match tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.read_exact(&mut buf),
)
.await
{
Ok(r) => {
r.unwrap();
}
Err(_) => {
println!("[CLT] skip recv ({id})");
skip_recv = true;
}
}
}
}
}*/
for (direction, data) in records {
match direction {
Direction::ClientToServer => {
println!("[CLT] ({id}) >> {}", data.len());
stream.get_mut().write_all(data).await.unwrap();
}
Direction::ServerToClient => {
println!("[CLT] ({id}) << {}", data.len());
//let mut buf = Vec::new();
//stream.read_buf(&mut buf).await.ok();
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
let resp = tokio::time::timeout(
std::time::Duration::from_millis(500),
stream.next(),
)
.await
.unwrap()
.unwrap()
.unwrap();
//let resp = stream.next().await.unwrap().unwrap();
dbg!(resp.len());
//crate::http::decode_http(&mut buf, &mut stream).await;
//buf.clear();
}
}
}
stream.shutdown().await.unwrap();
dbg!();
stream.get_mut().shutdown().await.unwrap();
let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
println!("Client: {} / {}", cnt + 1, total);
}
drop(limiter);
}));
//tokio::time::sleep(std::time::Duration::from_millis(500)).await;
}
}
}
for handle in handles {
handle.await.unwrap();
}
//std::process::exit(0);
std::process::exit(0);
}