diff --git a/Cargo.toml b/Cargo.toml
index e990fdf..554a677 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,5 +1,5 @@
 [workspace.package]
-version = "0.9.0-alpha.1"
+version = "0.9.0-alpha.2"
 authors = ["Jun Kurihara"]
 homepage = "https://github.com/junkurihara/rust-rpxy"
 repository = "https://github.com/junkurihara/rust-rpxy"
diff --git a/rpxy-acme/src/manager.rs b/rpxy-acme/src/manager.rs
index 9242743..fe10c85 100644
--- a/rpxy-acme/src/manager.rs
+++ b/rpxy-acme/src/manager.rs
@@ -115,7 +115,7 @@ impl AcmeManager {
       if let Some(cancel_token) = cancel_token.as_ref() {
         tokio::select! {
           _ = task => {},
-          _ = cancel_token.cancelled() => { info!("rpxy ACME manager task for {domain} terminated") }
+          _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") }
         }
       } else {
         task.await;
diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs
index 53c8bbc..2f27735 100644
--- a/rpxy-bin/src/constants.rs
+++ b/rpxy-bin/src/constants.rs
@@ -1,6 +1,6 @@
 pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
 pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
-pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;
+pub const CONFIG_WATCH_DELAY_SECS: u32 = 15;

 #[cfg(feature = "cache")]
 // Cache directory
diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs
index d7f7121..ce96253 100644
--- a/rpxy-bin/src/main.rs
+++ b/rpxy-bin/src/main.rs
@@ -16,6 +16,8 @@ use crate::{
 };
 use hot_reload::{ReloaderReceiver, ReloaderService};
 use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder};
+use std::sync::Arc;
+use tokio_util::sync::CancellationToken;

 fn main() {
   init_logger();
@@ -44,67 +46,203 @@
         .unwrap();

       tokio::select! {
-        Err(e) = config_service.start() => {
-          error!("config reloader service exited: {e}");
-          std::process::exit(1);
+        config_res = config_service.start() => {
+          if let Err(e) = config_res {
+            error!("config reloader service exited: {e}");
+            std::process::exit(1);
+          }
         }
-        Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
-          error!("rpxy service existed: {e}");
-          std::process::exit(1);
-        }
-        else => {
-          std::process::exit(0);
+        rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
+          if let Err(e) = rpxy_res {
+            error!("rpxy service exited: {e}");
+            std::process::exit(1);
+          }
         }
       }
+      std::process::exit(0);
     }
   });
 }

+/// rpxy service definition
+struct RpxyService {
+  runtime_handle: tokio::runtime::Handle,
+  proxy_conf: rpxy_lib::ProxyConfig,
+  app_conf: rpxy_lib::AppConfigList,
+  cert_service: Option<Arc<ReloaderService<CryptoReloader, ServerCryptoBase>>>,
+  cert_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
+  #[cfg(feature = "acme")]
+  acme_manager: Option<AcmeManager>,
+}
+
+impl RpxyService {
+  async fn new(config_toml: &ConfigToml, runtime_handle: tokio::runtime::Handle) -> Result<Self, anyhow::Error> {
+    let (proxy_conf, app_conf) = build_settings(config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
+
+    let (cert_service, cert_rx) = build_cert_manager(config_toml)
+      .await
+      .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
+      .map(|(s, r)| (Some(Arc::new(s)), Some(r)))
+      .unwrap_or((None, None));
+
+    Ok(RpxyService {
+      runtime_handle: runtime_handle.clone(),
+      proxy_conf,
+      app_conf,
+      cert_service,
+      cert_rx,
+      #[cfg(feature = "acme")]
+      acme_manager: build_acme_manager(config_toml, runtime_handle.clone()).await?,
+    })
+  }
+
+  async fn start(&self, cancel_token: Option<CancellationToken>) -> Result<(), anyhow::Error> {
+    let RpxyService {
+      runtime_handle,
+      proxy_conf,
+      app_conf,
+      cert_service: _,
+      cert_rx,
+      #[cfg(feature = "acme")]
+      acme_manager,
+    } = self;
+
+    #[cfg(feature = "acme")]
+    {
+      let (acme_join_handles, server_config_acme_challenge) = acme_manager
+        .as_ref()
+        .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token())))
+        .unwrap_or((vec![], Default::default()));
+      let rpxy_opts = RpxyOptionsBuilder::default()
+        .proxy_config(proxy_conf.clone())
+        .app_config_list(app_conf.clone())
+        .cert_rx(cert_rx.clone())
+        .runtime_handle(runtime_handle.clone())
+        .cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
+        .server_configs_acme_challenge(Arc::new(server_config_acme_challenge))
+        .build()?;
+      self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e))
+    }
+
+    #[cfg(not(feature = "acme"))]
+    {
+      let rpxy_opts = RpxyOptionsBuilder::default()
+        .proxy_config(proxy_conf.clone())
+        .app_config_list(app_conf.clone())
+        .cert_rx(cert_rx.clone())
+        .runtime_handle(runtime_handle.clone())
+        .cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
+        .build()?;
+      self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e))
+    }
+  }
+
+  /// Wrapper of entry point for rpxy service with certificate management service
+  async fn start_inner(
+    &self,
+    rpxy_opts: RpxyOptions,
+    #[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
+  ) -> Result<(), anyhow::Error> {
+    let cancel_token = rpxy_opts.cancel_token.clone();
+    let runtime_handle = rpxy_opts.runtime_handle.clone();
+
+    // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
+    let cancel_token_clone = cancel_token.clone();
+    let rpxy_handle = runtime_handle.spawn(async move {
+      if let Err(e) = entrypoint(&rpxy_opts).await {
+        error!("rpxy entrypoint exited on error: {e}");
+        if let Some(cancel_token) = cancel_token_clone {
+          cancel_token.cancel();
+        }
+        return Err(anyhow!(e));
+      }
+      Ok(())
+    });
+
+    if self.cert_service.is_none() {
+      return rpxy_handle.await?;
+    }
+
+    // spawn certificate reloader service, where cert service does not have cancellation token inside the service
+    let cert_service = self.cert_service.as_ref().unwrap().clone();
+    let cancel_token_clone = cancel_token.clone();
+    let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token());
+    let cert_handle = runtime_handle.spawn(async move {
+      if let Some(child_cancel_token) = child_cancel_token {
+        tokio::select! {
+          cert_res = cert_service.start() => {
+            if let Err(ref e) = cert_res {
+              error!("cert reloader service exited on error: {e}");
+            }
+            cancel_token_clone.unwrap().cancel();
+            cert_res.map_err(|e| anyhow!(e))
+          }
+          _ = child_cancel_token.cancelled() => {
+            debug!("cert reloader service terminated");
+            Ok(())
+          }
+        }
+      } else {
+        cert_service.start().await.map_err(|e| anyhow!(e))
+      }
+    });
+
+    #[cfg(not(feature = "acme"))]
+    {
+      let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
+      let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
+      match (rpxy_res, cert_res) {
+        (Ok(()), Ok(())) => Ok(()),
+        (Err(e), _) => Err(e),
+        (_, Err(e)) => Err(e),
+      }
+    }
+
+    #[cfg(feature = "acme")]
+    {
+      if acme_task_handles.is_empty() {
+        let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
+        let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
+        return match (rpxy_res, cert_res) {
+          (Ok(()), Ok(())) => Ok(()),
+          (Err(e), _) => Err(e),
+          (_, Err(e)) => Err(e),
+        };
+      }
+
+      // spawn acme manager tasks, where cancellation token is possibly contained inside the service
+      let select_all = futures_util::future::select_all(acme_task_handles);
+      let cancel_token_clone = cancel_token.clone();
+      let acme_handle = runtime_handle.spawn(async move {
+        let (acme_res, _, _) = select_all.await;
+        if let Err(ref e) = acme_res {
+          error!("acme manager exited on error: {e}");
+        }
+        if let Some(cancel_token) = cancel_token_clone {
+          cancel_token.cancel();
+        }
+        acme_res.map_err(|e| anyhow!(e))
+      });
+      let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle);
+      let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?);
+      match (rpxy_res, cert_res, acme_res) {
+        (Ok(()), Ok(()), Ok(())) => Ok(()),
+        (Err(e), _, _) => Err(e),
+        (_, Err(e), _) => Err(e),
+        (_, _, Err(e)) => Err(e),
+      }
+    }
+  }
+}
+
 async fn rpxy_service_without_watcher(
   config_file_path: &str,
   runtime_handle: tokio::runtime::Handle,
 ) -> Result<(), anyhow::Error> {
   info!("Start rpxy service");
   let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
-  let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
-
-  let (cert_service, cert_rx) = build_cert_manager(&config_toml)
-    .await
-    .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
-    .map(|(s, r)| (Some(s), Some(r)))
-    .unwrap_or((None, None));
-
-  #[cfg(feature = "acme")]
-  {
-    let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
-    let (acme_join_handles, server_config_acme_challenge) = acme_manager
-      .as_ref()
-      .map(|m| m.spawn_manager_tasks(None))
-      .unwrap_or((vec![], Default::default()));
-    let rpxy_opts = RpxyOptionsBuilder::default()
-      .proxy_config(proxy_conf)
-      .app_config_list(app_conf)
-      .cert_rx(cert_rx)
-      .runtime_handle(runtime_handle.clone())
-      .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
-      .build()?;
-    rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle)
-      .await
-      .map_err(|e| anyhow!(e))
-  }
-
-  #[cfg(not(feature = "acme"))]
-  {
-    let rpxy_opts = RpxyOptionsBuilder::default()
-      .proxy_config(proxy_conf.clone())
-      .app_config_list(app_conf.clone())
-      .cert_rx(cert_rx.clone())
-      .runtime_handle(runtime_handle.clone())
-      .build()?;
-    rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle)
-      .await
-      .map_err(|e| anyhow!(e))
-  }
+  let service = RpxyService::new(&config_toml, runtime_handle).await?;
+  service.start(None).await
 }

 async fn rpxy_service_with_watcher(
@@ -118,176 +256,41 @@
     .borrow()
     .clone()
     .ok_or(anyhow!("Something wrong in config reloader receiver"))?;
-  let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
-
-  #[cfg(feature = "acme")]
-  let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
-
-  let mut cert_service_and_rx = build_cert_manager(&config_toml)
-    .await
-    .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
+  let mut service = RpxyService::new(&config_toml, runtime_handle.clone()).await?;

   // Continuous monitoring
   loop {
     // Notifier for proxy service termination
     let cancel_token = tokio_util::sync::CancellationToken::new();

-    let (cert_service, cert_rx) = cert_service_and_rx
-      .as_ref()
-      .map(|(s, r)| (Some(s), Some(r)))
-      .unwrap_or((None, None));
-
-    #[cfg(feature = "acme")]
-    let (acme_join_handles, server_config_acme_challenge) = acme_manager
-      .as_ref()
-      .map(|m| m.spawn_manager_tasks(Some(cancel_token.child_token())))
-      .unwrap_or((vec![], Default::default()));
-
-    let rpxy_opts = {
-      #[cfg(feature = "acme")]
-      let res = RpxyOptionsBuilder::default()
-        .proxy_config(proxy_conf.clone())
-        .app_config_list(app_conf.clone())
-        .cert_rx(cert_rx.cloned())
-        .runtime_handle(runtime_handle.clone())
-        .cancel_token(Some(cancel_token.child_token()))
-        .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
-        .build();
-
-      #[cfg(not(feature = "acme"))]
-      let res = RpxyOptionsBuilder::default()
-        .proxy_config(proxy_conf.clone())
-        .app_config_list(app_conf.clone())
-        .cert_rx(cert_rx.cloned())
-        .runtime_handle(runtime_handle.clone())
-        .term_notify(Some(term_notify.clone()))
-        .build();
-      res
-    }?;
-
     tokio::select! {
-      rpxy_res = {
-        #[cfg(feature = "acme")]
-        {
-          rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle)
+      /* ---------- */
+      rpxy_res = service.start(Some(cancel_token.clone())) => {
+        if let Err(ref e) = rpxy_res {
+          error!("rpxy service exited on error: {e}");
+        } else {
+          error!("rpxy service exited");
         }
-        #[cfg(not(feature = "acme"))]
-        {
-          rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle)
-        }
-      } => {
-        error!("rpxy entrypoint or cert service exited");
         return rpxy_res.map_err(|e| anyhow!(e));
       }
+      /* ---------- */
       _ = config_rx.changed() => {
-        let Some(config_toml) = config_rx.borrow().clone() else {
+        let Some(new_config_toml) = config_rx.borrow().clone() else {
           error!("Something wrong in config reloader receiver");
           return Err(anyhow!("Something wrong in config reloader receiver"));
         };
-        match build_settings(&config_toml) {
-          Ok((p, a)) => {
-            (proxy_conf, app_conf) = (p, a)
+        match RpxyService::new(&new_config_toml, runtime_handle.clone()).await {
+          Ok(new_service) => {
+            info!("Configuration updated.");
+            service = new_service;
           },
           Err(e) => {
-            error!("Invalid configuration. Configuration does not updated: {e}");
-            continue;
+            error!("rpxy failed to be ready. Configuration was not updated: {e}");
           }
         };
-        match build_cert_manager(&config_toml).await {
-          Ok(c) => {
-            cert_service_and_rx = c;
-          },
-          Err(e) => {
-            error!("Invalid cert configuration. Configuration does not updated: {e}");
-            continue;
-          }
-        };
-        #[cfg(feature = "acme")]
-        {
-          match build_acme_manager(&config_toml, runtime_handle.clone()).await {
-            Ok(m) => {
-              acme_manager = m;
-            },
-            Err(e) => {
-              error!("Invalid acme configuration. Configuration does not updated: {e}");
-              continue;
-            }
-          }
-        }
-
-        info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets");
+        info!("Terminate all spawned services and force to re-bind TCP/UDP sockets");
         cancel_token.cancel();
       }
-      else => break
     }
   }
-
-  Ok(())
-}
-
-#[cfg(not(feature = "acme"))]
-/// Wrapper of entry point for rpxy service with certificate management service
-async fn rpxy_entrypoint(
-  rpxy_opts: &RpxyOptions,
-  cert_service: Option<&ReloaderService<CryptoReloader, ServerCryptoBase>>,
-  // runtime_handle: &tokio::runtime::Handle,
-) -> Result<(), anyhow::Error> {
-  // TODO: refactor: update routine
-  if let Some(cert_service) = cert_service {
-    tokio::select! {
-      rpxy_res = entrypoint(rpxy_opts) => {
-        error!("rpxy entrypoint exited");
-        rpxy_res.map_err(|e| anyhow!(e))
-      }
-      cert_res = cert_service.start() => {
-        error!("cert reloader service exited");
-        cert_res.map_err(|e| anyhow!(e))
-      }
-    }
-  } else {
-    entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
-  }
-}
-
-#[cfg(feature = "acme")]
-/// Wrapper of entry point for rpxy service with certificate management service
-async fn rpxy_entrypoint(
-  rpxy_opts: &RpxyOptions,
-  cert_service: Option<&ReloaderService<CryptoReloader, ServerCryptoBase>>,
-  acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
-  // runtime_handle: &tokio::runtime::Handle,
-) -> Result<(), anyhow::Error> {
-  // TODO: refactor: update routine
-  if let Some(cert_service) = cert_service {
-    if acme_task_handles.is_empty() {
-      tokio::select! {
-        rpxy_res = entrypoint(rpxy_opts) => {
-          error!("rpxy entrypoint exited");
-          rpxy_res.map_err(|e| anyhow!(e))
-        }
-        cert_res = cert_service.start() => {
-          error!("cert reloader service exited");
-          cert_res.map_err(|e| anyhow!(e))
-        }
-      }
-    } else {
-      let select_all = futures_util::future::select_all(acme_task_handles);
-      tokio::select! {
-        rpxy_res = entrypoint(rpxy_opts) => {
-          error!("rpxy entrypoint exited");
-          rpxy_res.map_err(|e| anyhow!(e))
-        }
-        (acme_res, _, _) = select_all => {
-          error!("acme manager exited");
-          acme_res.map_err(|e| anyhow!(e))
-        }
-        cert_res = cert_service.start() => {
-          error!("cert reloader service exited");
-          cert_res.map_err(|e| anyhow!(e))
-        }
-      }
-    }
-  } else {
-    entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
-  }
 }
diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs
index de1983d..97aadef 100644
--- a/rpxy-lib/src/globals.rs
+++ b/rpxy-lib/src/globals.rs
@@ -1,7 +1,7 @@
 use crate::{constants::*, count::RequestCount};
 use hot_reload::ReloaderReceiver;
 use rpxy_certs::ServerCryptoBase;
-use std::{net::SocketAddr, sync::Arc, time::Duration};
+use std::{net::SocketAddr, time::Duration};
 use tokio_util::sync::CancellationToken;

 /// Global object containing proxy configurations and shared object like counters.
@@ -20,7 +20,7 @@ pub struct Globals {

   #[cfg(feature = "acme")]
   /// ServerConfig used for only ACME challenge for ACME domains
-  pub server_configs_acme_challenge: Arc<HashMap<String, Arc<ServerConfig>>>,
+  pub server_configs_acme_challenge: std::sync::Arc<HashMap<String, std::sync::Arc<ServerConfig>>>,
 }

 /// Configuration parameters for proxy transport and request handlers
diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs
index 3a6097d..f5f6d59 100644
--- a/rpxy-lib/src/lib.rs
+++ b/rpxy-lib/src/lib.rs
@@ -19,7 +19,7 @@ use crate::{
   message_handler::HttpMessageHandlerBuilder,
   proxy::Proxy,
 };
-use futures::future::select_all;
+use futures::future::join_all;
 use hot_reload::ReloaderReceiver;
 use rpxy_certs::ServerCryptoBase;
 use std::sync::Arc;
@@ -131,7 +131,7 @@ pub async fn entrypoint(

   // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder.
   let addresses = globals.proxy_config.listen_sockets.clone();
-  let futures_iter = addresses.into_iter().map(|listening_on| {
+  let join_handles = addresses.into_iter().map(|listening_on| {
     let mut tls_enabled = false;
     if let Some(https_port) = globals.proxy_config.https_port {
       tls_enabled = https_port == listening_on.port()
@@ -143,11 +143,41 @@
       connection_builder: connection_builder.clone(),
       message_handler: message_handler.clone(),
     };
-    globals.runtime_handle.spawn(async move { proxy.start().await })
+
+    let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token());
+    let parent_cancel_token_clone = globals.cancel_token.clone();
+    globals.runtime_handle.spawn(async move {
+      info!("rpxy proxy service for {listening_on} started");
+      if let Some(cancel_token) = cancel_token {
+        tokio::select! {
+          _ = cancel_token.cancelled() => {
+            debug!("rpxy proxy service for {listening_on} terminated");
+            Ok(())
+          },
+          proxy_res = proxy.start() => {
+            info!("rpxy proxy service for {listening_on} exited");
+            // cancel other proxy tasks
+            parent_cancel_token_clone.unwrap().cancel();
+            proxy_res
+          }
+        }
+      } else {
+        proxy.start().await
+      }
+    })
   });

-  if let (Ok(Err(e)), _, _) = select_all(futures_iter).await {
-    error!("Some proxy services are down: {}", e);
+  let join_res = join_all(join_handles).await;
+  let mut errs = join_res.into_iter().filter_map(|res| {
+    if let Ok(Err(e)) = res {
+      error!("Some proxy services are down: {}", e);
+      Some(e)
+    } else {
+      None
+    }
+  });
+  // returns the first error as the representative error
+  if let Some(e) = errs.next() {
     return Err(e);
   }

diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs
index 9be175d..3bb0aec 100644
--- a/rpxy-lib/src/proxy/proxy_main.rs
+++ b/rpxy-lib/src/proxy/proxy_main.rs
@@ -312,23 +312,6 @@ where
       }
     };

-    match &self.globals.cancel_token {
-      Some(cancel_token) => {
-        select! {
-          _ = proxy_service.fuse() => {
-            warn!("Proxy service got down");
-          }
-          _ = cancel_token.cancelled().fuse() => {
-            info!("Proxy service listening on {} receives term signal", self.listening_on);
-          }
-        }
-      }
-      None => {
-        proxy_service.await?;
-        warn!("Proxy service got down");
-      }
-    }
-
-    Ok(())
+    proxy_service.await
   }
 }