diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 0d72e56..8cddc71 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -12,11 +12,11 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -# default = ["http3-s2n", "cache"] -# http3-quinn = ["rpxy-lib/http3-quinn"] -# http3-s2n = ["rpxy-lib/http3-s2n"] -# cache = ["rpxy-lib/cache"] -# native-roots = ["rpxy-lib/native-roots"] +default = ["http3-s2n", "cache"] +http3-quinn = ["rpxy-lib/http3-quinn"] +http3-s2n = ["rpxy-lib/http3-s2n"] +cache = ["rpxy-lib/cache"] +native-roots = ["rpxy-lib/native-roots"] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 3db2b42..cb6b5b6 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -12,19 +12,19 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -# default = ["http3-s2n", "sticky-cookie", "cache"] -# http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] -# http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] -# sticky-cookie = ["base64", "sha2", "chrono"] -# cache = ["http-cache-semantics", "lru"] -# native-roots = ["hyper-rustls/native-tokio"] +default = ["http3-s2n", "sticky-cookie", "cache"] +http3-quinn = ["socket2"] #"quinn", "h3", "h3-quinn", ] +http3-s2n = [] #"h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] +sticky-cookie = [] #"base64", "sha2", "chrono"] +cache = [] #"http-cache-semantics", "lru"] +native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] # rand = "0.8.5" # rustc-hash = "1.1.0" # bytes = "1.5.0" # derive_builder = "0.12.0" -# futures = { version = "0.3.29", features = ["alloc", "async-await"] } +futures = { version = "0.3.29", features = ["alloc", "async-await"] } tokio = { version = "1.34.0", default-features = false, features = [ "net", "rt-multi-thread", @@ -44,7 +44,7 @@ thiserror = "1.0.50" http = "1.0.0" # http-body-util = "0.1.0" hyper = { version = "1.0.1", default-features = false } -# hyper-util = { version = "0.1.1", features = ["full"] } +hyper-util = { version = "0.1.1", features = ["full"] } # hyper-rustls = { version = "0.24.2", default-features = false, features = [ # "tokio-runtime", # "webpki-tokio", @@ -68,8 +68,8 @@ tracing = { version = "0.1.40" } # ], optional = true } # s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } # s2n-quic-rustls = { version = "0.31.0", optional = true } -# # for UDP socket wit SO_REUSEADDR when h3 with quinn -# socket2 = { version = "0.5.5", features = ["all"], optional = true } +# for UDP socket wit SO_REUSEADDR when h3 with quinn +socket2 = { version = "0.5.5", features = ["all"], optional = true } # # cache # http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } diff --git a/rpxy-lib/src/count.rs b/rpxy-lib/src/count.rs new file mode 100644 index 0000000..2ca4028 --- /dev/null +++ b/rpxy-lib/src/count.rs @@ -0,0 +1,31 @@ +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + +#[derive(Debug, Clone, Default)] +/// Counter for serving requests +pub struct RequestCount(Arc); + +impl RequestCount { + pub fn current(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + pub fn increment(&self) -> usize { + self.0.fetch_add(1, Ordering::Relaxed) + } + + pub fn decrement(&self) -> usize { + let mut count; + while { + count = self.0.load(Ordering::Relaxed); + count > 0 + && self + .0 + .compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed) + != Ok(count) + } {} + count + } +} diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 34769dc..7662a9d 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -1,8 +1,11 @@ pub use anyhow::{anyhow, bail, ensure, Context}; use thiserror::Error; -pub type Result = std::result::Result; +pub type RpxyResult = std::result::Result; /// Describes things that can go wrong in the Rpxy #[derive(Debug, Error)] -pub enum RpxyError {} +pub enum RpxyError { + #[error("IO error: {0}")] + Io(#[from] std::io::Error), +} diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 2db2805..88d6cbf 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -1,13 +1,30 @@ -use crate::{certs::CryptoSource, constants::*}; -use std::{net::SocketAddr, time::Duration}; +use crate::{certs::CryptoSource, constants::*, count::RequestCount}; +use std::{net::SocketAddr, sync::Arc, time::Duration}; + +/// Global object containing proxy configurations and shared object like counters. +/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks. +pub struct Globals { + /// Configuration parameters for proxy transport and request handlers + pub proxy_config: ProxyConfig, + /// Shared context - Counter for serving requests + pub request_count: RequestCount, + /// Shared context - Async task runtime handler + pub runtime_handle: tokio::runtime::Handle, + /// Shared context - Notify object to stop async tasks + pub term_notify: Option>, +} /// Configuration parameters for proxy transport and request handlers #[derive(PartialEq, Eq, Clone)] pub struct ProxyConfig { - pub listen_sockets: Vec, // when instantiate server - pub http_port: Option, // when instantiate server - pub https_port: Option, // when instantiate server - pub tcp_listen_backlog: u32, // when instantiate server + /// listen socket addresses + pub listen_sockets: Vec, + /// http port + pub http_port: Option, + /// https port + pub https_port: Option, + /// tcp listen backlog + pub tcp_listen_backlog: u32, pub proxy_timeout: Duration, // when serving requests at Proxy pub upstream_timeout: Duration, // when serving requests at Handler diff --git a/rpxy-lib/src/hyper_executor.rs b/rpxy-lib/src/hyper_executor.rs new file mode 100644 index 0000000..579251e --- /dev/null +++ b/rpxy-lib/src/hyper_executor.rs @@ -0,0 +1,23 @@ +use tokio::runtime::Handle; + +#[derive(Clone)] +/// Executor for hyper +pub struct LocalExecutor { + runtime_handle: Handle, +} + +impl LocalExecutor { + pub fn new(runtime_handle: Handle) -> Self { + LocalExecutor { runtime_handle } + } +} + +impl hyper::rt::Executor for LocalExecutor +where + F: std::future::Future + Send + 'static, + F::Output: Send, +{ + fn execute(&self, fut: F) { + self.runtime_handle.spawn(fut); + } +} diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index b201b52..e0a9d21 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -1,10 +1,14 @@ mod certs; mod constants; +mod count; mod error; mod globals; +mod hyper_executor; mod log; +mod proxy; -use crate::{error::*, log::*}; +use crate::{error::*, globals::Globals, log::*, proxy::Proxy}; +use futures::future::select_all; use std::sync::Arc; pub use crate::{ @@ -25,7 +29,7 @@ pub async fn entrypoint( app_config_list: &AppConfigList, runtime_handle: &tokio::runtime::Handle, term_notify: Option>, -) -> Result<()> +) -> RpxyResult<()> where T: CryptoSource + Clone + Send + Sync + 'static, { @@ -58,15 +62,18 @@ where info!("Cache is disabled") } - // // build global - // let globals = Arc::new(Globals { - // proxy_config: proxy_config.clone(), - // backends: app_config_list.clone().try_into()?, - // request_count: Default::default(), - // runtime_handle: runtime_handle.clone(), - // term_notify: term_notify.clone(), - // }); + // build global shared context + let globals = Arc::new(Globals { + proxy_config: proxy_config.clone(), + request_count: Default::default(), + runtime_handle: runtime_handle.clone(), + term_notify: term_notify.clone(), + }); + // TODO: 1. build backends, and make it contained in Arc + // app_config_list: app_config_list.clone(), + + // TODO: 2. build message handler with Arc-ed http_client and backends, and make it contained in Arc as well // // build message handler including a request forwarder // let msg_handler = Arc::new( // HttpMessageHandlerBuilder::default() @@ -75,31 +82,31 @@ where // .build()?, // ); - // let http_server = Arc::new(build_http_server(&globals)); + // TODO: 3. spawn each proxy for a given socket with copied Arc-ed message_handler. + // build hyper connection builder shared with proxy instances + let connection_builder = proxy::connection_builder(&globals); - // let addresses = globals.proxy_config.listen_sockets.clone(); - // let futures = select_all(addresses.into_iter().map(|addr| { - // let mut tls_enabled = false; - // if let Some(https_port) = globals.proxy_config.https_port { - // tls_enabled = https_port == addr.port() - // } + // spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder. + let addresses = globals.proxy_config.listen_sockets.clone(); + let futures_iter = addresses.into_iter().map(|listening_on| { + let mut tls_enabled = false; + if let Some(https_port) = globals.proxy_config.https_port { + tls_enabled = https_port == listening_on.port() + } + let proxy = Proxy { + globals: globals.clone(), + listening_on, + tls_enabled, + connection_builder: connection_builder.clone(), + // TODO: message_handler + }; + globals.runtime_handle.spawn(async move { proxy.start().await }) + }); - // let proxy = ProxyBuilder::default() - // .globals(globals.clone()) - // .listening_on(addr) - // .tls_enabled(tls_enabled) - // .http_server(http_server.clone()) - // .msg_handler(msg_handler.clone()) - // .build() - // .unwrap(); - - // globals.runtime_handle.spawn(async move { proxy.start().await }) - // })); - - // // wait for all future - // if let (Ok(Err(e)), _, _) = futures.await { - // error!("Some proxy services are down: {}", e); - // }; + // wait for all future + if let (Ok(Err(e)), _, _) = select_all(futures_iter).await { + error!("Some proxy services are down: {}", e); + }; Ok(()) } diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs new file mode 100644 index 0000000..9b55e1d --- /dev/null +++ b/rpxy-lib/src/proxy/mod.rs @@ -0,0 +1,22 @@ +mod proxy_main; +mod socket; + +use crate::{globals::Globals, hyper_executor::LocalExecutor}; +use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder}; +use std::sync::Arc; + +pub(crate) use proxy_main::Proxy; + +/// build connection builder shared with proxy instances +pub(crate) fn connection_builder(globals: &Arc) -> Arc> { + let executor = LocalExecutor::new(globals.runtime_handle.clone()); + let mut http_server = server::conn::auto::Builder::new(executor); + http_server + .http1() + .keep_alive(globals.proxy_config.keepalive) + .pipeline_flush(true); + http_server + .http2() + .max_concurrent_streams(globals.proxy_config.max_concurrent_streams); + Arc::new(http_server) +} diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs new file mode 100644 index 0000000..416ecac --- /dev/null +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -0,0 +1,63 @@ +use super::socket::bind_tcp_socket; +use crate::{error::RpxyResult, globals::Globals, hyper_executor::LocalExecutor, log::*}; +use hyper_util::server::conn::auto::Builder as ConnectionBuilder; +use std::{net::SocketAddr, sync::Arc}; + +/// Proxy main object responsible to serve requests received from clients at the given socket address. +pub(crate) struct Proxy { + /// global context shared among async tasks + pub globals: Arc, + /// listen socket address + pub listening_on: SocketAddr, + /// whether TLS is enabled or not + pub tls_enabled: bool, + /// hyper connection builder serving http request + pub connection_builder: Arc>, +} + +impl Proxy { + /// Start without TLS (HTTP cleartext) + async fn start_without_tls(&self) -> RpxyResult<()> { + let listener_service = async { + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; + info!("Start TCP proxy serving with HTTP request for configured host names"); + while let Ok((stream, client_addr)) = tcp_listener.accept().await { + // self.serve_connection(TokioIo::new(stream), client_addr, None); + } + Ok(()) as RpxyResult<()> + }; + listener_service.await?; + Ok(()) + } + + /// Entrypoint for HTTP/1.1, 2 and 3 servers + pub async fn start(&self) -> RpxyResult<()> { + let proxy_service = async { + // if self.tls_enabled { + // self.start_with_tls().await + // } else { + self.start_without_tls().await + // } + }; + + match &self.globals.term_notify { + Some(term) => { + tokio::select! { + _ = proxy_service => { + warn!("Proxy service got down"); + } + _ = term.notified() => { + info!("Proxy service listening on {} receives term signal", self.listening_on); + } + } + } + None => { + proxy_service.await?; + warn!("Proxy service got down"); + } + } + + Ok(()) + } +} diff --git a/rpxy-lib/src/proxy/socket.rs b/rpxy-lib/src/proxy/socket.rs new file mode 100644 index 0000000..322b42b --- /dev/null +++ b/rpxy-lib/src/proxy/socket.rs @@ -0,0 +1,46 @@ +use crate::{error::*, log::*}; +#[cfg(feature = "http3-quinn")] +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; +#[cfg(feature = "http3-quinn")] +use std::net::UdpSocket; +use tokio::net::TcpSocket; + +/// Bind TCP socket to the given `SocketAddr`, and returns the TCP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> RpxyResult { + let tcp_socket = if listening_on.is_ipv6() { + TcpSocket::new_v6() + } else { + TcpSocket::new_v4() + }?; + tcp_socket.set_reuseaddr(true)?; + tcp_socket.set_reuseport(true)?; + if let Err(e) = tcp_socket.bind(*listening_on) { + error!("Failed to bind TCP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + Ok(tcp_socket) +} + +#[cfg(feature = "http3-quinn")] +/// Bind UDP socket to the given `SocketAddr`, and returns the UDP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> RpxyResult { + let socket = if listening_on.is_ipv6() { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + }?; + socket.set_reuse_address(true)?; // This isn't necessary? + socket.set_reuse_port(true)?; + socket.set_nonblocking(true)?; // This was made true inside quinn. so this line isn't necessary here. but just in case. + + if let Err(e) = socket.bind(&(*listening_on).into()) { + error!("Failed to bind UDP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + let udp_socket: UdpSocket = socket.into(); + + Ok(udp_socket) +}