diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index d7f7121..42b783d 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -44,18 +44,20 @@ fn main() { .unwrap(); tokio::select! { - Err(e) = config_service.start() => { - error!("config reloader service exited: {e}"); - std::process::exit(1); + config_res = config_service.start() => { + if let Err(e) = config_res { + error!("config reloader service exited: {e}"); + std::process::exit(1); + } } - Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { - error!("rpxy service existed: {e}"); - std::process::exit(1); - } - else => { - std::process::exit(0); + rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { + if let Err(e) = rpxy_res { + error!("rpxy service existed: {e}"); + std::process::exit(1); + } } } + std::process::exit(0); } }); } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 3a6097d..fdfb81d 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -19,7 +19,7 @@ use crate::{ message_handler::HttpMessageHandlerBuilder, proxy::Proxy, }; -use futures::future::select_all; +use futures::future::join_all; use hot_reload::ReloaderReceiver; use rpxy_certs::ServerCryptoBase; use std::sync::Arc; @@ -130,8 +130,9 @@ pub async fn entrypoint( let connection_builder = proxy::connection_builder(&globals); // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder. + let parent_cancel_token = globals.cancel_token.clone().unwrap_or_default(); let addresses = globals.proxy_config.listen_sockets.clone(); - let futures_iter = addresses.into_iter().map(|listening_on| { + let join_handles = addresses.into_iter().map(|listening_on| { let mut tls_enabled = false; if let Some(https_port) = globals.proxy_config.https_port { tls_enabled = https_port == listening_on.port() @@ -143,11 +144,37 @@ pub async fn entrypoint( connection_builder: connection_builder.clone(), message_handler: message_handler.clone(), }; - globals.runtime_handle.spawn(async move { proxy.start().await }) + + let cancel_token = parent_cancel_token.child_token(); + let parent_cancel_token_clone = parent_cancel_token.clone(); + globals.runtime_handle.spawn(async move { + info!("rpxy proxy service for {listening_on} started"); + tokio::select! { + _ = cancel_token.cancelled() => { + info!("rpxy proxy service for {listening_on} terminated"); + Ok(()) + }, + proxy_res = proxy.start() => { + info!("rpxy proxy service for {listening_on} exited"); + // cancel other proxy tasks + parent_cancel_token_clone.cancel(); + proxy_res + } + } + }) }); - if let (Ok(Err(e)), _, _) = select_all(futures_iter).await { - error!("Some proxy services are down: {}", e); + let join_res = join_all(join_handles).await; + let mut errs = join_res.into_iter().filter_map(|res| { + if let Ok(Err(e)) = res { + error!("Some proxy services are down: {}", e); + Some(e) + } else { + None + } + }); + // returns the first error as the representative error + if let Some(e) = errs.next() { return Err(e); } diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 9be175d..3bb0aec 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -312,23 +312,6 @@ where } }; - match &self.globals.cancel_token { - Some(cancel_token) => { - select! { - _ = proxy_service.fuse() => { - warn!("Proxy service got down"); - } - _ = cancel_token.cancelled().fuse() => { - info!("Proxy service listening on {} receives term signal", self.listening_on); - } - } - } - None => { - proxy_service.await?; - warn!("Proxy service got down"); - } - } - - Ok(()) + proxy_service.await } }