From 4b3f71975916b38a05e9a7c7f8bf6495badc30af Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Thu, 6 Feb 2025 18:51:38 +0900 Subject: [PATCH 1/2] chore: deps --- rpxy-bin/Cargo.toml | 4 ++-- rpxy-lib/Cargo.toml | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 06df18a..1b1e28e 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -45,8 +45,8 @@ async-trait = "0.1.86" futures-util = { version = "0.3.31", default-features = false } # config -clap = { version = "4.5.27", features = ["std", "cargo", "wrap_help"] } -toml = { version = "0.8.19", default-features = false, features = ["parse"] } +clap = { version = "4.5.28", features = ["std", "cargo", "wrap_help"] } +toml = { version = "0.8.20", default-features = false, features = ["parse"] } hot_reload = "0.1.8" serde_ignored = "0.1.10" diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 627d3f7..5bf6109 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -38,7 +38,7 @@ post-quantum = [ [dependencies] rand = "0.9.0" ahash = "0.8.11" -bytes = "1.9.0" +bytes = "1.10.0" derive_builder = "0.20.2" futures = { version = "0.3.31", features = ["alloc", "async-await"] } tokio = { version = "1.43.0", default-features = false, features = [ From 8d0adde1b0d3aaf96e78c51c967e5332e10598cb Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Fri, 14 Feb 2025 02:36:34 +0900 Subject: [PATCH 2/2] refactor: simplify the watcher option of inner handler --- rpxy-acme/src/manager.rs | 13 +++---- rpxy-bin/src/main.rs | 58 +++++++++++++++----------------- rpxy-lib/src/globals.rs | 3 -- rpxy-lib/src/lib.rs | 33 +++++++----------- rpxy-lib/src/proxy/proxy_main.rs | 44 ++++++++++++++++++------ 5 files changed, 79 insertions(+), 72 deletions(-) diff --git a/rpxy-acme/src/manager.rs b/rpxy-acme/src/manager.rs index 1a4c091..73b786d 100644 --- a/rpxy-acme/src/manager.rs +++ b/rpxy-acme/src/manager.rs @@ -77,7 +77,7 @@ impl AcmeManager { /// Returns a Vec> as a tasks handles and a map of domain to ServerConfig for challenge. pub fn spawn_manager_tasks( &self, - cancel_token: Option, + cancel_token: tokio_util::sync::CancellationToken, ) -> (Vec>, HashMap>) { let rustls_client_config = rustls::ClientConfig::builder() .dangerous() // The `Verifier` we're using is actually safe @@ -115,13 +115,10 @@ impl AcmeManager { } } }; - if let Some(cancel_token) = cancel_token.as_ref() { - tokio::select! { - _ = task => {}, - _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } - } - } else { - task.await; + + tokio::select! { + _ = task => {}, + _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } } } }) diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index d1156d8..855561f 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -99,7 +99,7 @@ impl RpxyService { }) } - async fn start(&self, cancel_token: Option) -> Result<(), anyhow::Error> { + async fn start(&self, cancel_token: CancellationToken) -> Result<(), anyhow::Error> { let RpxyService { runtime_handle, proxy_conf, @@ -114,17 +114,19 @@ impl RpxyService { { let (acme_join_handles, server_config_acme_challenge) = acme_manager .as_ref() - .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token()))) + .map(|m| m.spawn_manager_tasks(cancel_token.child_token())) .unwrap_or((vec![], Default::default())); let rpxy_opts = RpxyOptionsBuilder::default() .proxy_config(proxy_conf.clone()) .app_config_list(app_conf.clone()) .cert_rx(cert_rx.clone()) .runtime_handle(runtime_handle.clone()) - .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) .build()?; - self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e)) + self + .start_inner(rpxy_opts, cancel_token, acme_join_handles) + .await + .map_err(|e| anyhow!(e)) } #[cfg(not(feature = "acme"))] @@ -134,9 +136,8 @@ impl RpxyService { .app_config_list(app_conf.clone()) .cert_rx(cert_rx.clone()) .runtime_handle(runtime_handle.clone()) - .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) .build()?; - self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e)) + self.start_inner(rpxy_opts, cancel_token).await.map_err(|e| anyhow!(e)) } } @@ -144,19 +145,19 @@ impl RpxyService { async fn start_inner( &self, rpxy_opts: RpxyOptions, + cancel_token: CancellationToken, #[cfg(feature = "acme")] acme_task_handles: Vec>, ) -> Result<(), anyhow::Error> { - let cancel_token = rpxy_opts.cancel_token.clone(); + let cancel_token = cancel_token.clone(); let runtime_handle = rpxy_opts.runtime_handle.clone(); // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service let cancel_token_clone = cancel_token.clone(); + let child_cancel_token = cancel_token.child_token(); let rpxy_handle = runtime_handle.spawn(async move { - if let Err(e) = entrypoint(&rpxy_opts).await { + if let Err(e) = entrypoint(&rpxy_opts, child_cancel_token).await { error!("rpxy entrypoint exited on error: {e}"); - if let Some(cancel_token) = cancel_token_clone { - cancel_token.cancel(); - } + cancel_token_clone.cancel(); return Err(anyhow!(e)); } Ok(()) @@ -169,24 +170,20 @@ impl RpxyService { // spawn certificate reloader service, where cert service does not have cancellation token inside the service let cert_service = self.cert_service.as_ref().unwrap().clone(); let cancel_token_clone = cancel_token.clone(); - let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token()); + let child_cancel_token = cancel_token.child_token(); let cert_handle = runtime_handle.spawn(async move { - if let Some(child_cancel_token) = child_cancel_token { - tokio::select! { - cert_res = cert_service.start() => { - if let Err(ref e) = cert_res { - error!("cert reloader service exited on error: {e}"); - } - cancel_token_clone.unwrap().cancel(); - cert_res.map_err(|e| anyhow!(e)) - } - _ = child_cancel_token.cancelled() => { - debug!("cert reloader service terminated"); - Ok(()) + tokio::select! { + cert_res = cert_service.start() => { + if let Err(ref e) = cert_res { + error!("cert reloader service exited on error: {e}"); } + cancel_token_clone.cancel(); + cert_res.map_err(|e| anyhow!(e)) + } + _ = child_cancel_token.cancelled() => { + debug!("cert reloader service terminated"); + Ok(()) } - } else { - cert_service.start().await.map_err(|e| anyhow!(e)) } }); @@ -221,9 +218,7 @@ impl RpxyService { if let Err(ref e) = acme_res { error!("acme manager exited on error: {e}"); } - if let Some(cancel_token) = cancel_token_clone { - cancel_token.cancel(); - } + cancel_token_clone.cancel(); acme_res.map_err(|e| anyhow!(e)) }); let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); @@ -245,7 +240,8 @@ async fn rpxy_service_without_watcher( info!("Start rpxy service"); let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; let service = RpxyService::new(&config_toml, runtime_handle).await?; - service.start(None).await + // Create cancel token that is never be called as dummy + service.start(tokio_util::sync::CancellationToken::new()).await } async fn rpxy_service_with_watcher( @@ -268,7 +264,7 @@ async fn rpxy_service_with_watcher( tokio::select! { /* ---------- */ - rpxy_res = service.start(Some(cancel_token.clone())) => { + rpxy_res = service.start(cancel_token.clone()) => { if let Err(ref e) = rpxy_res { error!("rpxy service exited on error: {e}"); } else { diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index a52f066..c25483f 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -2,7 +2,6 @@ use crate::{constants::*, count::RequestCount}; use hot_reload::ReloaderReceiver; use rpxy_certs::ServerCryptoBase; use std::{net::SocketAddr, time::Duration}; -use tokio_util::sync::CancellationToken; /// Global object containing proxy configurations and shared object like counters. /// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks. @@ -13,8 +12,6 @@ pub struct Globals { pub request_count: RequestCount, /// Shared context - Async task runtime handler pub runtime_handle: tokio::runtime::Handle, - /// Shared context - Notify object to stop async tasks - pub cancel_token: Option, /// Shared context - Certificate reloader service receiver // TODO: newer one pub cert_reloader_rx: Option>, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 4cc23ab..4ec60e0 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -43,8 +43,6 @@ pub struct RpxyOptions { pub cert_rx: Option>, // TODO: /// Async task runtime handler pub runtime_handle: tokio::runtime::Handle, - /// Notify object to stop async tasks - pub cancel_token: Option, #[cfg(feature = "acme")] /// ServerConfig used for only ACME challenge for ACME domains @@ -58,10 +56,10 @@ pub async fn entrypoint( app_config_list, cert_rx, // TODO: runtime_handle, - cancel_token, #[cfg(feature = "acme")] server_configs_acme_challenge, }: &RpxyOptions, + cancel_token: CancellationToken, ) -> RpxyResult<()> { #[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))] warn!("Both \"http3-quinn\" and \"http3-s2n\" features are enabled. \"http3-quinn\" will be used"); @@ -117,7 +115,6 @@ pub async fn entrypoint( proxy_config: proxy_config.clone(), request_count: Default::default(), runtime_handle: runtime_handle.clone(), - cancel_token: cancel_token.clone(), cert_reloader_rx: cert_rx.clone(), #[cfg(feature = "acme")] @@ -153,25 +150,21 @@ pub async fn entrypoint( message_handler: message_handler.clone(), }; - let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token()); - let parent_cancel_token_clone = globals.cancel_token.clone(); + let cancel_token = cancel_token.clone(); globals.runtime_handle.spawn(async move { info!("rpxy proxy service for {listening_on} started"); - if let Some(cancel_token) = cancel_token { - tokio::select! { - _ = cancel_token.cancelled() => { - debug!("rpxy proxy service for {listening_on} terminated"); - Ok(()) - }, - proxy_res = proxy.start() => { - info!("rpxy proxy service for {listening_on} exited"); - // cancel other proxy tasks - parent_cancel_token_clone.unwrap().cancel(); - proxy_res - } + + tokio::select! { + _ = cancel_token.cancelled() => { + debug!("rpxy proxy service for {listening_on} terminated"); + Ok(()) + }, + proxy_res = proxy.start(cancel_token.child_token()) => { + info!("rpxy proxy service for {listening_on} exited"); + // cancel other proxy tasks + cancel_token.cancel(); + proxy_res } - } else { - proxy.start().await } }) }); diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 40e14e5..5244ecf 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -22,6 +22,7 @@ use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::au use rpxy_certs::ServerCrypto; use std::{net::SocketAddr, sync::Arc, time::Duration}; use tokio::time::timeout; +use tokio_util::sync::CancellationToken; /// Wrapper function to handle request for HTTP/1.1 and HTTP/2 /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler @@ -129,7 +130,7 @@ where } /// Start with TLS (HTTPS) - pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { + pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> { #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] { self.tls_listener_service().await?; @@ -139,14 +140,37 @@ where #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] { if self.globals.proxy_config.http3 { - select! { - _ = self.tls_listener_service().fuse() => { - error!("TCP proxy service for TLS exited"); - }, - _ = self.h3_listener_service().fuse() => { - error!("UDP proxy service for QUIC exited"); + let jh_tls = self.globals.runtime_handle.spawn({ + let self_clone = self.clone(); + let cancel_token = cancel_token.clone(); + async move { + select! { + _ = self_clone.tls_listener_service().fuse() => { + error!("TCP proxy service for TLS exited"); + cancel_token.cancel(); + }, + _ = cancel_token.cancelled().fuse() => { + debug!("Cancel token is called for TLS listener"); + } + } } - }; + }); + let jh_h3 = self.globals.runtime_handle.spawn({ + let self_clone = self.clone(); + async move { + select! { + _ = self_clone.h3_listener_service().fuse() => { + error!("UDP proxy service for QUIC exited"); + cancel_token.cancel(); + }, + _ = cancel_token.cancelled().fuse() => { + debug!("Cancel token is called for QUIC listener"); + } + } + } + }); + let _ = futures::future::join(jh_tls, jh_h3).await; + Ok(()) } else { self.tls_listener_service().await?; @@ -303,10 +327,10 @@ where } /// Entrypoint for HTTP/1.1, 2 and 3 servers - pub async fn start(&self) -> RpxyResult<()> { + pub async fn start(&self, cancel_token: CancellationToken) -> RpxyResult<()> { let proxy_service = async { if self.tls_enabled { - self.start_with_tls().await + self.start_with_tls(cancel_token).await } else { self.start_without_tls().await }