Merge branch 'develop' of github.com:junkurihara/rust-rpxy into develop
This commit is contained in:
		
				commit
				
					
						38f1f52e0a
					
				
			
		
					 5 changed files with 79 additions and 72 deletions
				
			
		|  | @ -77,7 +77,7 @@ impl AcmeManager { | ||||||
|   /// Returns a Vec<JoinHandle<()>> as a tasks handles and a map of domain to ServerConfig for challenge.
 |   /// Returns a Vec<JoinHandle<()>> as a tasks handles and a map of domain to ServerConfig for challenge.
 | ||||||
|   pub fn spawn_manager_tasks( |   pub fn spawn_manager_tasks( | ||||||
|     &self, |     &self, | ||||||
|     cancel_token: Option<tokio_util::sync::CancellationToken>, |     cancel_token: tokio_util::sync::CancellationToken, | ||||||
|   ) -> (Vec<tokio::task::JoinHandle<()>>, HashMap<String, Arc<ServerConfig>>) { |   ) -> (Vec<tokio::task::JoinHandle<()>>, HashMap<String, Arc<ServerConfig>>) { | ||||||
|     let rustls_client_config = rustls::ClientConfig::builder() |     let rustls_client_config = rustls::ClientConfig::builder() | ||||||
|       .dangerous() // The `Verifier` we're using is actually safe
 |       .dangerous() // The `Verifier` we're using is actually safe
 | ||||||
|  | @ -115,13 +115,10 @@ impl AcmeManager { | ||||||
|                 } |                 } | ||||||
|               } |               } | ||||||
|             }; |             }; | ||||||
|             if let Some(cancel_token) = cancel_token.as_ref() { | 
 | ||||||
|               tokio::select! { |             tokio::select! { | ||||||
|                 _ = task => {}, |               _ = task => {}, | ||||||
|                 _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } |               _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } | ||||||
|               } |  | ||||||
|             } else { |  | ||||||
|               task.await; |  | ||||||
|             } |             } | ||||||
|           } |           } | ||||||
|         }) |         }) | ||||||
|  |  | ||||||
|  | @ -99,7 +99,7 @@ impl RpxyService { | ||||||
|     }) |     }) | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   async fn start(&self, cancel_token: Option<CancellationToken>) -> Result<(), anyhow::Error> { |   async fn start(&self, cancel_token: CancellationToken) -> Result<(), anyhow::Error> { | ||||||
|     let RpxyService { |     let RpxyService { | ||||||
|       runtime_handle, |       runtime_handle, | ||||||
|       proxy_conf, |       proxy_conf, | ||||||
|  | @ -114,17 +114,19 @@ impl RpxyService { | ||||||
|     { |     { | ||||||
|       let (acme_join_handles, server_config_acme_challenge) = acme_manager |       let (acme_join_handles, server_config_acme_challenge) = acme_manager | ||||||
|         .as_ref() |         .as_ref() | ||||||
|         .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token()))) |         .map(|m| m.spawn_manager_tasks(cancel_token.child_token())) | ||||||
|         .unwrap_or((vec![], Default::default())); |         .unwrap_or((vec![], Default::default())); | ||||||
|       let rpxy_opts = RpxyOptionsBuilder::default() |       let rpxy_opts = RpxyOptionsBuilder::default() | ||||||
|         .proxy_config(proxy_conf.clone()) |         .proxy_config(proxy_conf.clone()) | ||||||
|         .app_config_list(app_conf.clone()) |         .app_config_list(app_conf.clone()) | ||||||
|         .cert_rx(cert_rx.clone()) |         .cert_rx(cert_rx.clone()) | ||||||
|         .runtime_handle(runtime_handle.clone()) |         .runtime_handle(runtime_handle.clone()) | ||||||
|         .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) |  | ||||||
|         .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) |         .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) | ||||||
|         .build()?; |         .build()?; | ||||||
|       self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e)) |       self | ||||||
|  |         .start_inner(rpxy_opts, cancel_token, acme_join_handles) | ||||||
|  |         .await | ||||||
|  |         .map_err(|e| anyhow!(e)) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[cfg(not(feature = "acme"))] |     #[cfg(not(feature = "acme"))] | ||||||
|  | @ -134,9 +136,8 @@ impl RpxyService { | ||||||
|         .app_config_list(app_conf.clone()) |         .app_config_list(app_conf.clone()) | ||||||
|         .cert_rx(cert_rx.clone()) |         .cert_rx(cert_rx.clone()) | ||||||
|         .runtime_handle(runtime_handle.clone()) |         .runtime_handle(runtime_handle.clone()) | ||||||
|         .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) |  | ||||||
|         .build()?; |         .build()?; | ||||||
|       self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e)) |       self.start_inner(rpxy_opts, cancel_token).await.map_err(|e| anyhow!(e)) | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|  | @ -144,19 +145,19 @@ impl RpxyService { | ||||||
|   async fn start_inner( |   async fn start_inner( | ||||||
|     &self, |     &self, | ||||||
|     rpxy_opts: RpxyOptions, |     rpxy_opts: RpxyOptions, | ||||||
|  |     cancel_token: CancellationToken, | ||||||
|     #[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>, |     #[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>, | ||||||
|   ) -> Result<(), anyhow::Error> { |   ) -> Result<(), anyhow::Error> { | ||||||
|     let cancel_token = rpxy_opts.cancel_token.clone(); |     let cancel_token = cancel_token.clone(); | ||||||
|     let runtime_handle = rpxy_opts.runtime_handle.clone(); |     let runtime_handle = rpxy_opts.runtime_handle.clone(); | ||||||
| 
 | 
 | ||||||
|     // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
 |     // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
 | ||||||
|     let cancel_token_clone = cancel_token.clone(); |     let cancel_token_clone = cancel_token.clone(); | ||||||
|  |     let child_cancel_token = cancel_token.child_token(); | ||||||
|     let rpxy_handle = runtime_handle.spawn(async move { |     let rpxy_handle = runtime_handle.spawn(async move { | ||||||
|       if let Err(e) = entrypoint(&rpxy_opts).await { |       if let Err(e) = entrypoint(&rpxy_opts, child_cancel_token).await { | ||||||
|         error!("rpxy entrypoint exited on error: {e}"); |         error!("rpxy entrypoint exited on error: {e}"); | ||||||
|         if let Some(cancel_token) = cancel_token_clone { |         cancel_token_clone.cancel(); | ||||||
|           cancel_token.cancel(); |  | ||||||
|         } |  | ||||||
|         return Err(anyhow!(e)); |         return Err(anyhow!(e)); | ||||||
|       } |       } | ||||||
|       Ok(()) |       Ok(()) | ||||||
|  | @ -169,24 +170,20 @@ impl RpxyService { | ||||||
|     // spawn certificate reloader service, where cert service does not have cancellation token inside the service
 |     // spawn certificate reloader service, where cert service does not have cancellation token inside the service
 | ||||||
|     let cert_service = self.cert_service.as_ref().unwrap().clone(); |     let cert_service = self.cert_service.as_ref().unwrap().clone(); | ||||||
|     let cancel_token_clone = cancel_token.clone(); |     let cancel_token_clone = cancel_token.clone(); | ||||||
|     let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token()); |     let child_cancel_token = cancel_token.child_token(); | ||||||
|     let cert_handle = runtime_handle.spawn(async move { |     let cert_handle = runtime_handle.spawn(async move { | ||||||
|       if let Some(child_cancel_token) = child_cancel_token { |       tokio::select! { | ||||||
|         tokio::select! { |         cert_res = cert_service.start() => { | ||||||
|           cert_res = cert_service.start() => { |           if let Err(ref e) = cert_res { | ||||||
|             if let Err(ref e) = cert_res { |             error!("cert reloader service exited on error: {e}"); | ||||||
|               error!("cert reloader service exited on error: {e}"); |  | ||||||
|             } |  | ||||||
|             cancel_token_clone.unwrap().cancel(); |  | ||||||
|             cert_res.map_err(|e| anyhow!(e)) |  | ||||||
|           } |  | ||||||
|           _ = child_cancel_token.cancelled() => { |  | ||||||
|             debug!("cert reloader service terminated"); |  | ||||||
|             Ok(()) |  | ||||||
|           } |           } | ||||||
|  |           cancel_token_clone.cancel(); | ||||||
|  |           cert_res.map_err(|e| anyhow!(e)) | ||||||
|  |         } | ||||||
|  |         _ = child_cancel_token.cancelled() => { | ||||||
|  |           debug!("cert reloader service terminated"); | ||||||
|  |           Ok(()) | ||||||
|         } |         } | ||||||
|       } else { |  | ||||||
|         cert_service.start().await.map_err(|e| anyhow!(e)) |  | ||||||
|       } |       } | ||||||
|     }); |     }); | ||||||
| 
 | 
 | ||||||
|  | @ -221,9 +218,7 @@ impl RpxyService { | ||||||
|         if let Err(ref e) = acme_res { |         if let Err(ref e) = acme_res { | ||||||
|           error!("acme manager exited on error: {e}"); |           error!("acme manager exited on error: {e}"); | ||||||
|         } |         } | ||||||
|         if let Some(cancel_token) = cancel_token_clone { |         cancel_token_clone.cancel(); | ||||||
|           cancel_token.cancel(); |  | ||||||
|         } |  | ||||||
|         acme_res.map_err(|e| anyhow!(e)) |         acme_res.map_err(|e| anyhow!(e)) | ||||||
|       }); |       }); | ||||||
|       let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); |       let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); | ||||||
|  | @ -245,7 +240,8 @@ async fn rpxy_service_without_watcher( | ||||||
|   info!("Start rpxy service"); |   info!("Start rpxy service"); | ||||||
|   let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; |   let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; | ||||||
|   let service = RpxyService::new(&config_toml, runtime_handle).await?; |   let service = RpxyService::new(&config_toml, runtime_handle).await?; | ||||||
|   service.start(None).await |   // Create cancel token that is never be called as dummy
 | ||||||
|  |   service.start(tokio_util::sync::CancellationToken::new()).await | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn rpxy_service_with_watcher( | async fn rpxy_service_with_watcher( | ||||||
|  | @ -268,7 +264,7 @@ async fn rpxy_service_with_watcher( | ||||||
| 
 | 
 | ||||||
|     tokio::select! { |     tokio::select! { | ||||||
|       /* ---------- */ |       /* ---------- */ | ||||||
|       rpxy_res = service.start(Some(cancel_token.clone())) => { |       rpxy_res = service.start(cancel_token.clone()) => { | ||||||
|         if let Err(ref e) = rpxy_res { |         if let Err(ref e) = rpxy_res { | ||||||
|           error!("rpxy service exited on error: {e}"); |           error!("rpxy service exited on error: {e}"); | ||||||
|         } else { |         } else { | ||||||
|  |  | ||||||
|  | @ -2,7 +2,6 @@ use crate::{constants::*, count::RequestCount}; | ||||||
| use hot_reload::ReloaderReceiver; | use hot_reload::ReloaderReceiver; | ||||||
| use rpxy_certs::ServerCryptoBase; | use rpxy_certs::ServerCryptoBase; | ||||||
| use std::{net::SocketAddr, time::Duration}; | use std::{net::SocketAddr, time::Duration}; | ||||||
| use tokio_util::sync::CancellationToken; |  | ||||||
| 
 | 
 | ||||||
| /// Global object containing proxy configurations and shared object like counters.
 | /// Global object containing proxy configurations and shared object like counters.
 | ||||||
| /// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
 | /// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
 | ||||||
|  | @ -13,8 +12,6 @@ pub struct Globals { | ||||||
|   pub request_count: RequestCount, |   pub request_count: RequestCount, | ||||||
|   /// Shared context - Async task runtime handler
 |   /// Shared context - Async task runtime handler
 | ||||||
|   pub runtime_handle: tokio::runtime::Handle, |   pub runtime_handle: tokio::runtime::Handle, | ||||||
|   /// Shared context - Notify object to stop async tasks
 |  | ||||||
|   pub cancel_token: Option<CancellationToken>, |  | ||||||
|   /// Shared context - Certificate reloader service receiver // TODO: newer one
 |   /// Shared context - Certificate reloader service receiver // TODO: newer one
 | ||||||
|   pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>, |   pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>, | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -43,8 +43,6 @@ pub struct RpxyOptions { | ||||||
|   pub cert_rx: Option<ReloaderReceiver<ServerCryptoBase>>, // TODO:
 |   pub cert_rx: Option<ReloaderReceiver<ServerCryptoBase>>, // TODO:
 | ||||||
|   /// Async task runtime handler
 |   /// Async task runtime handler
 | ||||||
|   pub runtime_handle: tokio::runtime::Handle, |   pub runtime_handle: tokio::runtime::Handle, | ||||||
|   /// Notify object to stop async tasks
 |  | ||||||
|   pub cancel_token: Option<CancellationToken>, |  | ||||||
| 
 | 
 | ||||||
|   #[cfg(feature = "acme")] |   #[cfg(feature = "acme")] | ||||||
|   /// ServerConfig used for only ACME challenge for ACME domains
 |   /// ServerConfig used for only ACME challenge for ACME domains
 | ||||||
|  | @ -58,10 +56,10 @@ pub async fn entrypoint( | ||||||
|     app_config_list, |     app_config_list, | ||||||
|     cert_rx, // TODO:
 |     cert_rx, // TODO:
 | ||||||
|     runtime_handle, |     runtime_handle, | ||||||
|     cancel_token, |  | ||||||
|     #[cfg(feature = "acme")] |     #[cfg(feature = "acme")] | ||||||
|     server_configs_acme_challenge, |     server_configs_acme_challenge, | ||||||
|   }: &RpxyOptions, |   }: &RpxyOptions, | ||||||
|  |   cancel_token: CancellationToken, | ||||||
| ) -> RpxyResult<()> { | ) -> RpxyResult<()> { | ||||||
|   #[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))] |   #[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))] | ||||||
|   warn!("Both \"http3-quinn\" and \"http3-s2n\" features are enabled. \"http3-quinn\" will be used"); |   warn!("Both \"http3-quinn\" and \"http3-s2n\" features are enabled. \"http3-quinn\" will be used"); | ||||||
|  | @ -117,7 +115,6 @@ pub async fn entrypoint( | ||||||
|     proxy_config: proxy_config.clone(), |     proxy_config: proxy_config.clone(), | ||||||
|     request_count: Default::default(), |     request_count: Default::default(), | ||||||
|     runtime_handle: runtime_handle.clone(), |     runtime_handle: runtime_handle.clone(), | ||||||
|     cancel_token: cancel_token.clone(), |  | ||||||
|     cert_reloader_rx: cert_rx.clone(), |     cert_reloader_rx: cert_rx.clone(), | ||||||
| 
 | 
 | ||||||
|     #[cfg(feature = "acme")] |     #[cfg(feature = "acme")] | ||||||
|  | @ -153,25 +150,21 @@ pub async fn entrypoint( | ||||||
|       message_handler: message_handler.clone(), |       message_handler: message_handler.clone(), | ||||||
|     }; |     }; | ||||||
| 
 | 
 | ||||||
|     let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token()); |     let cancel_token = cancel_token.clone(); | ||||||
|     let parent_cancel_token_clone = globals.cancel_token.clone(); |  | ||||||
|     globals.runtime_handle.spawn(async move { |     globals.runtime_handle.spawn(async move { | ||||||
|       info!("rpxy proxy service for {listening_on} started"); |       info!("rpxy proxy service for {listening_on} started"); | ||||||
|       if let Some(cancel_token) = cancel_token { | 
 | ||||||
|         tokio::select! { |       tokio::select! { | ||||||
|           _ = cancel_token.cancelled() => { |         _ = cancel_token.cancelled() => { | ||||||
|             debug!("rpxy proxy service for {listening_on} terminated"); |           debug!("rpxy proxy service for {listening_on} terminated"); | ||||||
|             Ok(()) |           Ok(()) | ||||||
|           }, |         }, | ||||||
|           proxy_res = proxy.start() => { |         proxy_res = proxy.start(cancel_token.child_token()) => { | ||||||
|             info!("rpxy proxy service for {listening_on} exited"); |           info!("rpxy proxy service for {listening_on} exited"); | ||||||
|             // cancel other proxy tasks
 |           // cancel other proxy tasks
 | ||||||
|             parent_cancel_token_clone.unwrap().cancel(); |           cancel_token.cancel(); | ||||||
|             proxy_res |           proxy_res | ||||||
|           } |  | ||||||
|         } |         } | ||||||
|       } else { |  | ||||||
|         proxy.start().await |  | ||||||
|       } |       } | ||||||
|     }) |     }) | ||||||
|   }); |   }); | ||||||
|  |  | ||||||
|  | @ -22,6 +22,7 @@ use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::au | ||||||
| use rpxy_certs::ServerCrypto; | use rpxy_certs::ServerCrypto; | ||||||
| use std::{net::SocketAddr, sync::Arc, time::Duration}; | use std::{net::SocketAddr, sync::Arc, time::Duration}; | ||||||
| use tokio::time::timeout; | use tokio::time::timeout; | ||||||
|  | use tokio_util::sync::CancellationToken; | ||||||
| 
 | 
 | ||||||
| /// Wrapper function to handle request for HTTP/1.1 and HTTP/2
 | /// Wrapper function to handle request for HTTP/1.1 and HTTP/2
 | ||||||
| /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
 | /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
 | ||||||
|  | @ -129,7 +130,7 @@ where | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   /// Start with TLS (HTTPS)
 |   /// Start with TLS (HTTPS)
 | ||||||
|   pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { |   pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> { | ||||||
|     #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] |     #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] | ||||||
|     { |     { | ||||||
|       self.tls_listener_service().await?; |       self.tls_listener_service().await?; | ||||||
|  | @ -139,14 +140,37 @@ where | ||||||
|     #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] |     #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] | ||||||
|     { |     { | ||||||
|       if self.globals.proxy_config.http3 { |       if self.globals.proxy_config.http3 { | ||||||
|         select! { |         let jh_tls = self.globals.runtime_handle.spawn({ | ||||||
|           _ = self.tls_listener_service().fuse() => { |           let self_clone = self.clone(); | ||||||
|             error!("TCP proxy service for TLS exited"); |           let cancel_token = cancel_token.clone(); | ||||||
|           }, |           async move { | ||||||
|           _ = self.h3_listener_service().fuse() => { |             select! { | ||||||
|             error!("UDP proxy service for QUIC exited"); |               _ = self_clone.tls_listener_service().fuse() => { | ||||||
|  |                 error!("TCP proxy service for TLS exited"); | ||||||
|  |                 cancel_token.cancel(); | ||||||
|  |               }, | ||||||
|  |               _ = cancel_token.cancelled().fuse() => { | ||||||
|  |                 debug!("Cancel token is called for TLS listener"); | ||||||
|  |               } | ||||||
|  |             } | ||||||
|           } |           } | ||||||
|         }; |         }); | ||||||
|  |         let jh_h3 = self.globals.runtime_handle.spawn({ | ||||||
|  |           let self_clone = self.clone(); | ||||||
|  |           async move { | ||||||
|  |             select! { | ||||||
|  |               _ = self_clone.h3_listener_service().fuse() => { | ||||||
|  |                 error!("UDP proxy service for QUIC exited"); | ||||||
|  |                 cancel_token.cancel(); | ||||||
|  |               }, | ||||||
|  |               _ = cancel_token.cancelled().fuse() => { | ||||||
|  |                 debug!("Cancel token is called for QUIC listener"); | ||||||
|  |               } | ||||||
|  |             } | ||||||
|  |           } | ||||||
|  |         }); | ||||||
|  |         let _ = futures::future::join(jh_tls, jh_h3).await; | ||||||
|  | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|       } else { |       } else { | ||||||
|         self.tls_listener_service().await?; |         self.tls_listener_service().await?; | ||||||
|  | @ -303,10 +327,10 @@ where | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   /// Entrypoint for HTTP/1.1, 2 and 3 servers
 |   /// Entrypoint for HTTP/1.1, 2 and 3 servers
 | ||||||
|   pub async fn start(&self) -> RpxyResult<()> { |   pub async fn start(&self, cancel_token: CancellationToken) -> RpxyResult<()> { | ||||||
|     let proxy_service = async { |     let proxy_service = async { | ||||||
|       if self.tls_enabled { |       if self.tls_enabled { | ||||||
|         self.start_with_tls().await |         self.start_with_tls(cancel_token).await | ||||||
|       } else { |       } else { | ||||||
|         self.start_without_tls().await |         self.start_without_tls().await | ||||||
|       } |       } | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Kurihara
				Jun Kurihara