Limit number of alive tasks

This commit is contained in:
Pascal Engélibert 2026-03-20 16:49:04 +01:00
commit 35417a200a

View file

@ -192,15 +192,14 @@ pub async fn play(
} }
config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new());
let config = Arc::new(config); let config = Arc::new(config);
let mut handles = Vec::new();
for _i in 0..repeat { for _i in 0..repeat {
for (conn_id, (server_name, records)) in records.iter() { for (conn_id, (server_name, records)) in records.iter() {
let limiter = limiter.acquire().await.unwrap();
let connector = TlsConnector::from(config.clone()); let connector = TlsConnector::from(config.clone());
handles.push(tokio::spawn(async move { tokio::spawn(async move {
let mut running_guard = running.lock().await; let mut running_guard = running.lock().await;
running_guard.insert(*conn_id); running_guard.insert(*conn_id);
drop(running_guard); drop(running_guard);
let limiter = limiter.acquire().await.unwrap();
let server_name = let server_name =
ServerName::try_from(String::from_utf8(server_name.clone()).unwrap()) ServerName::try_from(String::from_utf8(server_name.clone()).unwrap())
.unwrap(); .unwrap();
@ -269,29 +268,28 @@ pub async fn play(
if debug { if debug {
println!("Client: {} / {}", cnt + 1, total); println!("Client: {} / {}", cnt + 1, total);
} }
drop(limiter);
if let Some(notify_socket) = &notify_socket { if let Some(notify_socket) = &notify_socket {
notify_socket.send(&cnt.to_be_bytes()).ok(); notify_socket.send(&cnt.to_be_bytes()).ok();
} }
let mut running_guard = running.lock().await; let mut running_guard = running.lock().await;
running_guard.remove(conn_id); running_guard.remove(conn_id);
drop(running_guard); drop(running_guard);
})); drop(limiter);
});
//tokio::time::sleep(std::time::Duration::from_millis(500)).await; //tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
} }
for handle in handles { while limiter.available_permits() < concurrency {
handle.await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await;
} }
} else { } else {
let mut handles = Vec::new();
for _i in 0..repeat { for _i in 0..repeat {
for (conn_id, (_server_name, records)) in records.iter() { for (conn_id, (_server_name, records)) in records.iter() {
handles.push(tokio::spawn(async move { let limiter = limiter.acquire().await.unwrap();
tokio::spawn(async move {
let mut running_guard = running.lock().await; let mut running_guard = running.lock().await;
running_guard.insert(*conn_id); running_guard.insert(*conn_id);
drop(running_guard); drop(running_guard);
let limiter = limiter.acquire().await.unwrap();
let stream = TcpStream::connect(connect_to).await.unwrap(); let stream = TcpStream::connect(connect_to).await.unwrap();
let mut stream = crate::codec::StreamCodec::new(stream); let mut stream = crate::codec::StreamCodec::new(stream);
for (direction, reqs) in ResponseStreamer::new(records.iter()) { for (direction, reqs) in ResponseStreamer::new(records.iter()) {
@ -353,19 +351,19 @@ pub async fn play(
if debug { if debug {
println!("Client: {} / {}", cnt, total); println!("Client: {} / {}", cnt, total);
} }
drop(limiter);
if let Some(notify_socket) = notify_socket { if let Some(notify_socket) = notify_socket {
notify_socket.send(&cnt.to_be_bytes()).ok(); notify_socket.send(&cnt.to_be_bytes()).ok();
} }
let mut running_guard = running.lock().await; let mut running_guard = running.lock().await;
running_guard.remove(conn_id); running_guard.remove(conn_id);
drop(running_guard); drop(running_guard);
})); drop(limiter);
});
//tokio::time::sleep(std::time::Duration::from_millis(500)).await; //tokio::time::sleep(std::time::Duration::from_millis(500)).await;
} }
} }
for handle in handles { while limiter.available_permits() < concurrency {
handle.await.unwrap(); tokio::time::sleep(Duration::from_secs(1)).await;
} }
} }
println!("Unfinished: {:?}", running.lock().await); println!("Unfinished: {:?}", running.lock().await);