From 35417a200a408ee96307e48f8e1ee13525c6135c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Pascal=20Eng=C3=A9libert?= Date: Fri, 20 Mar 2026 16:49:04 +0100 Subject: [PATCH] Limit number of alive tasks --- src/client.rs | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/src/client.rs b/src/client.rs index 7ad9a27..2fa579f 100644 --- a/src/client.rs +++ b/src/client.rs @@ -192,15 +192,14 @@ pub async fn play( } config.key_log = Arc::new(tokio_rustls::rustls::KeyLogFile::new()); let config = Arc::new(config); - let mut handles = Vec::new(); for _i in 0..repeat { for (conn_id, (server_name, records)) in records.iter() { + let limiter = limiter.acquire().await.unwrap(); let connector = TlsConnector::from(config.clone()); - handles.push(tokio::spawn(async move { + tokio::spawn(async move { let mut running_guard = running.lock().await; running_guard.insert(*conn_id); drop(running_guard); - let limiter = limiter.acquire().await.unwrap(); let server_name = ServerName::try_from(String::from_utf8(server_name.clone()).unwrap()) .unwrap(); @@ -269,29 +268,28 @@ pub async fn play( if debug { println!("Client: {} / {}", cnt + 1, total); } - drop(limiter); if let Some(notify_socket) = ¬ify_socket { notify_socket.send(&cnt.to_be_bytes()).ok(); } let mut running_guard = running.lock().await; running_guard.remove(conn_id); drop(running_guard); - })); + drop(limiter); + }); //tokio::time::sleep(std::time::Duration::from_millis(500)).await; } } - for handle in handles { - handle.await.unwrap(); + while limiter.available_permits() < concurrency { + tokio::time::sleep(Duration::from_secs(1)).await; } } else { - let mut handles = Vec::new(); for _i in 0..repeat { for (conn_id, (_server_name, records)) in records.iter() { - handles.push(tokio::spawn(async move { + let limiter = limiter.acquire().await.unwrap(); + tokio::spawn(async move { let mut running_guard = running.lock().await; running_guard.insert(*conn_id); drop(running_guard); - let limiter = limiter.acquire().await.unwrap(); let stream = TcpStream::connect(connect_to).await.unwrap(); let mut stream = crate::codec::StreamCodec::new(stream); for (direction, reqs) in ResponseStreamer::new(records.iter()) { @@ -353,19 +351,19 @@ pub async fn play( if debug { println!("Client: {} / {}", cnt, total); } - drop(limiter); if let Some(notify_socket) = notify_socket { notify_socket.send(&cnt.to_be_bytes()).ok(); } let mut running_guard = running.lock().await; running_guard.remove(conn_id); drop(running_guard); - })); + drop(limiter); + }); //tokio::time::sleep(std::time::Duration::from_millis(500)).await; } } - for handle in handles { - handle.await.unwrap(); + while limiter.available_permits() < concurrency { + tokio::time::sleep(Duration::from_secs(1)).await; } } println!("Unfinished: {:?}", running.lock().await);