From 3b918af40b515b89142d84435c6f8186b16ef04f Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 27 Jul 2024 04:38:25 +0900 Subject: [PATCH] chore: refactor dynamic reloading --- Cargo.toml | 2 +- rpxy-acme/src/manager.rs | 2 +- rpxy-bin/src/constants.rs | 2 +- rpxy-bin/src/main.rs | 84 ++++++++++++++++----------------------- rpxy-lib/src/globals.rs | 2 +- rpxy-lib/src/lib.rs | 28 +++++++------ 6 files changed, 54 insertions(+), 66 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index e990fdf..554a677 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.9.0-alpha.1" +version = "0.9.0-alpha.2" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" diff --git a/rpxy-acme/src/manager.rs b/rpxy-acme/src/manager.rs index 9242743..fe10c85 100644 --- a/rpxy-acme/src/manager.rs +++ b/rpxy-acme/src/manager.rs @@ -115,7 +115,7 @@ impl AcmeManager { if let Some(cancel_token) = cancel_token.as_ref() { tokio::select! { _ = task => {}, - _ = cancel_token.cancelled() => { info!("rpxy ACME manager task for {domain} terminated") } + _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } } } else { task.await; diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 53c8bbc..2f27735 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,6 +1,6 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; -pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 15; #[cfg(feature = "cache")] // Cache directory diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 9acdc2c..ce96253 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -121,10 +121,7 @@ impl RpxyService { .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) .build()?; - self - .start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) + self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e)) } #[cfg(not(feature = "acme"))] @@ -136,10 +133,7 @@ impl RpxyService { .runtime_handle(runtime_handle.clone()) .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .build()?; - self - .start_inner(rpxy_opts) //, &runtime_handle) - .await - .map_err(|e| anyhow!(e)) + self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e)) } } @@ -147,53 +141,49 @@ impl RpxyService { async fn start_inner( &self, rpxy_opts: RpxyOptions, - // cert_service: Option<&Arc>>, #[cfg(feature = "acme")] acme_task_handles: Vec>, - // runtime_handle: &tokio::runtime::Handle, ) -> Result<(), anyhow::Error> { - let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default(); + let cancel_token = rpxy_opts.cancel_token.clone(); let runtime_handle = rpxy_opts.runtime_handle.clone(); - // spawn rpxy entry point + // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); let rpxy_handle = runtime_handle.spawn(async move { - tokio::select! { - rpxy_res = entrypoint(&rpxy_opts) => { - if let Err(ref e) = rpxy_res { - error!("rpxy entrypoint exited on error: {e}"); - } - cancel_token_clone.cancel(); - rpxy_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("rpxy entrypoint terminated by cancel token"); - Ok(()) + if let Err(e) = entrypoint(&rpxy_opts).await { + error!("rpxy entrypoint exited on error: {e}"); + if let Some(cancel_token) = cancel_token_clone { + cancel_token.cancel(); } + return Err(anyhow!(e)); } + Ok(()) }); if self.cert_service.is_none() { return rpxy_handle.await?; } - // spawn certificate reloader service + // spawn certificate reloader service, where cert service does not have cancellation token inside the service let cert_service = self.cert_service.as_ref().unwrap().clone(); let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); + let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token()); let cert_handle = runtime_handle.spawn(async move { - tokio::select! { - cert_res = cert_service.start() => { - if let Err(ref e) = cert_res { - error!("cert reloader service exited on error: {e}"); + if let Some(child_cancel_token) = child_cancel_token { + tokio::select! { + cert_res = cert_service.start() => { + if let Err(ref e) = cert_res { + error!("cert reloader service exited on error: {e}"); + } + cancel_token_clone.unwrap().cancel(); + cert_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("cert reloader service terminated"); + Ok(()) } - cancel_token_clone.cancel(); - cert_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("cert reloader service terminated by cancel token"); - Ok(()) } + } else { + cert_service.start().await.map_err(|e| anyhow!(e)) } }); @@ -220,24 +210,18 @@ impl RpxyService { }; } - // spawn acme manager tasks + // spawn acme manager tasks, where cancellation token is possibly contained inside the service let select_all = futures_util::future::select_all(acme_task_handles); let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.child_token(); let acme_handle = runtime_handle.spawn(async move { - tokio::select! { - (acme_res, _, _) = select_all => { - if let Err(ref e) = acme_res { - error!("acme manager exited on error: {e}"); - } - cancel_token_clone.cancel(); - acme_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("acme manager terminated by cancel token"); - Ok(()) - } + let (acme_res, _, _) = select_all.await; + if let Err(ref e) = acme_res { + error!("acme manager exited on error: {e}"); } + if let Some(cancel_token) = cancel_token_clone { + cancel_token.cancel(); + } + acme_res.map_err(|e| anyhow!(e)) }); let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?); diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index c8b73bf..97aadef 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -14,7 +14,7 @@ pub struct Globals { /// Shared context - Async task runtime handler pub runtime_handle: tokio::runtime::Handle, /// Shared context - Notify object to stop async tasks - pub cancel_token: CancellationToken, + pub cancel_token: Option, /// Shared context - Certificate reloader service receiver // TODO: newer one pub cert_reloader_rx: Option>, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 43463b2..f5f6d59 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -108,7 +108,7 @@ pub async fn entrypoint( proxy_config: proxy_config.clone(), request_count: Default::default(), runtime_handle: runtime_handle.clone(), - cancel_token: cancel_token.clone().unwrap_or_default(), + cancel_token: cancel_token.clone(), cert_reloader_rx: cert_rx.clone(), #[cfg(feature = "acme")] @@ -144,21 +144,25 @@ pub async fn entrypoint( message_handler: message_handler.clone(), }; - let cancel_token = globals.cancel_token.child_token(); + let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token()); let parent_cancel_token_clone = globals.cancel_token.clone(); globals.runtime_handle.spawn(async move { info!("rpxy proxy service for {listening_on} started"); - tokio::select! { - _ = cancel_token.cancelled() => { - info!("rpxy proxy service for {listening_on} terminated"); - Ok(()) - }, - proxy_res = proxy.start() => { - info!("rpxy proxy service for {listening_on} exited"); - // cancel other proxy tasks - parent_cancel_token_clone.cancel(); - proxy_res + if let Some(cancel_token) = cancel_token { + tokio::select! { + _ = cancel_token.cancelled() => { + debug!("rpxy proxy service for {listening_on} terminated"); + Ok(()) + }, + proxy_res = proxy.start() => { + info!("rpxy proxy service for {listening_on} exited"); + // cancel other proxy tasks + parent_cancel_token_clone.unwrap().cancel(); + proxy_res + } } + } else { + proxy.start().await } }) });