From 8d9f07a848c22bd97c6a10b30ed78756fbf3530c Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Fri, 26 Jul 2024 23:25:40 +0900 Subject: [PATCH 1/4] wip: fixing dynamic reloading --- rpxy-bin/src/main.rs | 20 +++++++++-------- rpxy-lib/src/lib.rs | 37 +++++++++++++++++++++++++++----- rpxy-lib/src/proxy/proxy_main.rs | 19 +--------------- 3 files changed, 44 insertions(+), 32 deletions(-) diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index d7f7121..42b783d 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -44,18 +44,20 @@ fn main() { .unwrap(); tokio::select! { - Err(e) = config_service.start() => { - error!("config reloader service exited: {e}"); - std::process::exit(1); + config_res = config_service.start() => { + if let Err(e) = config_res { + error!("config reloader service exited: {e}"); + std::process::exit(1); + } } - Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { - error!("rpxy service existed: {e}"); - std::process::exit(1); - } - else => { - std::process::exit(0); + rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { + if let Err(e) = rpxy_res { + error!("rpxy service existed: {e}"); + std::process::exit(1); + } } } + std::process::exit(0); } }); } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 3a6097d..fdfb81d 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -19,7 +19,7 @@ use crate::{ message_handler::HttpMessageHandlerBuilder, proxy::Proxy, }; -use futures::future::select_all; +use futures::future::join_all; use hot_reload::ReloaderReceiver; use rpxy_certs::ServerCryptoBase; use std::sync::Arc; @@ -130,8 +130,9 @@ pub async fn entrypoint( let connection_builder = proxy::connection_builder(&globals); // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder. + let parent_cancel_token = globals.cancel_token.clone().unwrap_or_default(); let addresses = globals.proxy_config.listen_sockets.clone(); - let futures_iter = addresses.into_iter().map(|listening_on| { + let join_handles = addresses.into_iter().map(|listening_on| { let mut tls_enabled = false; if let Some(https_port) = globals.proxy_config.https_port { tls_enabled = https_port == listening_on.port() @@ -143,11 +144,37 @@ pub async fn entrypoint( connection_builder: connection_builder.clone(), message_handler: message_handler.clone(), }; - globals.runtime_handle.spawn(async move { proxy.start().await }) + + let cancel_token = parent_cancel_token.child_token(); + let parent_cancel_token_clone = parent_cancel_token.clone(); + globals.runtime_handle.spawn(async move { + info!("rpxy proxy service for {listening_on} started"); + tokio::select! { + _ = cancel_token.cancelled() => { + info!("rpxy proxy service for {listening_on} terminated"); + Ok(()) + }, + proxy_res = proxy.start() => { + info!("rpxy proxy service for {listening_on} exited"); + // cancel other proxy tasks + parent_cancel_token_clone.cancel(); + proxy_res + } + } + }) }); - if let (Ok(Err(e)), _, _) = select_all(futures_iter).await { - error!("Some proxy services are down: {}", e); + let join_res = join_all(join_handles).await; + let mut errs = join_res.into_iter().filter_map(|res| { + if let Ok(Err(e)) = res { + error!("Some proxy services are down: {}", e); + Some(e) + } else { + None + } + }); + // returns the first error as the representative error + if let Some(e) = errs.next() { return Err(e); } diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 9be175d..3bb0aec 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -312,23 +312,6 @@ where } }; - match &self.globals.cancel_token { - Some(cancel_token) => { - select! { - _ = proxy_service.fuse() => { - warn!("Proxy service got down"); - } - _ = cancel_token.cancelled().fuse() => { - info!("Proxy service listening on {} receives term signal", self.listening_on); - } - } - } - None => { - proxy_service.await?; - warn!("Proxy service got down"); - } - } - - Ok(()) + proxy_service.await } } From 48b33409f9852b65a83b72969825dd1ffe1a4e3d Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 27 Jul 2024 03:32:35 +0900 Subject: [PATCH 2/4] fix: redesigned graceful shutdown for config update --- rpxy-bin/src/constants.rs | 2 +- rpxy-bin/src/main.rs | 395 ++++++++++++++++++++------------------ rpxy-lib/src/globals.rs | 6 +- rpxy-lib/src/lib.rs | 7 +- 4 files changed, 213 insertions(+), 197 deletions(-) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 53c8bbc..3173fa9 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,6 +1,6 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; -pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 2; #[cfg(feature = "cache")] // Cache directory diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 42b783d..9acdc2c 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -16,6 +16,8 @@ use crate::{ }; use hot_reload::{ReloaderReceiver, ReloaderService}; use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder}; +use std::sync::Arc; +use tokio_util::sync::CancellationToken; fn main() { init_logger(); @@ -62,51 +64,201 @@ fn main() { }); } +/// rpxy service definition +struct RpxyService { + runtime_handle: tokio::runtime::Handle, + proxy_conf: rpxy_lib::ProxyConfig, + app_conf: rpxy_lib::AppConfigList, + cert_service: Option>>, + cert_rx: Option>, + #[cfg(feature = "acme")] + acme_manager: Option, +} + +impl RpxyService { + async fn new(config_toml: &ConfigToml, runtime_handle: tokio::runtime::Handle) -> Result { + let (proxy_conf, app_conf) = build_settings(config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; + + let (cert_service, cert_rx) = build_cert_manager(config_toml) + .await + .map_err(|e| anyhow!("Invalid cert configuration: {e}"))? + .map(|(s, r)| (Some(Arc::new(s)), Some(r))) + .unwrap_or((None, None)); + + Ok(RpxyService { + runtime_handle: runtime_handle.clone(), + proxy_conf, + app_conf, + cert_service, + cert_rx, + #[cfg(feature = "acme")] + acme_manager: build_acme_manager(config_toml, runtime_handle.clone()).await?, + }) + } + + async fn start(&self, cancel_token: Option) -> Result<(), anyhow::Error> { + let RpxyService { + runtime_handle, + proxy_conf, + app_conf, + cert_service: _, + cert_rx, + #[cfg(feature = "acme")] + acme_manager, + } = self; + + #[cfg(feature = "acme")] + { + let (acme_join_handles, server_config_acme_challenge) = acme_manager + .as_ref() + .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token()))) + .unwrap_or((vec![], Default::default())); + let rpxy_opts = RpxyOptionsBuilder::default() + .proxy_config(proxy_conf.clone()) + .app_config_list(app_conf.clone()) + .cert_rx(cert_rx.clone()) + .runtime_handle(runtime_handle.clone()) + .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) + .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) + .build()?; + self + .start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle) + .await + .map_err(|e| anyhow!(e)) + } + + #[cfg(not(feature = "acme"))] + { + let rpxy_opts = RpxyOptionsBuilder::default() + .proxy_config(proxy_conf.clone()) + .app_config_list(app_conf.clone()) + .cert_rx(cert_rx.clone()) + .runtime_handle(runtime_handle.clone()) + .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) + .build()?; + self + .start_inner(rpxy_opts) //, &runtime_handle) + .await + .map_err(|e| anyhow!(e)) + } + } + + /// Wrapper of entry point for rpxy service with certificate management service + async fn start_inner( + &self, + rpxy_opts: RpxyOptions, + // cert_service: Option<&Arc>>, + #[cfg(feature = "acme")] acme_task_handles: Vec>, + // runtime_handle: &tokio::runtime::Handle, + ) -> Result<(), anyhow::Error> { + let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default(); + let runtime_handle = rpxy_opts.runtime_handle.clone(); + + // spawn rpxy entry point + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let rpxy_handle = runtime_handle.spawn(async move { + tokio::select! { + rpxy_res = entrypoint(&rpxy_opts) => { + if let Err(ref e) = rpxy_res { + error!("rpxy entrypoint exited on error: {e}"); + } + cancel_token_clone.cancel(); + rpxy_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("rpxy entrypoint terminated by cancel token"); + Ok(()) + } + } + }); + + if self.cert_service.is_none() { + return rpxy_handle.await?; + } + + // spawn certificate reloader service + let cert_service = self.cert_service.as_ref().unwrap().clone(); + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let cert_handle = runtime_handle.spawn(async move { + tokio::select! { + cert_res = cert_service.start() => { + if let Err(ref e) = cert_res { + error!("cert reloader service exited on error: {e}"); + } + cancel_token_clone.cancel(); + cert_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("cert reloader service terminated by cancel token"); + Ok(()) + } + } + }); + + #[cfg(not(feature = "acme"))] + { + let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle); + let (rpxy_res, cert_res) = (rpxy_res?, cert_res?); + match (rpxy_res, cert_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + } + } + + #[cfg(feature = "acme")] + { + if acme_task_handles.is_empty() { + let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle); + let (rpxy_res, cert_res) = (rpxy_res?, cert_res?); + return match (rpxy_res, cert_res) { + (Ok(()), Ok(())) => Ok(()), + (Err(e), _) => Err(e), + (_, Err(e)) => Err(e), + }; + } + + // spawn acme manager tasks + let select_all = futures_util::future::select_all(acme_task_handles); + let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); + let acme_handle = runtime_handle.spawn(async move { + tokio::select! { + (acme_res, _, _) = select_all => { + if let Err(ref e) = acme_res { + error!("acme manager exited on error: {e}"); + } + cancel_token_clone.cancel(); + acme_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("acme manager terminated by cancel token"); + Ok(()) + } + } + }); + let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); + let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?); + match (rpxy_res, cert_res, acme_res) { + (Ok(()), Ok(()), Ok(())) => Ok(()), + (Err(e), _, _) => Err(e), + (_, Err(e), _) => Err(e), + (_, _, Err(e)) => Err(e), + } + } + } +} + async fn rpxy_service_without_watcher( config_file_path: &str, runtime_handle: tokio::runtime::Handle, ) -> Result<(), anyhow::Error> { info!("Start rpxy service"); let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; - let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - - let (cert_service, cert_rx) = build_cert_manager(&config_toml) - .await - .map_err(|e| anyhow!("Invalid cert configuration: {e}"))? - .map(|(s, r)| (Some(s), Some(r))) - .unwrap_or((None, None)); - - #[cfg(feature = "acme")] - { - let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; - let (acme_join_handles, server_config_acme_challenge) = acme_manager - .as_ref() - .map(|m| m.spawn_manager_tasks(None)) - .unwrap_or((vec![], Default::default())); - let rpxy_opts = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf) - .app_config_list(app_conf) - .cert_rx(cert_rx) - .runtime_handle(runtime_handle.clone()) - .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge)) - .build()?; - rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) - } - - #[cfg(not(feature = "acme"))] - { - let rpxy_opts = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.clone()) - .runtime_handle(runtime_handle.clone()) - .build()?; - rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) - } + let service = RpxyService::new(&config_toml, runtime_handle).await?; + service.start(None).await } async fn rpxy_service_with_watcher( @@ -120,176 +272,41 @@ async fn rpxy_service_with_watcher( .borrow() .clone() .ok_or(anyhow!("Something wrong in config reloader receiver"))?; - let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?; - - #[cfg(feature = "acme")] - let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?; - - let mut cert_service_and_rx = build_cert_manager(&config_toml) - .await - .map_err(|e| anyhow!("Invalid cert configuration: {e}"))?; + let mut service = RpxyService::new(&config_toml, runtime_handle.clone()).await?; // Continuous monitoring loop { // Notifier for proxy service termination let cancel_token = tokio_util::sync::CancellationToken::new(); - let (cert_service, cert_rx) = cert_service_and_rx - .as_ref() - .map(|(s, r)| (Some(s), Some(r))) - .unwrap_or((None, None)); - - #[cfg(feature = "acme")] - let (acme_join_handles, server_config_acme_challenge) = acme_manager - .as_ref() - .map(|m| m.spawn_manager_tasks(Some(cancel_token.child_token()))) - .unwrap_or((vec![], Default::default())); - - let rpxy_opts = { - #[cfg(feature = "acme")] - let res = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.cloned()) - .runtime_handle(runtime_handle.clone()) - .cancel_token(Some(cancel_token.child_token())) - .server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge)) - .build(); - - #[cfg(not(feature = "acme"))] - let res = RpxyOptionsBuilder::default() - .proxy_config(proxy_conf.clone()) - .app_config_list(app_conf.clone()) - .cert_rx(cert_rx.cloned()) - .runtime_handle(runtime_handle.clone()) - .term_notify(Some(term_notify.clone())) - .build(); - res - }?; - tokio::select! { - rpxy_res = { - #[cfg(feature = "acme")] - { - rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle) + /* ---------- */ + rpxy_res = service.start(Some(cancel_token.clone())) => { + if let Err(ref e) = rpxy_res { + error!("rpxy service exited on error: {e}"); + } else { + error!("rpxy service exited"); } - #[cfg(not(feature = "acme"))] - { - rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle) - } - } => { - error!("rpxy entrypoint or cert service exited"); return rpxy_res.map_err(|e| anyhow!(e)); } + /* ---------- */ _ = config_rx.changed() => { - let Some(config_toml) = config_rx.borrow().clone() else { + let Some(new_config_toml) = config_rx.borrow().clone() else { error!("Something wrong in config reloader receiver"); return Err(anyhow!("Something wrong in config reloader receiver")); }; - match build_settings(&config_toml) { - Ok((p, a)) => { - (proxy_conf, app_conf) = (p, a) + match RpxyService::new(&new_config_toml, runtime_handle.clone()).await { + Ok(new_service) => { + info!("Configuration updated."); + service = new_service; }, Err(e) => { - error!("Invalid configuration. Configuration does not updated: {e}"); - continue; + error!("rpxy failed to be ready. Configuration does not updated: {e}"); } }; - match build_cert_manager(&config_toml).await { - Ok(c) => { - cert_service_and_rx = c; - }, - Err(e) => { - error!("Invalid cert configuration. Configuration does not updated: {e}"); - continue; - } - }; - #[cfg(feature = "acme")] - { - match build_acme_manager(&config_toml, runtime_handle.clone()).await { - Ok(m) => { - acme_manager = m; - }, - Err(e) => { - error!("Invalid acme configuration. Configuration does not updated: {e}"); - continue; - } - } - } - - info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets"); + info!("Terminate all spawned services and force to re-bind TCP/UDP sockets"); cancel_token.cancel(); } - else => break } } - - Ok(()) -} - -#[cfg(not(feature = "acme"))] -/// Wrapper of entry point for rpxy service with certificate management service -async fn rpxy_entrypoint( - rpxy_opts: &RpxyOptions, - cert_service: Option<&ReloaderService>, - // runtime_handle: &tokio::runtime::Handle, -) -> Result<(), anyhow::Error> { - // TODO: refactor: update routine - if let Some(cert_service) = cert_service { - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } else { - entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e)) - } -} - -#[cfg(feature = "acme")] -/// Wrapper of entry point for rpxy service with certificate management service -async fn rpxy_entrypoint( - rpxy_opts: &RpxyOptions, - cert_service: Option<&ReloaderService>, - acme_task_handles: Vec>, - // runtime_handle: &tokio::runtime::Handle, -) -> Result<(), anyhow::Error> { - // TODO: refactor: update routine - if let Some(cert_service) = cert_service { - if acme_task_handles.is_empty() { - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } else { - let select_all = futures_util::future::select_all(acme_task_handles); - tokio::select! { - rpxy_res = entrypoint(rpxy_opts) => { - error!("rpxy entrypoint exited"); - rpxy_res.map_err(|e| anyhow!(e)) - } - (acme_res, _, _) = select_all => { - error!("acme manager exited"); - acme_res.map_err(|e| anyhow!(e)) - } - cert_res = cert_service.start() => { - error!("cert reloader service exited"); - cert_res.map_err(|e| anyhow!(e)) - } - } - } - } else { - entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e)) - } } diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index de1983d..c8b73bf 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -1,7 +1,7 @@ use crate::{constants::*, count::RequestCount}; use hot_reload::ReloaderReceiver; use rpxy_certs::ServerCryptoBase; -use std::{net::SocketAddr, sync::Arc, time::Duration}; +use std::{net::SocketAddr, time::Duration}; use tokio_util::sync::CancellationToken; /// Global object containing proxy configurations and shared object like counters. @@ -14,13 +14,13 @@ pub struct Globals { /// Shared context - Async task runtime handler pub runtime_handle: tokio::runtime::Handle, /// Shared context - Notify object to stop async tasks - pub cancel_token: Option, + pub cancel_token: CancellationToken, /// Shared context - Certificate reloader service receiver // TODO: newer one pub cert_reloader_rx: Option>, #[cfg(feature = "acme")] /// ServerConfig used for only ACME challenge for ACME domains - pub server_configs_acme_challenge: Arc>>, + pub server_configs_acme_challenge: std::sync::Arc>>, } /// Configuration parameters for proxy transport and request handlers diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index fdfb81d..43463b2 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -108,7 +108,7 @@ pub async fn entrypoint( proxy_config: proxy_config.clone(), request_count: Default::default(), runtime_handle: runtime_handle.clone(), - cancel_token: cancel_token.clone(), + cancel_token: cancel_token.clone().unwrap_or_default(), cert_reloader_rx: cert_rx.clone(), #[cfg(feature = "acme")] @@ -130,7 +130,6 @@ pub async fn entrypoint( let connection_builder = proxy::connection_builder(&globals); // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder. - let parent_cancel_token = globals.cancel_token.clone().unwrap_or_default(); let addresses = globals.proxy_config.listen_sockets.clone(); let join_handles = addresses.into_iter().map(|listening_on| { let mut tls_enabled = false; @@ -145,8 +144,8 @@ pub async fn entrypoint( message_handler: message_handler.clone(), }; - let cancel_token = parent_cancel_token.child_token(); - let parent_cancel_token_clone = parent_cancel_token.clone(); + let cancel_token = globals.cancel_token.child_token(); + let parent_cancel_token_clone = globals.cancel_token.clone(); globals.runtime_handle.spawn(async move { info!("rpxy proxy service for {listening_on} started"); tokio::select! { From e48efa010949bb001a76754f641a2955addd8f28 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 27 Jul 2024 03:32:51 +0900 Subject: [PATCH 3/4] fix: typo --- rpxy-bin/src/constants.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 3173fa9..53c8bbc 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,6 +1,6 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; -pub const CONFIG_WATCH_DELAY_SECS: u32 = 2; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; #[cfg(feature = "cache")] // Cache directory From 3b918af40b515b89142d84435c6f8186b16ef04f Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 27 Jul 2024 04:38:25 +0900 Subject: [PATCH 4/4] chore: refactor dynamic reloading --- Cargo.toml | 2 +- rpxy-acme/src/manager.rs | 2 +- rpxy-bin/src/constants.rs | 2 +- rpxy-bin/src/main.rs | 84 ++++++++++++++++----------------------- rpxy-lib/src/globals.rs | 2 +- rpxy-lib/src/lib.rs | 28 +++++++------ 6 files changed, 54 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e990fdf..554a677 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.9.0-alpha.1" +version = "0.9.0-alpha.2" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" diff --git a/rpxy-acme/src/manager.rs b/rpxy-acme/src/manager.rs index 9242743..fe10c85 100644 --- a/rpxy-acme/src/manager.rs +++ b/rpxy-acme/src/manager.rs @@ -115,7 +115,7 @@ impl AcmeManager { if let Some(cancel_token) = cancel_token.as_ref() { tokio::select! { _ = task => {}, - _ = cancel_token.cancelled() => { info!("rpxy ACME manager task for {domain} terminated") } + _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } } } else { task.await; diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 53c8bbc..2f27735 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,6 +1,6 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; -pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 15; #[cfg(feature = "cache")] // Cache directory diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 9acdc2c..ce96253 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -121,10 +121,7 @@ impl RpxyService { .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) .build()?; - self - .start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) + self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e)) } #[cfg(not(feature = "acme"))] @@ -136,10 +133,7 @@ impl RpxyService { .runtime_handle(runtime_handle.clone()) .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .build()?; - self - .start_inner(rpxy_opts) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) + self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e)) } } @@ -147,53 +141,49 @@ impl RpxyService { async fn start_inner( &self, rpxy_opts: RpxyOptions, - // cert_service: Option<&Arc>>, #[cfg(feature = "acme")] acme_task_handles: Vec>, - // runtime_handle: &tokio::runtime::Handle, ) -> Result<(), anyhow::Error> { - let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default(); + let cancel_token = rpxy_opts.cancel_token.clone(); let runtime_handle = rpxy_opts.runtime_handle.clone(); - // spawn rpxy entry point + // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); let rpxy_handle = runtime_handle.spawn(async move { - tokio::select! { - rpxy_res = entrypoint(&rpxy_opts) => { - if let Err(ref e) = rpxy_res { - error!("rpxy entrypoint exited on error: {e}"); - } - cancel_token_clone.cancel(); - rpxy_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("rpxy entrypoint terminated by cancel token"); - Ok(()) + if let Err(e) = entrypoint(&rpxy_opts).await { + error!("rpxy entrypoint exited on error: {e}"); + if let Some(cancel_token) = cancel_token_clone { + cancel_token.cancel(); } + return Err(anyhow!(e)); } + Ok(()) }); if self.cert_service.is_none() { return rpxy_handle.await?; } - // spawn certificate reloader service + // spawn certificate reloader service, where cert service does not have cancellation token inside the service let cert_service = self.cert_service.as_ref().unwrap().clone(); let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); + let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token()); let cert_handle = runtime_handle.spawn(async move { - tokio::select! { - cert_res = cert_service.start() => { - if let Err(ref e) = cert_res { - error!("cert reloader service exited on error: {e}"); + if let Some(child_cancel_token) = child_cancel_token { + tokio::select! { + cert_res = cert_service.start() => { + if let Err(ref e) = cert_res { + error!("cert reloader service exited on error: {e}"); + } + cancel_token_clone.unwrap().cancel(); + cert_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("cert reloader service terminated"); + Ok(()) } - cancel_token_clone.cancel(); - cert_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("cert reloader service terminated by cancel token"); - Ok(()) } + } else { + cert_service.start().await.map_err(|e| anyhow!(e)) } }); @@ -220,24 +210,18 @@ impl RpxyService { }; } - // spawn acme manager tasks + // spawn acme manager tasks, where cancellation token is possibly contained inside the service let select_all = futures_util::future::select_all(acme_task_handles); let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); let acme_handle = runtime_handle.spawn(async move { - tokio::select! { - (acme_res, _, _) = select_all => { - if let Err(ref e) = acme_res { - error!("acme manager exited on error: {e}"); - } - cancel_token_clone.cancel(); - acme_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("acme manager terminated by cancel token"); - Ok(()) - } + let (acme_res, _, _) = select_all.await; + if let Err(ref e) = acme_res { + error!("acme manager exited on error: {e}"); } + if let Some(cancel_token) = cancel_token_clone { + cancel_token.cancel(); + } + acme_res.map_err(|e| anyhow!(e)) }); let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?); diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index c8b73bf..97aadef 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -14,7 +14,7 @@ pub struct Globals { /// Shared context - Async task runtime handler pub runtime_handle: tokio::runtime::Handle, /// Shared context - Notify object to stop async tasks - pub cancel_token: CancellationToken, + pub cancel_token: Option, /// Shared context - Certificate reloader service receiver // TODO: newer one pub cert_reloader_rx: Option>, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 43463b2..f5f6d59 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -108,7 +108,7 @@ pub async fn entrypoint( proxy_config: proxy_config.clone(), request_count: Default::default(), runtime_handle: runtime_handle.clone(), - cancel_token: cancel_token.clone().unwrap_or_default(), + cancel_token: cancel_token.clone(), cert_reloader_rx: cert_rx.clone(), #[cfg(feature = "acme")] @@ -144,21 +144,25 @@ pub async fn entrypoint( message_handler: message_handler.clone(), }; - let cancel_token = globals.cancel_token.child_token(); + let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token()); let parent_cancel_token_clone = globals.cancel_token.clone(); globals.runtime_handle.spawn(async move { info!("rpxy proxy service for {listening_on} started"); - tokio::select! { - _ = cancel_token.cancelled() => { - info!("rpxy proxy service for {listening_on} terminated"); - Ok(()) - }, - proxy_res = proxy.start() => { - info!("rpxy proxy service for {listening_on} exited"); - // cancel other proxy tasks - parent_cancel_token_clone.cancel(); - proxy_res + if let Some(cancel_token) = cancel_token { + tokio::select! { + _ = cancel_token.cancelled() => { + debug!("rpxy proxy service for {listening_on} terminated"); + Ok(()) + }, + proxy_res = proxy.start() => { + info!("rpxy proxy service for {listening_on} exited"); + // cancel other proxy tasks + parent_cancel_token_clone.unwrap().cancel(); + proxy_res + } } + } else { + proxy.start().await } }) });