diff --git a/src/client.rs b/src/client.rs index 3ed98e9..bf6f5eb 100644 --- a/src/client.rs +++ b/src/client.rs @@ -97,6 +97,7 @@ pub async fn play( cert_path: Option<&str>, skip_verif: bool, concurrency: usize, + notify_addr: Option<&str>, debug: bool, ) { // Semaphore used to limit the number of concurrent clients. @@ -109,6 +110,12 @@ pub async fn play( let dummy_bytes = Arc::new(vec![0x42u8; 16 * 1024 * 1024]); + let notify_socket = notify_addr.map(|notify_addr| { + let socket = std::net::UdpSocket::bind("0.0.0.0:48567").unwrap(); + socket.connect(notify_addr).unwrap(); + Arc::new(socket) + }); + tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; tokio::spawn({ @@ -287,6 +294,7 @@ pub async fn play( let limiter = limiter.clone(); let running = running.clone(); let dummy_bytes = dummy_bytes.clone(); + let notify_socket = notify_socket.clone(); handles.push(tokio::spawn(async move { let mut running_guard = running.lock().await; running_guard.insert(*conn_id); @@ -349,9 +357,12 @@ pub async fn play( .await .unwrap() .unwrap(); - let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed); + let cnt = counter.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1; if debug { - println!("Client: {} / {}", cnt + 1, total); + println!("Client: {} / {}", cnt, total); + } + if let Some(notify_socket) = ¬ify_socket { + notify_socket.send(&cnt.to_be_bytes()).unwrap(); } drop(limiter); let mut running_guard = running.lock().await; @@ -367,4 +378,8 @@ pub async fn play( } } println!("Unfinished: {:?}", running.lock().await); + + if let Some(notify_socket) = notify_socket { + notify_socket.send(&[0xff; 4]).unwrap(); + } } diff --git a/src/main.rs b/src/main.rs index b7b874c..c168050 100644 --- a/src/main.rs +++ b/src/main.rs @@ -154,13 +154,10 @@ async fn main() { subopt.certs.as_deref(), subopt.skip_verif, subopt.concurrency, + subopt.notify_addr.as_deref(), subopt.debug, ) .await; - if let Some(notify_addr) = subopt.notify_addr { - let socket = std::net::UdpSocket::bind("0.0.0.0:48567").unwrap(); - socket.send_to(b"done", ¬ify_addr).unwrap(); - } } Subcommand::Server(subopt) => { let records = RECORDS.init(record::read_record_file(&opt.record_file));