diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 53c8bbc..3173fa9 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,6 +1,6 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; -pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 2; #[cfg(feature = "cache")] // Cache directory diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 42b783d..9acdc2c 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -16,6 +16,8 @@ use crate::{ }; use hot_reload::{ReloaderReceiver, ReloaderService}; use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder}; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; fn main() { init_logger(); @@ -62,51 +64,201 @@ fn main() { }); } +/// rpxy service definition +struct RpxyService { + runtime_handle: tokio::runtime::Handle, + proxy_conf: rpxy_lib::ProxyConfig, + app_conf: rpxy_lib::AppConfigList, + cert_service: Option>>, + cert_rx: Option>, + #[cfg(feature = "acme")] + acme_manager: Option, +} + +impl RpxyService { + async fn new(config_toml: &ConfigToml, runtime_handle: tokio::runtime::Handle) -> Result { + let (proxy_conf, app_conf) = build_settings(config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; + + let (cert_service, cert_rx) = build_cert_manager(config_toml) + .await + .map_err(|e| anyhow!("Invalid cert configuration: {e}"))? + .map(|(s, r)| (Some(Arc::new(s)), Some(r))) + .unwrap_or((None, None)); + + Ok(RpxyService { + runtime_handle: runtime_handle.clone(), + proxy_conf, + app_conf, + cert_service, + cert_rx, + #[cfg(feature = "acme")] + acme_manager: build_acme_manager(config_toml, runtime_handle.clone()).await?, + }) + } + + async fn start(&self, cancel_token: Option) -> Result<(), anyhow::Error> { + let RpxyService { + runtime_handle, + proxy_conf, + app_conf, + cert_service: _, + cert_rx, + #[cfg(feature = "acme")] + acme_manager, + } = self; + + #[cfg(feature = "acme")] + { + let (acme_join_handles, server_config_acme_challenge) = acme_manager + .as_ref() + .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token()))) + .unwrap_or((vec![], Default::default())); + let rpxy_opts = RpxyOptionsBuilder::default() + .proxy_config(proxy_conf.clone()) + .app_config_list(app_conf.clone()) + .cert_rx(cert_rx.clone()) + .runtime_handle(runtime_handle.clone()) + .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) + .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) + .build()?; + self + .start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle) + .await + .map_err(|e| anyhow!(e)) + } + + #[cfg(not(feature = "acme"))] + { + let rpxy_opts = RpxyOptionsBuilder::default() + .proxy_config(proxy_conf.clone()) + .app_config_list(app_conf.clone()) + .cert_rx(cert_rx.clone()) + .runtime_handle(runtime_handle.clone()) + .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) + .build()?; + self + .start_inner(rpxy_opts) //, &runtime_handle) + .await + .map_err(|e| anyhow!(e)) + } + } + + /// Wrapper of entry point for rpxy service with certificate management service + async fn start_inner( + &self, + rpxy_opts: RpxyOptions, + // cert_service: Option<&Arc>>, + #[cfg(feature = "acme")] acme_task_handles: Vec>, + // runtime_handle: &tokio::runtime::Handle, + ) -> Result<(), anyhow::Error> { + let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default(); + let runtime_handle = rpxy_opts.runtime_handle.clone(); + + // spawn rpxy entry point + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let rpxy_handle = runtime_handle.spawn(async move { + tokio::select! { + rpxy_res = entrypoint(&rpxy_opts) => { + if let Err(ref e) = rpxy_res { + error!("rpxy entrypoint exited on error: {e}"); + } + cancel_token_clone.cancel(); + rpxy_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("rpxy entrypoint terminated by cancel token"); + Ok(()) + } + } + }); + + if self.cert_service.is_none() { + return rpxy_handle.await?; + } + + // spawn certificate reloader service + let cert_service = self.cert_service.as_ref().unwrap().clone(); + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let cert_handle = runtime_handle.spawn(async move { + tokio::select! { + cert_res = cert_service.start() => { + if let Err(ref e) = cert_res { + error!("cert reloader service exited on error: {e}"); + } + cancel_token_clone.cancel(); + cert_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("cert reloader service terminated by cancel token"); + Ok(()) + } + } + }); + + #[cfg(not(feature = "acme"))] + { + let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle); + let (rpxy_res, cert_res) = (rpxy_res?, cert_res?); + match (rpxy_res, cert_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + } + } + + #[cfg(feature = "acme")] + { + if acme_task_handles.is_empty() { + let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle); + let (rpxy_res, cert_res) = (rpxy_res?, cert_res?); + return match (rpxy_res, cert_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + }; + } + + // spawn acme manager tasks + let select_all = futures_util::future::select_all(acme_task_handles); + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let acme_handle = runtime_handle.spawn(async move { + tokio::select! { + (acme_res, _, _) = select_all => { + if let Err(ref e) = acme_res { + error!("acme manager exited on error: {e}"); + } + cancel_token_clone.cancel(); + acme_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("acme manager terminated by cancel token"); + Ok(()) + } + } + }); + let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); + let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?); + match (rpxy_res, cert_res, acme_res) { + (Ok(()), Ok(()), Ok(())) => Ok(()), + (Err(e), _, _) => Err(e), + (_, Err(e), _) => Err(e), + (_, _, Err(e)) => Err(e), + } + } + } +} + async fn rpxy_service_without_watcher( config_file_path: &str, runtime_handle: tokio::runtime::Handle, ) -> Result<(), anyhow::Error> { info!("Start rpxy service"); let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; - let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - - let (cert_service, cert_rx) = build_cert_manager(&config_toml) - .await - .map_err(|e| anyhow!("Invalid cert configuration: {e}"))? - .map(|(s, r)| (Some(s), Some(r))) - .unwrap_or((None, None)); - - #[cfg(feature = "acme")] - { - let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; - let (acme_join_handles, server_config_acme_challenge) = acme_manager - .as_ref() - .map(|m| m.spawn_manager_tasks(None)) - .unwrap_or((vec![], Default::default())); - let rpxy_opts = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf) - .app_config_list(app_conf) - .cert_rx(cert_rx) - .runtime_handle(runtime_handle.clone()) - .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge)) - .build()?; - rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) - } - - #[cfg(not(feature = "acme"))] - { - let rpxy_opts = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.clone()) - .runtime_handle(runtime_handle.clone()) - .build()?; - rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) - } + let service = RpxyService::new(&config_toml, runtime_handle).await?; + service.start(None).await } async fn rpxy_service_with_watcher( @@ -120,176 +272,41 @@ async fn rpxy_service_with_watcher( .borrow() .clone() .ok_or(anyhow!("Something wrong in config reloader receiver"))?; - let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - - #[cfg(feature = "acme")] - let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; - - let mut cert_service_and_rx = build_cert_manager(&config_toml) - .await - .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?; + let mut service = RpxyService::new(&config_toml, runtime_handle.clone()).await?; // Continuous monitoring loop { // Notifier for proxy service termination let cancel_token = tokio_util::sync::CancellationToken::new(); - let (cert_service, cert_rx) = cert_service_and_rx - .as_ref() - .map(|(s, r)| (Some(s), Some(r))) - .unwrap_or((None, None)); - - #[cfg(feature = "acme")] - let (acme_join_handles, server_config_acme_challenge) = acme_manager - .as_ref() - .map(|m| m.spawn_manager_tasks(Some(cancel_token.child_token()))) - .unwrap_or((vec![], Default::default())); - - let rpxy_opts = { - #[cfg(feature = "acme")] - let res = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.cloned()) - .runtime_handle(runtime_handle.clone()) - .cancel_token(Some(cancel_token.child_token())) - .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge)) - .build(); - - #[cfg(not(feature = "acme"))] - let res = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.cloned()) - .runtime_handle(runtime_handle.clone()) - .term_notify(Some(term_notify.clone())) - .build(); - res - }?; - tokio::select! { - rpxy_res = { - #[cfg(feature = "acme")] - { - rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle) + /* ---------- */ + rpxy_res = service.start(Some(cancel_token.clone())) => { + if let Err(ref e) = rpxy_res { + error!("rpxy service exited on error: {e}"); + } else { + error!("rpxy service exited"); } - #[cfg(not(feature = "acme"))] - { - rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle) - } - } => { - error!("rpxy entrypoint or cert service exited"); return rpxy_res.map_err(|e| anyhow!(e)); } + /* ---------- */ _ = config_rx.changed() => { - let Some(config_toml) = config_rx.borrow().clone() else { + let Some(new_config_toml) = config_rx.borrow().clone() else { error!("Something wrong in config reloader receiver"); return Err(anyhow!("Something wrong in config reloader receiver")); }; - match build_settings(&config_toml) { - Ok((p, a)) => { - (proxy_conf, app_conf) = (p, a) + match RpxyService::new(&new_config_toml, runtime_handle.clone()).await { + Ok(new_service) => { + info!("Configuration updated."); + service = new_service; }, Err(e) => { - error!("Invalid configuration. Configuration does not updated: {e}"); - continue; + error!("rpxy failed to be ready. Configuration does not updated: {e}"); } }; - match build_cert_manager(&config_toml).await { - Ok(c) => { - cert_service_and_rx = c; - }, - Err(e) => { - error!("Invalid cert configuration. Configuration does not updated: {e}"); - continue; - } - }; - #[cfg(feature = "acme")] - { - match build_acme_manager(&config_toml, runtime_handle.clone()).await { - Ok(m) => { - acme_manager = m; - }, - Err(e) => { - error!("Invalid acme configuration. Configuration does not updated: {e}"); - continue; - } - } - } - - info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets"); + info!("Terminate all spawned services and force to re-bind TCP/UDP sockets"); cancel_token.cancel(); } - else => break } } - - Ok(()) -} - -#[cfg(not(feature = "acme"))] -/// Wrapper of entry point for rpxy service with certificate management service -async fn rpxy_entrypoint( - rpxy_opts: &RpxyOptions, - cert_service: Option<&ReloaderService>, - // runtime_handle: &tokio::runtime::Handle, -) -> Result<(), anyhow::Error> { - // TODO: refactor: update routine - if let Some(cert_service) = cert_service { - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } else { - entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e)) - } -} - -#[cfg(feature = "acme")] -/// Wrapper of entry point for rpxy service with certificate management service -async fn rpxy_entrypoint( - rpxy_opts: &RpxyOptions, - cert_service: Option<&ReloaderService>, - acme_task_handles: Vec>, - // runtime_handle: &tokio::runtime::Handle, -) -> Result<(), anyhow::Error> { - // TODO: refactor: update routine - if let Some(cert_service) = cert_service { - if acme_task_handles.is_empty() { - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } else { - let select_all = futures_util::future::select_all(acme_task_handles); - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - (acme_res, _, _) = select_all => { - error!("acme manager exited"); - acme_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } - } else { - entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e)) - } } diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index de1983d..c8b73bf 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -1,7 +1,7 @@ use crate::{constants::*, count::RequestCount}; use hot_reload::ReloaderReceiver; use rpxy_certs::ServerCryptoBase; -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, time::Duration}; use tokio_util::sync::CancellationToken; /// Global object containing proxy configurations and shared object like counters. @@ -14,13 +14,13 @@ pub struct Globals { /// Shared context - Async task runtime handler pub runtime_handle: tokio::runtime::Handle, /// Shared context - Notify object to stop async tasks - pub cancel_token: Option, + pub cancel_token: CancellationToken, /// Shared context - Certificate reloader service receiver // TODO: newer one pub cert_reloader_rx: Option>, #[cfg(feature = "acme")] /// ServerConfig used for only ACME challenge for ACME domains - pub server_configs_acme_challenge: Arc>>, + pub server_configs_acme_challenge: std::sync::Arc>>, } /// Configuration parameters for proxy transport and request handlers diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index fdfb81d..43463b2 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -108,7 +108,7 @@ pub async fn entrypoint( proxy_config: proxy_config.clone(), request_count: Default::default(), runtime_handle: runtime_handle.clone(), - cancel_token: cancel_token.clone(), + cancel_token: cancel_token.clone().unwrap_or_default(), cert_reloader_rx: cert_rx.clone(), #[cfg(feature = "acme")] @@ -130,7 +130,6 @@ pub async fn entrypoint( let connection_builder = proxy::connection_builder(&globals); // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder. - let parent_cancel_token = globals.cancel_token.clone().unwrap_or_default(); let addresses = globals.proxy_config.listen_sockets.clone(); let join_handles = addresses.into_iter().map(|listening_on| { let mut tls_enabled = false; @@ -145,8 +144,8 @@ pub async fn entrypoint( message_handler: message_handler.clone(), }; - let cancel_token = parent_cancel_token.child_token(); - let parent_cancel_token_clone = parent_cancel_token.clone(); + let cancel_token = globals.cancel_token.child_token(); + let parent_cancel_token_clone = globals.cancel_token.clone(); globals.runtime_handle.spawn(async move { info!("rpxy proxy service for {listening_on} started"); tokio::select! {