From 3d60175c11a5c4479a6f9e6f78b0449c1ef35ffd Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 9 Aug 2023 11:31:38 +0900 Subject: [PATCH 1/2] fix: fix message handler (changed this inside Arc) --- TODO.md | 1 + rpxy-bin/Cargo.toml | 2 +- rpxy-lib/src/handler/handler_main.rs | 2 +- rpxy-lib/src/lib.rs | 10 ++++++---- rpxy-lib/src/proxy/proxy_h3.rs | 4 ++-- rpxy-lib/src/proxy/proxy_main.rs | 23 ++++++++++++++++++++--- rpxy-lib/src/proxy/socket.rs | 2 +- 7 files changed, 32 insertions(+), 12 deletions(-) diff --git a/TODO.md b/TODO.md index abd51de..ad4f6b2 100644 --- a/TODO.md +++ b/TODO.md @@ -2,6 +2,7 @@ - [Done in 0.6.0] ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ - [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))** +- Fix dynamic reloading of configuration file - Improvement of path matcher - More flexible option for rewriting path - Refactoring diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index e11df63..65fe24d 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -46,7 +46,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5.0" +tikv-jemallocator = "0.5.4" [dev-dependencies] diff --git a/rpxy-lib/src/handler/handler_main.rs b/rpxy-lib/src/handler/handler_main.rs index 98d563b..fb83d69 100644 --- a/rpxy-lib/src/handler/handler_main.rs +++ b/rpxy-lib/src/handler/handler_main.rs @@ -50,7 +50,7 @@ where /// Handle incoming request message from a client pub async fn handle_request( - self, + &self, mut req: Request, client_addr: SocketAddr, // アクセス制御用 listen_addr: SocketAddr, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index c2b8f0e..5369cc2 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -69,10 +69,12 @@ where }); // TODO: HTTP2 only client is needed for http2 cleartext case - let msg_handler = HttpMessageHandlerBuilder::default() - .forwarder(Arc::new(Forwarder::new().await)) - .globals(globals.clone()) - .build()?; + let msg_handler = Arc::new( + HttpMessageHandlerBuilder::default() + .forwarder(Arc::new(Forwarder::new().await)) + .globals(globals.clone()) + .build()?, + ); let addresses = globals.proxy_config.listen_sockets.clone(); let futures = select_all(addresses.into_iter().map(|addr| { diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 7773ad9..fd07521 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -15,7 +15,7 @@ where U: CryptoSource + Clone + Sync + Send + 'static, { pub(super) async fn connection_serve_h3( - self, + &self, quic_connection: C, tls_server_name: ServerNameBytesExp, client_addr: SocketAddr, @@ -79,7 +79,7 @@ where } async fn stream_serve_h3( - self, + &self, req: Request<()>, stream: RequestStream, client_addr: SocketAddr, diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 166f048..9bfcde9 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -40,7 +40,7 @@ where { pub listening_on: SocketAddr, pub tls_enabled: bool, // TCP待受がTLSかどうか - pub msg_handler: HttpMessageHandler, + pub msg_handler: Arc>, pub globals: Arc>, } @@ -49,6 +49,21 @@ where T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send, { + /// Wrapper function to handle request + async fn serve( + handler: Arc>, + req: Request, + client_addr: SocketAddr, + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> Result> { + handler + .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name) + .await + } + + /// Serves requests from clients pub(super) fn client_serve( self, stream: I, @@ -72,7 +87,8 @@ where .serve_connection( stream, service_fn(move |req: Request| { - self.msg_handler.clone().handle_request( + Self::serve( + self.msg_handler.clone(), req, peer_addr, self.listening_on, @@ -91,11 +107,11 @@ where }); } + /// Start without TLS (HTTP cleartext) async fn start_without_tls(self, server: Http) -> Result<()> { let listener_service = async { let tcp_socket = bind_tcp_socket(&self.listening_on)?; let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; - // let tcp_listener = TcpListener::bind(&self.listening_on).await?; info!("Start TCP proxy serving with HTTP request for configured host names"); while let Ok((stream, _client_addr)) = tcp_listener.accept().await { self.clone().client_serve(stream, server.clone(), _client_addr, None); @@ -106,6 +122,7 @@ where Ok(()) } + /// Entrypoint for HTTP/1.1 and HTTP/2 servers pub async fn start(self) -> Result<()> { let mut server = Http::new(); server.http1_keep_alive(self.globals.proxy_config.keepalive); diff --git a/rpxy-lib/src/proxy/socket.rs b/rpxy-lib/src/proxy/socket.rs index a8b9f01..9e4c8f9 100644 --- a/rpxy-lib/src/proxy/socket.rs +++ b/rpxy-lib/src/proxy/socket.rs @@ -32,7 +32,7 @@ pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> Result { } else { Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) }?; - // socket.set_reuse_address(true)?; // This isn't necessary + socket.set_reuse_address(true)?; // This isn't necessary? socket.set_reuse_port(true)?; if let Err(e) = socket.bind(&(*listening_on).into()) { From 7c2205f275221aaaaa93400769c9b7a68b52b337 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 9 Aug 2023 14:07:40 +0900 Subject: [PATCH 2/2] fix: bug for dynamic reloading of config files --- CHANGELOG.md | 1 + TODO.md | 5 +++-- rpxy-bin/src/main.rs | 11 ++++++++--- rpxy-lib/src/lib.rs | 5 +++-- rpxy-lib/src/proxy/proxy_main.rs | 34 +++++++++++++++++++++++++++----- 5 files changed, 44 insertions(+), 12 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index df3a364..ff5cd18 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,7 @@ ### Bugfix - Fix: fix `server` in the response header (`rpxy_lib` -> `rpxy`) +- Fix: fix bug for hot-reloading configuration file (Add termination notification receiver in proxy services) ## 0.5.0 diff --git a/TODO.md b/TODO.md index ad4f6b2..8ec6d5d 100644 --- a/TODO.md +++ b/TODO.md @@ -1,8 +1,7 @@ # TODO List -- [Done in 0.6.0] ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ +- [Done in 0.6.0] But we need more sophistication on `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ - [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))** -- Fix dynamic reloading of configuration file - Improvement of path matcher - More flexible option for rewriting path - Refactoring @@ -33,5 +32,7 @@ ~~Benchmark with other reverse proxy implementations like Sozu ([#58](https://github.com/junkurihara/rust-rpxy/issues/58)) Currently, Sozu can work only on `amd64` format due to its HTTP message parser limitation... Since the main developer have only `arm64` (Apple M1) laptops, so we should do that on VPS?~~ - Done in v0.4.0: ~~Split `rpxy` source codes into `rpxy-lib` and `rpxy-bin` to make the core part (reverse proxy) isolated from the misc part like toml file loader. This is in order to make the configuration-related part more flexible (related to [#33](https://github.com/junkurihara/rust-rpxy/issues/33))~~ +- Done in 0.6.0: + ~~Fix dynamic reloading of configuration file~~ - etc. diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 861c3d5..474eff1 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -84,7 +84,7 @@ async fn rpxy_service_without_watcher( return Err(anyhow::anyhow!(e)); } }; - entrypoint(&proxy_conf, &app_conf, &runtime_handle) + entrypoint(&proxy_conf, &app_conf, &runtime_handle, None) .await .map_err(|e| anyhow::anyhow!(e)) } @@ -105,10 +105,13 @@ async fn rpxy_service_with_watcher( } }; + // Notifier for proxy service termination + let term_notify = std::sync::Arc::new(tokio::sync::Notify::new()); + // Continuous monitoring loop { tokio::select! { - _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle) => { + _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle, Some(term_notify.clone())) => { error!("rpxy entrypoint exited"); break; } @@ -127,7 +130,9 @@ async fn rpxy_service_with_watcher( continue; } }; - info!("Configuration updated. Force to re-bind TCP/UDP sockets"); + info!("Configuration updated. Terminate all spawned proxy services and force to re-bind TCP/UDP sockets"); + term_notify.notify_waiters(); + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } else => break } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 5369cc2..b3af2a8 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -36,6 +36,7 @@ pub async fn entrypoint( proxy_config: &ProxyConfig, app_config_list: &AppConfigList, runtime_handle: &tokio::runtime::Handle, + term_notify: Option>, ) -> Result<()> where T: CryptoSource + Clone + Send + Sync + 'static, @@ -68,7 +69,7 @@ where runtime_handle: runtime_handle.clone(), }); - // TODO: HTTP2 only client is needed for http2 cleartext case + // build message handler including a request forwarder let msg_handler = Arc::new( HttpMessageHandlerBuilder::default() .forwarder(Arc::new(Forwarder::new().await)) @@ -91,7 +92,7 @@ where .build() .unwrap(); - globals.runtime_handle.spawn(proxy.start()) + globals.runtime_handle.spawn(proxy.start(term_notify.clone())) })); // wait for all future diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 9bfcde9..bd52ea9 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -8,6 +8,7 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, runtime::Handle, + sync::Notify, time::{timeout, Duration}, }; @@ -123,7 +124,7 @@ where } /// Entrypoint for HTTP/1.1 and HTTP/2 servers - pub async fn start(self) -> Result<()> { + pub async fn start(self, term_notify: Option>) -> Result<()> { let mut server = Http::new(); server.http1_keep_alive(self.globals.proxy_config.keepalive); server.http2_max_concurrent_streams(self.globals.proxy_config.max_concurrent_streams); @@ -131,12 +132,35 @@ where let executor = LocalExecutor::new(self.globals.runtime_handle.clone()); let server = server.with_executor(executor); - if self.tls_enabled { - self.start_with_tls(server).await?; - } else { - self.start_without_tls(server).await?; + let listening_on = self.listening_on; + + let proxy_service = async { + if self.tls_enabled { + self.start_with_tls(server).await + } else { + self.start_without_tls(server).await + } + }; + + match term_notify { + Some(term) => { + tokio::select! { + _ = proxy_service => { + warn!("Proxy service got down"); + } + _ = term.notified() => { + info!("Proxy service listening on {} receives term signal", listening_on); + } + } + } + None => { + proxy_service.await?; + warn!("Proxy service got down"); + } } + // proxy_service.await?; + Ok(()) } }