From 58e22d33afe04115e1594915aca8c80ac8bf01f5 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sun, 23 Jul 2023 01:42:39 +0900 Subject: [PATCH] feat: hot-reloading of config file --- config-example.toml | 3 ++ rpxy-bin/Cargo.toml | 2 +- rpxy-bin/src/config/mod.rs | 7 ++- rpxy-bin/src/config/parse.rs | 25 ++++++----- rpxy-bin/src/config/service.rs | 24 ++++++++++ rpxy-bin/src/config/toml.rs | 22 +++++---- rpxy-bin/src/constants.rs | 1 + rpxy-bin/src/main.rs | 76 +++++++++++++++++++++++++++----- rpxy-lib/Cargo.toml | 4 +- rpxy-lib/src/constants.rs | 1 + rpxy-lib/src/globals.rs | 14 +++--- rpxy-lib/src/lib.rs | 16 +++---- rpxy-lib/src/proxy/mod.rs | 1 + rpxy-lib/src/proxy/proxy_main.rs | 7 +-- rpxy-lib/src/proxy/proxy_tls.rs | 27 ++++++++---- rpxy-lib/src/proxy/socket.rs | 41 +++++++++++++++++ 16 files changed, 213 insertions(+), 58 deletions(-) create mode 100644 rpxy-bin/src/config/service.rs create mode 100644 rpxy-lib/src/proxy/socket.rs diff --git a/config-example.toml b/config-example.toml index 0382393..605067c 100644 --- a/config-example.toml +++ b/config-example.toml @@ -10,6 +10,9 @@ listen_port = 8080 listen_port_tls = 8443 +# Optional for h2 and http1.1 +tcp_listen_backlog = 1024 + # Optional for h2 and http1.1 max_concurrent_streams = 100 diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 90094c4..fb6d4aa 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -35,7 +35,7 @@ rustls-pemfile = "1.0.3" # config clap = { version = "4.3.17", features = ["std", "cargo", "wrap_help"] } toml = { version = "0.7.6", default-features = false, features = ["parse"] } -# hot_reload = "0.1.2" +hot_reload = "0.1.4" # logging tracing = { version = "0.1.37" } diff --git a/rpxy-bin/src/config/mod.rs b/rpxy-bin/src/config/mod.rs index a71ca6e..09ec2b9 100644 --- a/rpxy-bin/src/config/mod.rs +++ b/rpxy-bin/src/config/mod.rs @@ -1,4 +1,9 @@ mod parse; +mod service; mod toml; -pub use parse::build_settings; +pub use { + self::toml::ConfigToml, + parse::{build_settings, parse_opts}, + service::ConfigTomlReloader, +}; diff --git a/rpxy-bin/src/config/parse.rs b/rpxy-bin/src/config/parse.rs index 09b24b3..56c6f2c 100644 --- a/rpxy-bin/src/config/parse.rs +++ b/rpxy-bin/src/config/parse.rs @@ -7,28 +7,30 @@ use crate::{ use clap::Arg; use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig}; -pub fn build_settings() -> std::result::Result<(ProxyConfig, AppConfigList), anyhow::Error> { +pub fn parse_opts() -> Result { let _ = include_str!("../../Cargo.toml"); let options = clap::command!().arg( Arg::new("config_file") .long("config") .short('c') .value_name("FILE") - .help("Configuration file path like \"./config.toml\""), + .required(true) + .help("Configuration file path like ./config.toml"), ); let matches = options.get_matches(); /////////////////////////////////// - let config = if let Some(config_file_path) = matches.get_one::("config_file") { - ConfigToml::new(config_file_path)? - } else { - // Default config Toml - ConfigToml::default() - }; + let config_file_path = matches.get_one::("config_file").unwrap(); + Ok(config_file_path.to_string()) +} + +pub fn build_settings( + config: &ConfigToml, +) -> std::result::Result<(ProxyConfig, AppConfigList), anyhow::Error> { /////////////////////////////////// // build proxy config - let proxy_config: ProxyConfig = (&config).try_into()?; + let proxy_config: ProxyConfig = config.try_into()?; // For loggings if proxy_config.listen_sockets.iter().any(|addr| addr.is_ipv6()) { info!("Listen both IPv4 and IPv6") @@ -50,7 +52,7 @@ pub fn build_settings() -> std::result::Result<(ProxyConfig, AppConfigList std::result::Result<(ProxyConfig, AppConfigList for ConfigTomlReloader { + type Source = String; + async fn new(source: &Self::Source) -> Result> { + Ok(Self { + config_path: source.clone(), + }) + } + + async fn reload(&self) -> Result, ReloaderError> { + let conf = ConfigToml::new(&self.config_path) + .map_err(|_e| ReloaderError::::Reload("Failed to reload config toml"))?; + Ok(Some(conf)) + } +} diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 6868d21..84260c0 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -8,11 +8,12 @@ use rustc_hash::FxHashMap as HashMap; use serde::Deserialize; use std::{fs, net::SocketAddr}; -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct ConfigToml { pub listen_port: Option, pub listen_port_tls: Option, pub listen_ipv6: Option, + pub tcp_listen_backlog: Option, pub max_concurrent_streams: Option, pub max_clients: Option, pub apps: Option, @@ -21,7 +22,7 @@ pub struct ConfigToml { } #[cfg(feature = "http3")] -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Http3Option { pub alt_svc_max_age: Option, pub request_max_body_size: Option, @@ -31,24 +32,24 @@ pub struct Http3Option { pub max_idle_timeout: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Experimental { #[cfg(feature = "http3")] pub h3: Option, pub ignore_sni_consistency: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Apps(pub HashMap); -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Application { pub server_name: Option, pub reverse_proxy: Option>, pub tls: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct TlsOption { pub tls_cert_path: Option, pub tls_cert_key_path: Option, @@ -56,7 +57,7 @@ pub struct TlsOption { pub client_ca_cert_path: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct ReverseProxyOption { pub path: Option, pub replace_path: Option, @@ -65,7 +66,7 @@ pub struct ReverseProxyOption { pub load_balance: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct UpstreamParams { pub location: String, pub tls: Option, @@ -112,6 +113,11 @@ impl TryInto for &ConfigToml { }) .collect(); + // tcp backlog + if let Some(backlog) = self.tcp_listen_backlog { + proxy_config.tcp_listen_backlog = backlog; + } + // max values if let Some(c) = self.max_clients { proxy_config.max_clients = c as usize; diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 4181a26..323615f 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,2 +1,3 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index ed0cba5..8466372 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -11,7 +11,12 @@ mod constants; mod error; mod log; -use crate::{config::build_settings, log::*}; +use crate::{ + config::{build_settings, parse_opts, ConfigToml, ConfigTomlReloader}, + constants::CONFIG_WATCH_DELAY_SECS, + log::*, +}; +use hot_reload::{ReloaderReceiver, ReloaderService}; use rpxy_lib::entrypoint; fn main() { @@ -23,17 +28,68 @@ fn main() { let runtime = runtime_builder.build().unwrap(); runtime.block_on(async { - let (proxy_conf, app_conf) = match build_settings() { - Ok(g) => g, - Err(e) => { - error!("Invalid configuration: {}", e); + // Initially load config + let Ok(config_path) = parse_opts() else { + error!("Invalid toml file"); std::process::exit(1); - } }; + let (config_service, config_rx) = + ReloaderService::::new(&config_path, CONFIG_WATCH_DELAY_SECS, false) + .await + .unwrap(); - entrypoint(proxy_conf, app_conf, runtime.handle().clone()) - .await - .unwrap() + tokio::select! { + _ = config_service.start() => { + error!("config reloader service exited"); + } + _ = rpxy_service(config_rx, runtime.handle().clone()) => { + error!("rpxy service existed"); + } + } }); - warn!("rpxy exited!"); +} + +async fn rpxy_service( + mut config_rx: ReloaderReceiver, + runtime_handle: tokio::runtime::Handle, +) -> Result<(), anyhow::Error> { + // Initial loading + config_rx.changed().await?; + let config_toml = config_rx.borrow().clone().unwrap(); + let (mut proxy_conf, mut app_conf) = match build_settings(&config_toml) { + Ok(v) => v, + Err(e) => { + error!("Invalid configuration: {e}"); + return Err(anyhow::anyhow!(e)); + } + }; + + // Continuous monitoring + loop { + tokio::select! { + _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle) => { + error!("rpxy entrypoint exited"); + break; + } + _ = config_rx.changed() => { + if config_rx.borrow().is_none() { + error!("Something wrong in config reloader receiver"); + break; + } + let config_toml = config_rx.borrow().clone().unwrap(); + match build_settings(&config_toml) { + Ok((p, a)) => { + (proxy_conf, app_conf) = (p, a) + }, + Err(e) => { + error!("Invalid configuration. Configuration does not updated: {e}"); + continue; + } + }; + info!("Configuration updated. Force to re-bind TCP/UDP sockets"); + } + else => break + } + } + Ok(()) } diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 48cb437..e36bae6 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -30,7 +30,7 @@ tokio = { version = "1.29.1", default-features = false, features = [ "macros", ] } async-trait = "0.1.72" -hot_reload = "0.1.2" # reloading certs +hot_reload = "0.1.4" # reloading certs # Error handling anyhow = "1.0.72" @@ -63,6 +63,8 @@ quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rust h3 = { path = "../h3/h3/", optional = true } # h3-quinn = { path = "./h3/h3-quinn/", optional = true } h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21 +# for UDP socket wit SO_REUSEADDR +socket2 = { version = "0.5.3", features = ["all"] } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 72cce78..9d7fb5e 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -1,5 +1,6 @@ // pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; // pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; +pub const TCP_LISTEN_BACKLOG: u32 = 1024; // pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTPS_LISTEN_PORT: u16 = 8443; pub const PROXY_TIMEOUT_SEC: u64 = 60; diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 6614b5c..44808dd 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -36,11 +36,12 @@ where } /// Configuration parameters for proxy transport and request handlers -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct ProxyConfig { pub listen_sockets: Vec, // when instantiate server pub http_port: Option, // when instantiate server pub https_port: Option, // when instantiate server + pub tcp_listen_backlog: u32, // when instantiate server pub proxy_timeout: Duration, // when serving requests at Proxy pub upstream_timeout: Duration, // when serving requests at Handler @@ -74,6 +75,7 @@ impl Default for ProxyConfig { listen_sockets: Vec::new(), http_port: None, https_port: None, + tcp_listen_backlog: TCP_LISTEN_BACKLOG, // TODO: Reconsider each timeout values proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), @@ -104,7 +106,7 @@ impl Default for ProxyConfig { } /// Configuration parameters for backend applications -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct AppConfigList where T: CryptoSource, @@ -152,7 +154,7 @@ where } /// Configuration parameters for single backend application -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct AppConfig where T: CryptoSource, @@ -234,7 +236,7 @@ where } /// Configuration parameters for single reverse proxy corresponding to the path -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct ReverseProxyConfig { pub path: Option, pub replace_path: Option, @@ -244,7 +246,7 @@ pub struct ReverseProxyConfig { } /// Configuration parameters for single upstream destination from a reverse proxy -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct UpstreamUri { pub inner: hyper::Uri, } @@ -259,7 +261,7 @@ impl TryInto for &UpstreamUri { } /// Configuration parameters on TLS for a single backend application -#[derive(PartialEq, Eq)] +#[derive(PartialEq, Eq, Clone)] pub struct TlsConfig where T: CryptoSource, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 7820db6..4fd64ce 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -25,19 +25,19 @@ pub mod reexports { /// Entrypoint that creates and spawns tasks of reverse proxy services pub async fn entrypoint( - proxy_config: ProxyConfig, - app_config_list: AppConfigList, - runtime_handle: tokio::runtime::Handle, + proxy_config: &ProxyConfig, + app_config_list: &AppConfigList, + runtime_handle: &tokio::runtime::Handle, ) -> Result<()> where T: CryptoSource + Clone + Send + Sync + 'static, { // build global let globals = Arc::new(Globals { - proxy_config, - backends: app_config_list.try_into()?, + proxy_config: proxy_config.clone(), + backends: app_config_list.clone().try_into()?, request_count: Default::default(), - runtime_handle, + runtime_handle: runtime_handle.clone(), }); // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); let connector = hyper_rustls::HttpsConnectorBuilder::new() @@ -71,8 +71,8 @@ where })); // wait for all future - if let (Ok(_), _, _) = futures.await { - error!("Some proxy services are down"); + if let (Ok(Err(e)), _, _) = futures.await { + error!("Some proxy services are down: {:?}", e); }; Ok(()) diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index 73a4002..749239c 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -4,5 +4,6 @@ mod proxy_client_cert; mod proxy_h3; mod proxy_main; mod proxy_tls; +mod socket; pub use proxy_main::{Proxy, ProxyBuilder, ProxyBuilderError}; diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index e5a02a5..166f048 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -1,4 +1,4 @@ -// use super::proxy_handler::handle_request; +use super::socket::bind_tcp_socket; use crate::{ certs::CryptoSource, error::*, globals::Globals, handler::HttpMessageHandler, log::*, utils::ServerNameBytesExp, }; @@ -7,7 +7,6 @@ use hyper::{client::connect::Connect, server::conn::Http, service::service_fn, B use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::TcpListener, runtime::Handle, time::{timeout, Duration}, }; @@ -94,7 +93,9 @@ where async fn start_without_tls(self, server: Http) -> Result<()> { let listener_service = async { - let tcp_listener = TcpListener::bind(&self.listening_on).await?; + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; + // let tcp_listener = TcpListener::bind(&self.listening_on).await?; info!("Start TCP proxy serving with HTTP request for configured host names"); while let Ok((stream, _client_addr)) = tcp_listener.accept().await { self.clone().client_serve(stream, server.clone(), _client_addr, None); diff --git a/rpxy-lib/src/proxy/proxy_tls.rs b/rpxy-lib/src/proxy/proxy_tls.rs index 6e0a6b6..5512eff 100644 --- a/rpxy-lib/src/proxy/proxy_tls.rs +++ b/rpxy-lib/src/proxy/proxy_tls.rs @@ -1,6 +1,9 @@ +#[cfg(feature = "http3")] +use super::socket::bind_udp_socket; use super::{ crypto_service::{CryptoReloader, ServerCrypto, ServerCryptoBase, SniServerCryptoMap}, proxy_main::{LocalExecutor, Proxy}, + socket::bind_tcp_socket, }; use crate::{certs::CryptoSource, constants::*, error::*, log::*, utils::BytesName}; use hot_reload::{ReloaderReceiver, ReloaderService}; @@ -10,10 +13,7 @@ use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerC #[cfg(feature = "http3")] use rustls::ServerConfig; use std::sync::Arc; -use tokio::{ - net::TcpListener, - time::{timeout, Duration}, -}; +use tokio::time::{timeout, Duration}; impl Proxy where @@ -26,7 +26,8 @@ where server: Http, mut server_crypto_rx: ReloaderReceiver, ) -> Result<()> { - let tcp_listener = TcpListener::bind(&self.listening_on).await?; + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; info!("Start TCP proxy serving with HTTPS request for configured host names"); let mut server_crypto_map: Option> = None; @@ -130,7 +131,17 @@ where let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config)); server_config_h3.transport = Arc::new(transport_config_quic); server_config_h3.concurrent_connections(self.globals.proxy_config.h3_max_concurrent_connections); - let endpoint = Endpoint::server(server_config_h3, self.listening_on)?; + + // To reuse address + let udp_socket = bind_udp_socket(&self.listening_on)?; + let runtime = quinn::default_runtime() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?; + let endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + Some(server_config_h3), + udp_socket, + runtime, + )?; let mut server_crypto: Option> = None; loop { @@ -199,10 +210,10 @@ where #[cfg(not(feature = "http3"))] { tokio::select! { - _= self.cert_service(tx) => { + _= cert_reloader_service.start() => { error!("Cert service for TLS exited"); }, - _ = self.listener_service(server, rx) => { + _ = self.listener_service(server, cert_reloader_rx) => { error!("TCP proxy service for TLS exited"); }, else => { diff --git a/rpxy-lib/src/proxy/socket.rs b/rpxy-lib/src/proxy/socket.rs new file mode 100644 index 0000000..8858732 --- /dev/null +++ b/rpxy-lib/src/proxy/socket.rs @@ -0,0 +1,41 @@ +use crate::{error::*, log::*}; +#[cfg(feature = "http3")] +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; +#[cfg(feature = "http3")] +use std::net::UdpSocket; +use tokio::net::TcpSocket; + +pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> Result { + let tcp_socket = if listening_on.is_ipv6() { + TcpSocket::new_v6() + } else { + TcpSocket::new_v4() + }?; + tcp_socket.set_reuseaddr(true)?; + tcp_socket.set_reuseport(true)?; + if let Err(e) = tcp_socket.bind(*listening_on) { + error!("Failed to bind TCP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + Ok(tcp_socket) +} + +#[cfg(feature = "http3")] +pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> Result { + let socket = if listening_on.is_ipv6() { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + }?; + socket.set_reuse_address(true)?; + socket.set_reuse_port(true)?; + + if let Err(e) = socket.bind(&(*listening_on).into()) { + error!("Failed to bind UDP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + let udp_socket: UdpSocket = socket.into(); + + Ok(udp_socket) +}