From a9ce26ae7657f431e039a0864bacdb7a82c205a2 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 27 Nov 2023 15:39:19 +0900 Subject: [PATCH] wip: implementing message handler --- rpxy-lib/src/error.rs | 4 + rpxy-lib/src/hyper_ext/body_type.rs | 26 +--- rpxy-lib/src/hyper_ext/mod.rs | 3 +- rpxy-lib/src/lib.rs | 26 ++-- .../src/message_handle/canonical_address.rs | 61 ++++++++ rpxy-lib/src/message_handle/handler.rs | 131 ++++++++++++++++++ rpxy-lib/src/message_handle/http_log.rs | 99 +++++++++++++ rpxy-lib/src/message_handle/http_result.rs | 36 +++++ rpxy-lib/src/message_handle/mod.rs | 8 ++ .../src/message_handle/synthetic_response.rs | 57 ++++++++ rpxy-lib/src/message_handle/utils_request.rs | 43 ++++++ rpxy-lib/src/message_handler/mod.rs | 0 rpxy-lib/src/proxy/proxy_h3.rs | 47 +++---- rpxy-lib/src/proxy/proxy_main.rs | 56 ++++---- rpxy-lib/src/proxy/proxy_quic_quinn.rs | 15 +- rpxy-lib/src/proxy/proxy_quic_s2n.rs | 6 +- 16 files changed, 520 insertions(+), 98 deletions(-) create mode 100644 rpxy-lib/src/message_handle/canonical_address.rs create mode 100644 rpxy-lib/src/message_handle/handler.rs create mode 100644 rpxy-lib/src/message_handle/http_log.rs create mode 100644 rpxy-lib/src/message_handle/http_result.rs create mode 100644 rpxy-lib/src/message_handle/mod.rs create mode 100644 rpxy-lib/src/message_handle/synthetic_response.rs create mode 100644 rpxy-lib/src/message_handle/utils_request.rs delete mode 100644 rpxy-lib/src/message_handler/mod.rs diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 49a1d1c..a05a612 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -69,6 +69,10 @@ pub enum RpxyError { #[error("Failed to build backend app: {0}")] FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError), + // Handler errors + #[error("Failed to build message handler: {0}")] + FailedToBuildMessageHandler(#[from] crate::message_handle::HttpMessageHandlerBuilderError), + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/hyper_ext/body_type.rs b/rpxy-lib/src/hyper_ext/body_type.rs index ba1bdc2..516e569 100644 --- a/rpxy-lib/src/hyper_ext/body_type.rs +++ b/rpxy-lib/src/hyper_ext/body_type.rs @@ -1,5 +1,3 @@ -use crate::error::*; -use http::{Response, StatusCode}; use http_body_util::{combinators, BodyExt, Either, Empty, Full}; use hyper::body::{Bytes, Incoming}; @@ -8,30 +6,8 @@ pub(crate) type BoxBody = combinators::BoxBody; /// Type for either passthrough body or given body type, specifically synthetic boxed body pub(crate) type IncomingOr = Either; -/// helper function to build http response with passthrough body -pub(crate) fn passthrough_response(response: Response) -> RpxyResult>> -where - B: hyper::body::Body, -{ - Ok(response.map(IncomingOr::Left)) -} - -/// helper function to build http response with synthetic body -pub(crate) fn synthetic_response(response: Response) -> RpxyResult>> { - Ok(response.map(IncomingOr::Right)) -} - -/// build http response with status code of 4xx and 5xx -pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult>> { - let res = Response::builder() - .status(status_code) - .body(IncomingOr::Right(BoxBody::new(empty()))) - .unwrap(); - Ok(res) -} - /// helper function to build a empty body -fn empty() -> BoxBody { +pub(crate) fn empty() -> BoxBody { Empty::::new().map_err(|never| match never {}).boxed() } diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index 19511a1..a39aef9 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -8,6 +8,5 @@ pub(crate) mod rt { } pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - pub(crate) use super::body_type::{BoxBody, IncomingOr}; + pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr}; } -pub(crate) use body_type::{full, passthrough_response, synthetic_error_response, synthetic_response}; diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 4e63cbe..c45327a 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -6,11 +6,14 @@ mod error; mod globals; mod hyper_ext; mod log; -mod message_handler; +mod message_handle; mod name_exp; mod proxy; -use crate::{crypto::build_cert_reloader, error::*, globals::Globals, log::*, proxy::Proxy}; +use crate::{ + crypto::build_cert_reloader, error::*, globals::Globals, log::*, message_handle::HttpMessageHandlerBuilder, + proxy::Proxy, +}; use futures::future::select_all; use std::sync::Arc; @@ -86,16 +89,15 @@ where cert_reloader_rx: cert_reloader_rx.clone(), }); - // TODO: 2. build message handler with Arc-ed http_client and backends, and make it contained in Arc as well - // // build message handler including a request forwarder - // let msg_handler = Arc::new( - // HttpMessageHandlerBuilder::default() - // // .forwarder(Arc::new(Forwarder::new(&globals).await)) - // .globals(globals.clone()) - // .build()?, - // ); + // 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well + let message_handler = Arc::new( + HttpMessageHandlerBuilder::default() + .globals(globals.clone()) + .app_manager(app_manager.clone()) + .build()?, + ); - // TODO: 3. spawn each proxy for a given socket with copied Arc-ed message_handler. + // 5. spawn each proxy for a given socket with copied Arc-ed message_handler. // build hyper connection builder shared with proxy instances let connection_builder = proxy::connection_builder(&globals); @@ -111,7 +113,7 @@ where listening_on, tls_enabled, connection_builder: connection_builder.clone(), - // TODO: message_handler + message_handler: message_handler.clone(), }; globals.runtime_handle.spawn(async move { proxy.start().await }) }); diff --git a/rpxy-lib/src/message_handle/canonical_address.rs b/rpxy-lib/src/message_handle/canonical_address.rs new file mode 100644 index 0000000..32dad78 --- /dev/null +++ b/rpxy-lib/src/message_handle/canonical_address.rs @@ -0,0 +1,61 @@ +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; + +/// Trait to convert an IP address to its canonical form +pub trait ToCanonical { + fn to_canonical(&self) -> Self; +} + +impl ToCanonical for SocketAddr { + fn to_canonical(&self) -> Self { + match self { + SocketAddr::V4(_) => *self, + SocketAddr::V6(v6) => match v6.ip().to_ipv4() { + Some(mapped) => { + if mapped == Ipv4Addr::new(0, 0, 0, 1) { + *self + } else { + SocketAddr::new(IpAddr::V4(mapped), self.port()) + } + } + None => *self, + }, + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::net::Ipv6Addr; + #[test] + fn ipv4_loopback_to_canonical() { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080); + assert_eq!(socket.to_canonical(), socket); + } + #[test] + fn ipv6_loopback_to_canonical() { + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 8080); + assert_eq!(socket.to_canonical(), socket); + } + #[test] + fn ipv4_to_canonical() { + let socket = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 168, 1, 1)), 8080); + assert_eq!(socket.to_canonical(), socket); + } + #[test] + fn ipv6_to_canonical() { + let socket = SocketAddr::new( + IpAddr::V6(Ipv6Addr::new(0x2001, 0x0db8, 0, 0, 0, 0, 0xdead, 0xbeef)), + 8080, + ); + assert_eq!(socket.to_canonical(), socket); + } + #[test] + fn ipv4_mapped_to_ipv6_to_canonical() { + let socket = SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0xffff, 0xc00a, 0x2ff)), 8080); + assert_eq!( + socket.to_canonical(), + SocketAddr::new(IpAddr::V4(Ipv4Addr::new(192, 10, 2, 255)), 8080) + ); + } +} diff --git a/rpxy-lib/src/message_handle/handler.rs b/rpxy-lib/src/message_handle/handler.rs new file mode 100644 index 0000000..145d8ba --- /dev/null +++ b/rpxy-lib/src/message_handle/handler.rs @@ -0,0 +1,131 @@ +use super::{ + http_log::HttpMessageLog, + http_result::{HttpError, HttpResult}, + synthetic_response::{secure_redirection_response, synthetic_error_response}, + utils_request::ParseHost, +}; +use crate::{ + backend::BackendAppManager, + crypto::CryptoSource, + error::*, + globals::Globals, + hyper_ext::body::{BoxBody, IncomingLike, IncomingOr}, + log::*, + name_exp::ServerName, +}; +use derive_builder::Builder; +use http::{Request, Response, StatusCode}; +use std::{net::SocketAddr, sync::Arc}; + +#[derive(Clone, Builder)] +/// HTTP message handler for requests from clients and responses from backend applications, +/// responsible to manipulate and forward messages to upstream backends and downstream clients. +// pub struct HttpMessageHandler +pub struct HttpMessageHandler +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone, +{ + // forwarder: Arc>, + globals: Arc, + app_manager: Arc>, +} + +impl HttpMessageHandler +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone, +{ + /// Handle incoming request message from a client. + /// Responsible to passthrough responses from backend applications or generate synthetic error responses. + pub async fn handle_request( + &self, + mut req: Request>, + client_addr: SocketAddr, // For access control + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> RpxyResult>> { + let mut log_data = HttpMessageLog::from(&req); + + let http_result = self + .handle_request_inner( + &mut log_data, + req, + client_addr, + listen_addr, + tls_enabled, + tls_server_name, + ) + .await; + + // passthrough or synthetic response + match http_result { + Ok(v) => { + log_data.status_code(&v.status()).output(); + Ok(v) + } + Err(e) => { + debug!("{e}"); + let code = StatusCode::from(e); + log_data.status_code(&code).output(); + synthetic_error_response(code) + } + } + } + + /// Handle inner with no synthetic error response. + /// Synthetic response is generated by caller. + async fn handle_request_inner( + &self, + mut log_data: &mut HttpMessageLog, + mut req: Request>, + client_addr: SocketAddr, // For access control + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> HttpResult>> { + // preparing log data + let mut log_data = HttpMessageLog::from(&req); + log_data.client_addr(&client_addr); + + // Here we start to handle with server_name + let server_name = req.parse_host().map(ServerName::from)?; + + // check consistency of between TLS SNI and HOST/Request URI Line. + #[allow(clippy::collapsible_if)] + if tls_enabled && self.globals.proxy_config.sni_consistency { + if server_name != tls_server_name.unwrap_or_default() { + return Err(HttpError::SniHostInconsistency); + } + } + // Find backend application for given server_name, and drop if incoming request is invalid as request. + let backend_app = match self.app_manager.apps.get(&server_name) { + Some(backend_app) => backend_app, + None => { + let Some(default_server_name) = &self.app_manager.default_server_name else { + return Err(HttpError::NoMatchingBackendApp); + }; + debug!("Serving by default app"); + self.app_manager.apps.get(default_server_name).unwrap() + } + }; + + // Redirect to https if !tls_enabled and redirect_to_https is true + if !tls_enabled && backend_app.https_redirection.unwrap_or(false) { + debug!( + "Redirect to secure connection: {}", + <&ServerName as TryInto>::try_into(&backend_app.server_name).unwrap_or_default() + ); + return secure_redirection_response(&backend_app.server_name, self.globals.proxy_config.https_port, &req); + } + + ////////////// + // // TODO: remove later + let body = crate::hyper_ext::body::full(hyper::body::Bytes::from("not yet implemented")); + let res = super::synthetic_response::synthetic_response(Response::builder().body(body).unwrap()); + Ok(res) + ////////////// + // todo!() + } +} diff --git a/rpxy-lib/src/message_handle/http_log.rs b/rpxy-lib/src/message_handle/http_log.rs new file mode 100644 index 0000000..7056c80 --- /dev/null +++ b/rpxy-lib/src/message_handle/http_log.rs @@ -0,0 +1,99 @@ +use super::canonical_address::ToCanonical; +use crate::log::*; +use http::header; +use std::net::SocketAddr; + +/// Struct to log HTTP messages +#[derive(Debug, Clone)] +pub struct HttpMessageLog { + // pub tls_server_name: String, + pub client_addr: String, + pub method: String, + pub host: String, + pub p_and_q: String, + pub version: hyper::Version, + pub uri_scheme: String, + pub uri_host: String, + pub ua: String, + pub xff: String, + pub status: String, + pub upstream: String, +} + +impl From<&hyper::Request> for HttpMessageLog { + fn from(req: &hyper::Request) -> Self { + let header_mapper = |v: header::HeaderName| { + req + .headers() + .get(v) + .map_or_else(|| "", |s| s.to_str().unwrap_or("")) + .to_string() + }; + Self { + // tls_server_name: "".to_string(), + client_addr: "".to_string(), + method: req.method().to_string(), + host: header_mapper(header::HOST), + p_and_q: req + .uri() + .path_and_query() + .map_or_else(|| "", |v| v.as_str()) + .to_string(), + version: req.version(), + uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), + uri_host: req.uri().host().unwrap_or("").to_string(), + ua: header_mapper(header::USER_AGENT), + xff: header_mapper(header::HeaderName::from_static("x-forwarded-for")), + status: "".to_string(), + upstream: "".to_string(), + } + } +} + +impl HttpMessageLog { + pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self { + self.client_addr = client_addr.to_canonical().to_string(); + self + } + // pub fn tls_server_name(&mut self, tls_server_name: &str) -> &mut Self { + // self.tls_server_name = tls_server_name.to_string(); + // self + // } + pub fn status_code(&mut self, status_code: &hyper::StatusCode) -> &mut Self { + self.status = status_code.to_string(); + self + } + pub fn xff(&mut self, xff: &Option<&header::HeaderValue>) -> &mut Self { + self.xff = xff.map_or_else(|| "", |v| v.to_str().unwrap_or("")).to_string(); + self + } + pub fn upstream(&mut self, upstream: &hyper::Uri) -> &mut Self { + self.upstream = upstream.to_string(); + self + } + + pub fn output(&self) { + info!( + "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", + if !self.host.is_empty() { + self.host.as_str() + } else { + self.uri_host.as_str() + }, + self.client_addr, + self.method, + self.p_and_q, + self.version, + self.status, + if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { + format!("{}://{}", self.uri_scheme, self.uri_host) + } else { + "".to_string() + }, + self.ua, + self.xff, + self.upstream, + // self.tls_server_name + ); + } +} diff --git a/rpxy-lib/src/message_handle/http_result.rs b/rpxy-lib/src/message_handle/http_result.rs new file mode 100644 index 0000000..8e9d6b4 --- /dev/null +++ b/rpxy-lib/src/message_handle/http_result.rs @@ -0,0 +1,36 @@ +use http::StatusCode; +use thiserror::Error; + +/// HTTP result type, T is typically a hyper::Response +/// HttpError is used to generate a synthetic error response +pub(crate) type HttpResult = std::result::Result; + +/// Describes things that can go wrong in the forwarder +#[derive(Debug, Error)] +pub enum HttpError { + #[error("No host is give nin request header")] + NoHostInRequestHeader, + #[error("Invalid host in request header")] + InvalidHostInRequestHeader, + #[error("SNI and Host header mismatch")] + SniHostInconsistency, + #[error("No matching backend app")] + NoMatchingBackendApp, + #[error("Failed to redirect: {0}")] + FailedToRedirect(String), + + #[error(transparent)] + Other(#[from] anyhow::Error), +} + +impl From for StatusCode { + fn from(e: HttpError) -> StatusCode { + match e { + HttpError::NoHostInRequestHeader => StatusCode::BAD_REQUEST, + HttpError::InvalidHostInRequestHeader => StatusCode::BAD_REQUEST, + HttpError::SniHostInconsistency => StatusCode::MISDIRECTED_REQUEST, + HttpError::NoMatchingBackendApp => StatusCode::SERVICE_UNAVAILABLE, + _ => StatusCode::INTERNAL_SERVER_ERROR, + } + } +} diff --git a/rpxy-lib/src/message_handle/mod.rs b/rpxy-lib/src/message_handle/mod.rs new file mode 100644 index 0000000..f00b417 --- /dev/null +++ b/rpxy-lib/src/message_handle/mod.rs @@ -0,0 +1,8 @@ +mod canonical_address; +mod handler; +mod http_log; +mod http_result; +mod synthetic_response; +mod utils_request; + +pub(crate) use handler::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}; diff --git a/rpxy-lib/src/message_handle/synthetic_response.rs b/rpxy-lib/src/message_handle/synthetic_response.rs new file mode 100644 index 0000000..0038997 --- /dev/null +++ b/rpxy-lib/src/message_handle/synthetic_response.rs @@ -0,0 +1,57 @@ +use crate::{ + error::*, + hyper_ext::body::{empty, BoxBody, IncomingOr}, + name_exp::ServerName, +}; +use http::{Request, Response, StatusCode, Uri}; +use hyper::body::Incoming; + +use super::http_result::{HttpError, HttpResult}; + +/// helper function to build http response with passthrough body +pub(crate) fn passthrough_response(response: Response) -> Response> +where + B: hyper::body::Body, +{ + response.map(IncomingOr::Left) +} + +/// helper function to build http response with synthetic body +pub(crate) fn synthetic_response(response: Response) -> Response> { + response.map(IncomingOr::Right) +} + +/// build http response with status code of 4xx and 5xx +pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult>> { + let res = Response::builder() + .status(status_code) + .body(IncomingOr::Right(empty())) + .unwrap(); + Ok(res) +} + +/// Generate synthetic response message of a redirection to https host with 301 +pub(super) fn secure_redirection_response( + server_name: &ServerName, + tls_port: Option, + req: &Request, +) -> HttpResult>> { + let server_name: String = server_name.try_into().unwrap_or_default(); + let pq = match req.uri().path_and_query() { + Some(x) => x.as_str(), + _ => "", + }; + let new_uri = Uri::builder().scheme("https").path_and_query(pq); + let dest_uri = match tls_port { + Some(443) | None => new_uri.authority(server_name), + Some(p) => new_uri.authority(format!("{server_name}:{p}")), + } + .build() + .map_err(|e| HttpError::FailedToRedirect(e.to_string()))?; + let response = Response::builder() + .status(StatusCode::MOVED_PERMANENTLY) + .header("Location", dest_uri.to_string()) + .body(IncomingOr::Right(empty())) + .map_err(|e| HttpError::FailedToRedirect(e.to_string()))?; + Ok(response) +} diff --git a/rpxy-lib/src/message_handle/utils_request.rs b/rpxy-lib/src/message_handle/utils_request.rs new file mode 100644 index 0000000..a8f9bd4 --- /dev/null +++ b/rpxy-lib/src/message_handle/utils_request.rs @@ -0,0 +1,43 @@ +use super::http_result::*; +use http::{header, Request}; + +/// Trait defining parser of hostname +pub trait ParseHost { + type Error; + fn parse_host(&self) -> Result<&[u8], Self::Error>; +} +impl ParseHost for Request { + type Error = HttpError; + /// Extract hostname from either the request HOST header or request line + fn parse_host(&self) -> HttpResult<&[u8]> { + let headers_host = self.headers().get(header::HOST); + let uri_host = self.uri().host(); + // let uri_port = self.uri().port_u16(); + + if !(!(headers_host.is_none() && uri_host.is_none())) { + return Err(HttpError::NoHostInRequestHeader); + } + + // prioritize server_name in uri + uri_host.map_or_else( + || { + let m = headers_host.unwrap().as_bytes(); + if m.starts_with(&[b'[']) { + // v6 address with bracket case. if port is specified, always it is in this case. + let mut iter = m.split(|ptr| ptr == &b'[' || ptr == &b']'); + iter.next().ok_or(HttpError::InvalidHostInRequestHeader)?; // first item is always blank + iter.next().ok_or(HttpError::InvalidHostInRequestHeader) + } else if m.len() - m.split(|v| v == &b':').fold(0, |acc, s| acc + s.len()) >= 2 { + // v6 address case, if 2 or more ':' is contained + Ok(m) + } else { + // v4 address or hostname + m.split(|colon| colon == &b':') + .next() + .ok_or(HttpError::InvalidHostInRequestHeader) + } + }, + |v| Ok(v.as_bytes()), + ) + } +} diff --git a/rpxy-lib/src/message_handler/mod.rs b/rpxy-lib/src/message_handler/mod.rs deleted file mode 100644 index e69de29..0000000 diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 6ca6528..922b857 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -1,10 +1,8 @@ use super::proxy_main::Proxy; use crate::{ + crypto::CryptoSource, error::*, - hyper_ext::{ - body::{IncomingLike, IncomingOr}, - full, synthetic_response, - }, + hyper_ext::body::{IncomingLike, IncomingOr}, log::*, name_exp::ServerName, }; @@ -22,13 +20,11 @@ use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic // use futures::Stream; // use hyper_util::client::legacy::connect::Connect; -// impl Proxy -// where -// // T: Connect + Clone + Sync + Send + 'static, -// U: CryptoSource + Clone + Sync + Send + 'static, -// { - -impl Proxy { +impl Proxy +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone + Sync + Send + 'static, +{ pub(super) async fn h3_serve_connection( &self, quic_connection: C, @@ -151,24 +147,17 @@ impl Proxy { }); let mut new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); - - // let res = selfw - // .msg_handler - // .clone() - // .handle_request( - // new_req, - // client_addr, - // self.listening_on, - // self.tls_enabled, - // Some(tls_server_name), - // ) - // .await?; - - // TODO: TODO: TODO: remove later - let body = full(Bytes::from("hello h3 echo")); - // here response is IncomingOr from message handler - let res = synthetic_response(Response::builder().body(body).unwrap())?; - ///////////////// + // Response> wrapped by RpxyResult + let res = self + .message_handler + .handle_request( + new_req, + client_addr, + self.listening_on, + self.tls_enabled, + Some(tls_server_name), + ) + .await?; let (new_res_parts, new_body) = res.into_parts(); let new_res = Response::from_parts(new_res_parts, ()); diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index cc6636d..abdea64 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -1,16 +1,15 @@ use super::socket::bind_tcp_socket; use crate::{ constants::TLS_HANDSHAKE_TIMEOUT_SEC, - crypto::{ServerCrypto, SniServerCryptoMap}, + crypto::{CryptoSource, ServerCrypto, SniServerCryptoMap}, error::*, globals::Globals, hyper_ext::{ body::{BoxBody, IncomingOr}, - full, rt::LocalExecutor, - synthetic_response, }, log::*, + message_handle::HttpMessageHandler, name_exp::ServerName, }; use futures::{select, FutureExt}; @@ -26,34 +25,37 @@ use tokio::time::timeout; /// Wrapper function to handle request for HTTP/1.1 and HTTP/2 /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler -async fn serve_request( +async fn serve_request( mut req: Request, // handler: Arc>, - // handler: Arc>, + handler: Arc>, client_addr: SocketAddr, listen_addr: SocketAddr, tls_enabled: bool, tls_server_name: Option, -) -> RpxyResult>> { - // match handler - // .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name) - // .await? - // { - // Ok(res) => passthrough_response(res), - // Err(e) => synthetic_error_response(StatusCode::from(e)), - // } - - ////////////// - // TODO: remove later - let body = full(hyper::body::Bytes::from("hello")); - let res = Response::builder().body(body).unwrap(); - synthetic_response(res) - ////////////// +) -> RpxyResult>> +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone, +{ + handler + .handle_request( + req.map(IncomingOr::Left), + client_addr, + listen_addr, + tls_enabled, + tls_server_name, + ) + .await } #[derive(Clone)] /// Proxy main object responsible to serve requests received from clients at the given socket address. -pub(crate) struct Proxy { +pub(crate) struct Proxy +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone + Sync + Send + 'static, +{ /// global context shared among async tasks pub globals: Arc, /// listen socket address @@ -62,9 +64,15 @@ pub(crate) struct Proxy { pub tls_enabled: bool, /// hyper connection builder serving http request pub connection_builder: Arc>, + /// message handler serving incoming http request + pub message_handler: Arc>, } -impl Proxy { +impl Proxy +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone + Sync + Send + 'static, +{ /// Serves requests from clients fn serve_connection(&self, stream: I, peer_addr: SocketAddr, tls_server_name: Option) where @@ -78,7 +86,7 @@ impl Proxy { debug!("Request incoming: current # {}", request_count.current()); let server_clone = self.connection_builder.clone(); - // let msg_handler_clone = self.msg_handler.clone(); + let message_handler_clone = self.message_handler.clone(); let timeout_sec = self.globals.proxy_config.proxy_timeout; let tls_enabled = self.tls_enabled; let listening_on = self.listening_on; @@ -90,7 +98,7 @@ impl Proxy { service_fn(move |req: Request| { serve_request( req, - // msg_handler_clone.clone(), + message_handler_clone.clone(), peer_addr, listening_on, tls_enabled, diff --git a/rpxy-lib/src/proxy/proxy_quic_quinn.rs b/rpxy-lib/src/proxy/proxy_quic_quinn.rs index bde3b00..8380f6e 100644 --- a/rpxy-lib/src/proxy/proxy_quic_quinn.rs +++ b/rpxy-lib/src/proxy/proxy_quic_quinn.rs @@ -1,15 +1,20 @@ use super::proxy_main::Proxy; use super::socket::bind_udp_socket; -use crate::{crypto::ServerCrypto, error::*, log::*, name_exp::ByteName}; +use crate::{ + crypto::{CryptoSource, ServerCrypto}, + error::*, + log::*, + name_exp::ByteName, +}; // use hyper_util::client::legacy::connect::Connect; use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig}; use rustls::ServerConfig; use std::sync::Arc; -impl Proxy -// where -// // T: Connect + Clone + Sync + Send + 'static, -// U: CryptoSource + Clone + Sync + Send + 'static, +impl Proxy +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone + Sync + Send + 'static, { pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else { diff --git a/rpxy-lib/src/proxy/proxy_quic_s2n.rs b/rpxy-lib/src/proxy/proxy_quic_s2n.rs index 32be619..3ab41d0 100644 --- a/rpxy-lib/src/proxy/proxy_quic_s2n.rs +++ b/rpxy-lib/src/proxy/proxy_quic_s2n.rs @@ -10,7 +10,11 @@ use std::sync::Arc; // use hyper_util::client::legacy::connect::Connect; use s2n_quic::provider; -impl Proxy { +impl Proxy +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone + Sync + Send + 'static, +{ /// Start UDP proxy serving with HTTP/3 request for configured host names pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {