diff --git a/README.md b/README.md index ef5d0fe..20d7891 100644 --- a/README.md +++ b/README.md @@ -104,11 +104,11 @@ If you want to host multiple and distinct domain names in a single IP address/po ```toml default_application = "app1" -[app.app1] +[apps.app1] server_name = "app1.example.com" #... -[app.app2] +[apps.app2] server_name = "app2.example.org" #... ``` diff --git a/config-example.toml b/config-example.toml index ec79f3d..458061c 100644 --- a/config-example.toml +++ b/config-example.toml @@ -57,8 +57,8 @@ upstream = [ ] load_balance = "round_robin" # or "random" or "sticky" (sticky session) or "none" (fix to the first one, default) upstream_options = [ - "override_host", - "force_http2_upstream", # mutually exclusive with "force_http11_upstream" + "disable_override_host", # do not overwrite HOST value with upstream hostname (like 192.168.xx.x seen from rpxy) + "force_http2_upstream", # mutually exclusive with "force_http11_upstream" ] # Non-default destination in "localhost" app, which is routed by "path" @@ -76,7 +76,7 @@ upstream = [ ] load_balance = "random" # or "round_robin" or "sticky" (sticky session) or "none" (fix to the first one, default) upstream_options = [ - "override_host", + "disable_override_host", "upgrade_insecure_requests", "force_http11_upstream", ] diff --git a/rpxy-lib/src/backend/load_balance/mod.rs b/rpxy-lib/src/backend/load_balance/mod.rs index d876517..38d312b 100644 --- a/rpxy-lib/src/backend/load_balance/mod.rs +++ b/rpxy-lib/src/backend/load_balance/mod.rs @@ -12,6 +12,8 @@ pub use load_balance_main::{ }; #[cfg(feature = "sticky-cookie")] pub use load_balance_sticky::LoadBalanceStickyBuilder; +#[cfg(feature = "sticky-cookie")] +pub use sticky_cookie::{StickyCookie, StickyCookieValue}; /// Result type for load balancing type LoadBalanceResult = std::result::Result; diff --git a/rpxy-lib/src/backend/mod.rs b/rpxy-lib/src/backend/mod.rs index 68f97a8..3d3ddbf 100644 --- a/rpxy-lib/src/backend/mod.rs +++ b/rpxy-lib/src/backend/mod.rs @@ -3,12 +3,11 @@ mod load_balance; mod upstream; mod upstream_opts; -pub use backend_main::{BackendAppBuilderError, BackendAppManager}; -pub use upstream::Upstream; // #[cfg(feature = "sticky-cookie")] -// pub use sticky_cookie::{StickyCookie, StickyCookieValue}; -// pub use self::{ -// load_balance::{LbContext, LoadBalance}, -// upstream::{ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder}, -// upstream_opts::UpstreamOption, -// }; +// pub use self::load_balance::{StickyCookie, StickyCookieValue}; +pub(crate) use self::{ + load_balance::{LoadBalance, LoadBalanceContext, StickyCookie, StickyCookieValue}, + upstream::{PathManager, Upstream, UpstreamCandidates}, + upstream_opts::UpstreamOption, +}; +pub(crate) use backend_main::{BackendAppBuilderError, BackendAppManager}; diff --git a/rpxy-lib/src/backend/upstream_opts.rs b/rpxy-lib/src/backend/upstream_opts.rs index 3f5fbc8..f19acb4 100644 --- a/rpxy-lib/src/backend/upstream_opts.rs +++ b/rpxy-lib/src/backend/upstream_opts.rs @@ -2,7 +2,7 @@ use crate::error::*; #[derive(Debug, Clone, Hash, Eq, PartialEq)] pub enum UpstreamOption { - OverrideHost, + DisableOverrideHost, UpgradeInsecureRequests, ForceHttp11Upstream, ForceHttp2Upstream, @@ -12,7 +12,7 @@ impl TryFrom<&str> for UpstreamOption { type Error = RpxyError; fn try_from(val: &str) -> RpxyResult { match val { - "override_host" => Ok(Self::OverrideHost), + "diaable_override_host" => Ok(Self::DisableOverrideHost), "upgrade_insecure_requests" => Ok(Self::UpgradeInsecureRequests), "force_http11_upstream" => Ok(Self::ForceHttp11Upstream), "force_http2_upstream" => Ok(Self::ForceHttp2Upstream), diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index a05a612..da65234 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -1,4 +1,3 @@ -pub use anyhow::{anyhow, bail, ensure, Context}; use thiserror::Error; pub type RpxyResult = std::result::Result; diff --git a/rpxy-lib/src/message_handle/handler.rs b/rpxy-lib/src/message_handle/handler.rs deleted file mode 100644 index 145d8ba..0000000 --- a/rpxy-lib/src/message_handle/handler.rs +++ /dev/null @@ -1,131 +0,0 @@ -use super::{ - http_log::HttpMessageLog, - http_result::{HttpError, HttpResult}, - synthetic_response::{secure_redirection_response, synthetic_error_response}, - utils_request::ParseHost, -}; -use crate::{ - backend::BackendAppManager, - crypto::CryptoSource, - error::*, - globals::Globals, - hyper_ext::body::{BoxBody, IncomingLike, IncomingOr}, - log::*, - name_exp::ServerName, -}; -use derive_builder::Builder; -use http::{Request, Response, StatusCode}; -use std::{net::SocketAddr, sync::Arc}; - -#[derive(Clone, Builder)] -/// HTTP message handler for requests from clients and responses from backend applications, -/// responsible to manipulate and forward messages to upstream backends and downstream clients. -// pub struct HttpMessageHandler -pub struct HttpMessageHandler -where - // T: Connect + Clone + Sync + Send + 'static, - U: CryptoSource + Clone, -{ - // forwarder: Arc>, - globals: Arc, - app_manager: Arc>, -} - -impl HttpMessageHandler -where - // T: Connect + Clone + Sync + Send + 'static, - U: CryptoSource + Clone, -{ - /// Handle incoming request message from a client. - /// Responsible to passthrough responses from backend applications or generate synthetic error responses. - pub async fn handle_request( - &self, - mut req: Request>, - client_addr: SocketAddr, // For access control - listen_addr: SocketAddr, - tls_enabled: bool, - tls_server_name: Option, - ) -> RpxyResult>> { - let mut log_data = HttpMessageLog::from(&req); - - let http_result = self - .handle_request_inner( - &mut log_data, - req, - client_addr, - listen_addr, - tls_enabled, - tls_server_name, - ) - .await; - - // passthrough or synthetic response - match http_result { - Ok(v) => { - log_data.status_code(&v.status()).output(); - Ok(v) - } - Err(e) => { - debug!("{e}"); - let code = StatusCode::from(e); - log_data.status_code(&code).output(); - synthetic_error_response(code) - } - } - } - - /// Handle inner with no synthetic error response. - /// Synthetic response is generated by caller. - async fn handle_request_inner( - &self, - mut log_data: &mut HttpMessageLog, - mut req: Request>, - client_addr: SocketAddr, // For access control - listen_addr: SocketAddr, - tls_enabled: bool, - tls_server_name: Option, - ) -> HttpResult>> { - // preparing log data - let mut log_data = HttpMessageLog::from(&req); - log_data.client_addr(&client_addr); - - // Here we start to handle with server_name - let server_name = req.parse_host().map(ServerName::from)?; - - // check consistency of between TLS SNI and HOST/Request URI Line. - #[allow(clippy::collapsible_if)] - if tls_enabled && self.globals.proxy_config.sni_consistency { - if server_name != tls_server_name.unwrap_or_default() { - return Err(HttpError::SniHostInconsistency); - } - } - // Find backend application for given server_name, and drop if incoming request is invalid as request. - let backend_app = match self.app_manager.apps.get(&server_name) { - Some(backend_app) => backend_app, - None => { - let Some(default_server_name) = &self.app_manager.default_server_name else { - return Err(HttpError::NoMatchingBackendApp); - }; - debug!("Serving by default app"); - self.app_manager.apps.get(default_server_name).unwrap() - } - }; - - // Redirect to https if !tls_enabled and redirect_to_https is true - if !tls_enabled && backend_app.https_redirection.unwrap_or(false) { - debug!( - "Redirect to secure connection: {}", - <&ServerName as TryInto>::try_into(&backend_app.server_name).unwrap_or_default() - ); - return secure_redirection_response(&backend_app.server_name, self.globals.proxy_config.https_port, &req); - } - - ////////////// - // // TODO: remove later - let body = crate::hyper_ext::body::full(hyper::body::Bytes::from("not yet implemented")); - let res = super::synthetic_response::synthetic_response(Response::builder().body(body).unwrap()); - Ok(res) - ////////////// - // todo!() - } -} diff --git a/rpxy-lib/src/message_handle/handler_main.rs b/rpxy-lib/src/message_handle/handler_main.rs new file mode 100644 index 0000000..5be08f1 --- /dev/null +++ b/rpxy-lib/src/message_handle/handler_main.rs @@ -0,0 +1,255 @@ +use super::{ + http_log::HttpMessageLog, + http_result::{HttpError, HttpResult}, + synthetic_response::{secure_redirection_response, synthetic_error_response}, + utils_headers::*, + utils_request::InspectParseHost, +}; +use crate::{ + backend::{BackendAppManager, LoadBalanceContext}, + crypto::CryptoSource, + error::*, + globals::Globals, + hyper_ext::body::{BoxBody, IncomingLike, IncomingOr}, + log::*, + name_exp::ServerName, +}; +use derive_builder::Builder; +use http::{Request, Response, StatusCode}; +use std::{net::SocketAddr, sync::Arc}; + +#[allow(dead_code)] +#[derive(Debug)] +/// Context object to handle sticky cookies at HTTP message handler +pub(super) struct HandlerContext { + #[cfg(feature = "sticky-cookie")] + pub(super) context_lb: Option, + #[cfg(not(feature = "sticky-cookie"))] + pub(super) context_lb: Option<()>, +} + +#[derive(Clone, Builder)] +/// HTTP message handler for requests from clients and responses from backend applications, +/// responsible to manipulate and forward messages to upstream backends and downstream clients. +// pub struct HttpMessageHandler +pub struct HttpMessageHandler +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone, +{ + // forwarder: Arc>, + globals: Arc, + app_manager: Arc>, +} + +impl HttpMessageHandler +where + // T: Connect + Clone + Sync + Send + 'static, + U: CryptoSource + Clone, +{ + /// Handle incoming request message from a client. + /// Responsible to passthrough responses from backend applications or generate synthetic error responses. + pub async fn handle_request( + &self, + mut req: Request>, + client_addr: SocketAddr, // For access control + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> RpxyResult>> { + let mut log_data = HttpMessageLog::from(&req); + + let http_result = self + .handle_request_inner( + &mut log_data, + req, + client_addr, + listen_addr, + tls_enabled, + tls_server_name, + ) + .await; + + // passthrough or synthetic response + match http_result { + Ok(v) => { + log_data.status_code(&v.status()).output(); + Ok(v) + } + Err(e) => { + debug!("{e}"); + let code = StatusCode::from(e); + log_data.status_code(&code).output(); + synthetic_error_response(code) + } + } + } + + /// Handle inner with no synthetic error response. + /// Synthetic response is generated by caller. + async fn handle_request_inner( + &self, + log_data: &mut HttpMessageLog, + mut req: Request>, + client_addr: SocketAddr, // For access control + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> HttpResult>> { + // preparing log data + let mut log_data = HttpMessageLog::from(&req); + log_data.client_addr(&client_addr); + + // Here we start to inspect and parse with server_name + let server_name = req + .inspect_parse_host() + .map(|v| ServerName::from(v.as_slice())) + .map_err(|_e| HttpError::InvalidHostInRequestHeader)?; + + // check consistency of between TLS SNI and HOST/Request URI Line. + #[allow(clippy::collapsible_if)] + if tls_enabled && self.globals.proxy_config.sni_consistency { + if server_name != tls_server_name.unwrap_or_default() { + return Err(HttpError::SniHostInconsistency); + } + } + // Find backend application for given server_name, and drop if incoming request is invalid as request. + let backend_app = match self.app_manager.apps.get(&server_name) { + Some(backend_app) => backend_app, + None => { + let Some(default_server_name) = &self.app_manager.default_server_name else { + return Err(HttpError::NoMatchingBackendApp); + }; + debug!("Serving by default app"); + self.app_manager.apps.get(default_server_name).unwrap() + } + }; + + // Redirect to https if !tls_enabled and redirect_to_https is true + if !tls_enabled && backend_app.https_redirection.unwrap_or(false) { + debug!( + "Redirect to secure connection: {}", + <&ServerName as TryInto>::try_into(&backend_app.server_name).unwrap_or_default() + ); + return secure_redirection_response(&backend_app.server_name, self.globals.proxy_config.https_port, &req); + } + + // Find reverse proxy for given path and choose one of upstream host + // Longest prefix match + let path = req.uri().path(); + let Some(upstream_candidates) = backend_app.path_manager.get(path) else { + return Err(HttpError::NoUpstreamCandidates); + }; + + // Upgrade in request header + let upgrade_in_request = extract_upgrade(req.headers()); + let request_upgraded = req.extensions_mut().remove::(); + + // Build request from destination information + let _context = match self.generate_request_forwarded( + &client_addr, + &listen_addr, + &mut req, + &upgrade_in_request, + upstream_candidates, + tls_enabled, + ) { + Err(e) => { + error!("Failed to generate destination uri for backend application: {}", e); + return Err(HttpError::FailedToGenerateUpstreamRequest(e.to_string())); + } + Ok(v) => v, + }; + debug!( + "Request to be forwarded: uri {}, version {:?}, headers {:?}", + req.uri(), + req.version(), + req.headers() + ); + log_data.xff(&req.headers().get("x-forwarded-for")); + log_data.upstream(req.uri()); + ////// + + ////////////// + // // TODO: remove later + let body = crate::hyper_ext::body::full(hyper::body::Bytes::from("not yet implemented")); + let mut res_backend = super::synthetic_response::synthetic_response(Response::builder().body(body).unwrap()); + // // Forward request to a chosen backend + // let mut res_backend = { + // let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { + // return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data); + // }; + // match result { + // Ok(res) => res, + // Err(e) => { + // error!("Failed to get response from backend: {}", e); + // return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); + // } + // } + // }; + ////////////// + // Process reverse proxy context generated during the forwarding request generation. + #[cfg(feature = "sticky-cookie")] + if let Some(context_from_lb) = _context.context_lb { + let res_headers = res_backend.headers_mut(); + if let Err(e) = set_sticky_cookie_lb_context(res_headers, &context_from_lb) { + error!("Failed to append context to the response given from backend: {}", e); + return Err(HttpError::FailedToAddSetCookeInResponse); + } + } + + if res_backend.status() != StatusCode::SWITCHING_PROTOCOLS { + // // Generate response to client + // if self.generate_response_forwarded(&mut res_backend, backend).is_err() { + // return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); + // } + // log_data.status_code(&res_backend.status()).output(); + // return Ok(res_backend); + } + + // // Handle StatusCode::SWITCHING_PROTOCOLS in response + // let upgrade_in_response = extract_upgrade(res_backend.headers()); + // let should_upgrade = if let (Some(u_req), Some(u_res)) = (upgrade_in_request.as_ref(), upgrade_in_response.as_ref()) + // { + // u_req.to_ascii_lowercase() == u_res.to_ascii_lowercase() + // } else { + // false + // }; + // if !should_upgrade { + // error!( + // "Backend tried to switch to protocol {:?} when {:?} was requested", + // upgrade_in_response, upgrade_in_request + // ); + // return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); + // } + // let Some(request_upgraded) = request_upgraded else { + // error!("Request does not have an upgrade extension"); + // return self.return_with_error_log(StatusCode::BAD_REQUEST, &mut log_data); + // }; + // let Some(onupgrade) = res_backend.extensions_mut().remove::() else { + // error!("Response does not have an upgrade extension"); + // return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); + // }; + + // self.globals.runtime_handle.spawn(async move { + // let mut response_upgraded = onupgrade.await.map_err(|e| { + // error!("Failed to upgrade response: {}", e); + // RpxyError::Hyper(e) + // })?; + // let mut request_upgraded = request_upgraded.await.map_err(|e| { + // error!("Failed to upgrade request: {}", e); + // RpxyError::Hyper(e) + // })?; + // copy_bidirectional(&mut response_upgraded, &mut request_upgraded) + // .await + // .map_err(|e| { + // error!("Coping between upgraded connections failed: {}", e); + // RpxyError::Io(e) + // })?; + // Ok(()) as Result<()> + // }); + // log_data.status_code(&res_backend.status()).output(); + + Ok(res_backend) + } +} diff --git a/rpxy-lib/src/message_handle/handler_manipulate_messages.rs b/rpxy-lib/src/message_handle/handler_manipulate_messages.rs new file mode 100644 index 0000000..28a62b1 --- /dev/null +++ b/rpxy-lib/src/message_handle/handler_manipulate_messages.rs @@ -0,0 +1,195 @@ +use super::{ + handler_main::HandlerContext, utils_headers::*, utils_request::apply_upstream_options_to_request_line, + HttpMessageHandler, +}; +use crate::{backend::UpstreamCandidates, log::*, CryptoSource}; +use anyhow::{anyhow, ensure, Result}; +use http::{header, uri::Scheme, HeaderValue, Request, Uri, Version}; +use std::net::SocketAddr; + +impl HttpMessageHandler +where + U: CryptoSource + Clone, +{ + //////////////////////////////////////////////////// + // Functions to generate messages + //////////////////////////////////////////////////// + + // /// Manipulate a response message sent from a backend application to forward downstream to a client. + // fn generate_response_forwarded(&self, response: &mut Response, chosen_backend: &Backend) -> Result<()> { + // where + // B: core::fmt::Debug, + // { + // let headers = response.headers_mut(); + // remove_connection_header(headers); + // remove_hop_header(headers); + // add_header_entry_overwrite_if_exist(headers, "server", RESPONSE_HEADER_SERVER)?; + + // #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + // { + // // Manipulate ALT_SVC allowing h3 in response message only when mutual TLS is not enabled + // // TODO: This is a workaround for avoiding a client authentication in HTTP/3 + // if self.globals.proxy_config.http3 + // && chosen_backend + // .crypto_source + // .as_ref() + // .is_some_and(|v| !v.is_mutual_tls()) + // { + // if let Some(port) = self.globals.proxy_config.https_port { + // add_header_entry_overwrite_if_exist( + // headers, + // header::ALT_SVC.as_str(), + // format!( + // "h3=\":{}\"; ma={}, h3-29=\":{}\"; ma={}", + // port, self.globals.proxy_config.h3_alt_svc_max_age, port, self.globals.proxy_config.h3_alt_svc_max_age + // ), + // )?; + // } + // } else { + // // remove alt-svc to disallow requests via http3 + // headers.remove(header::ALT_SVC.as_str()); + // } + // } + // #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] + // { + // if let Some(port) = self.globals.proxy_config.https_port { + // headers.remove(header::ALT_SVC.as_str()); + // } + // } + + // Ok(()) + // todo!() + // } + + #[allow(clippy::too_many_arguments)] + /// Manipulate a request message sent from a client to forward upstream to a backend application + pub(super) fn generate_request_forwarded( + &self, + client_addr: &SocketAddr, + listen_addr: &SocketAddr, + req: &mut Request, + upgrade: &Option, + upstream_candidates: &UpstreamCandidates, + tls_enabled: bool, + ) -> Result { + debug!("Generate request to be forwarded"); + + // Add te: trailer if contained in original request + let contains_te_trailers = { + if let Some(te) = req.headers().get(header::TE) { + te.as_bytes() + .split(|v| v == &b',' || v == &b' ') + .any(|x| x == "trailers".as_bytes()) + } else { + false + } + }; + + let uri = req.uri().to_string(); + let headers = req.headers_mut(); + // delete headers specified in header.connection + remove_connection_header(headers); + // delete hop headers including header.connection + remove_hop_header(headers); + // X-Forwarded-For + add_forwarding_header(headers, client_addr, listen_addr, tls_enabled, &uri)?; + + // Add te: trailer if te_trailer + if contains_te_trailers { + headers.insert(header::TE, HeaderValue::from_bytes("trailers".as_bytes()).unwrap()); + } + + // add "host" header of original server_name if not exist (default) + if req.headers().get(header::HOST).is_none() { + let org_host = req.uri().host().ok_or_else(|| anyhow!("Invalid request"))?.to_owned(); + req + .headers_mut() + .insert(header::HOST, HeaderValue::from_str(&org_host)?); + }; + + ///////////////////////////////////////////// + // Fix unique upstream destination since there could be multiple ones. + #[cfg(feature = "sticky-cookie")] + let (upstream_chosen_opt, context_from_lb) = { + let context_to_lb = if let crate::backend::LoadBalance::StickyRoundRobin(lb) = &upstream_candidates.load_balance { + takeout_sticky_cookie_lb_context(req.headers_mut(), &lb.sticky_config.name)? + } else { + None + }; + upstream_candidates.get(&context_to_lb) + }; + #[cfg(not(feature = "sticky-cookie"))] + let (upstream_chosen_opt, _) = upstream_candidates.get(&None); + + let upstream_chosen = upstream_chosen_opt.ok_or_else(|| anyhow!("Failed to get upstream"))?; + let context = HandlerContext { + #[cfg(feature = "sticky-cookie")] + context_lb: context_from_lb, + #[cfg(not(feature = "sticky-cookie"))] + context_lb: None, + }; + ///////////////////////////////////////////// + + // apply upstream-specific headers given in upstream_option + let headers = req.headers_mut(); + // by default, host header is overwritten with upstream hostname + override_host_header(headers, &upstream_chosen.uri)?; + // apply upstream options to header + apply_upstream_options_to_header(headers, upstream_candidates)?; + + // update uri in request + ensure!( + upstream_chosen.uri.authority().is_some() && upstream_chosen.uri.scheme().is_some(), + "Upstream uri `scheme` and `authority` is broken" + ); + + let new_uri = Uri::builder() + .scheme(upstream_chosen.uri.scheme().unwrap().as_str()) + .authority(upstream_chosen.uri.authority().unwrap().as_str()); + let org_pq = match req.uri().path_and_query() { + Some(pq) => pq.to_string(), + None => "/".to_string(), + } + .into_bytes(); + + // replace some parts of path if opt_replace_path is enabled for chosen upstream + let new_pq = match &upstream_candidates.replace_path { + Some(new_path) => { + let matched_path: &[u8] = upstream_candidates.path.as_ref(); + ensure!( + !matched_path.is_empty() && org_pq.len() >= matched_path.len(), + "Upstream uri `path and query` is broken" + ); + let mut new_pq = Vec::::with_capacity(org_pq.len() - matched_path.len() + new_path.len()); + new_pq.extend_from_slice(new_path.as_ref()); + new_pq.extend_from_slice(&org_pq[matched_path.len()..]); + new_pq + } + None => org_pq, + }; + *req.uri_mut() = new_uri.path_and_query(new_pq).build()?; + + // upgrade + if let Some(v) = upgrade { + req.headers_mut().insert(header::UPGRADE, v.parse()?); + req + .headers_mut() + .insert(header::CONNECTION, HeaderValue::from_static("upgrade")); + } + + // If not specified (force_httpXX_upstream) and https, version is preserved except for http/3 + if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) { + // Change version to http/1.1 when destination scheme is http + debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled."); + *req.version_mut() = Version::HTTP_11; + } else if req.version() == Version::HTTP_3 { + // HTTP/3 is always https + debug!("HTTP/3 is currently unsupported for request to upstream."); + *req.version_mut() = Version::HTTP_2; + } + + apply_upstream_options_to_request_line(req, upstream_candidates)?; + + Ok(context) + } +} diff --git a/rpxy-lib/src/message_handle/http_result.rs b/rpxy-lib/src/message_handle/http_result.rs index 8e9d6b4..3f9df23 100644 --- a/rpxy-lib/src/message_handle/http_result.rs +++ b/rpxy-lib/src/message_handle/http_result.rs @@ -18,6 +18,13 @@ pub enum HttpError { NoMatchingBackendApp, #[error("Failed to redirect: {0}")] FailedToRedirect(String), + #[error("No upstream candidates")] + NoUpstreamCandidates, + #[error("Failed to generate upstream request: {0}")] + FailedToGenerateUpstreamRequest(String), + + #[error("Failed to add set-cookie header in response")] + FailedToAddSetCookeInResponse, #[error(transparent)] Other(#[from] anyhow::Error), @@ -30,6 +37,10 @@ impl From for StatusCode { HttpError::InvalidHostInRequestHeader => StatusCode::BAD_REQUEST, HttpError::SniHostInconsistency => StatusCode::MISDIRECTED_REQUEST, HttpError::NoMatchingBackendApp => StatusCode::SERVICE_UNAVAILABLE, + HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR, + HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND, + HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR, + HttpError::FailedToAddSetCookeInResponse => StatusCode::INTERNAL_SERVER_ERROR, _ => StatusCode::INTERNAL_SERVER_ERROR, } } diff --git a/rpxy-lib/src/message_handle/mod.rs b/rpxy-lib/src/message_handle/mod.rs index f00b417..a9cb195 100644 --- a/rpxy-lib/src/message_handle/mod.rs +++ b/rpxy-lib/src/message_handle/mod.rs @@ -1,8 +1,10 @@ mod canonical_address; -mod handler; +mod handler_main; +mod handler_manipulate_messages; mod http_log; mod http_result; mod synthetic_response; +mod utils_headers; mod utils_request; -pub(crate) use handler::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}; +pub(crate) use handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}; diff --git a/rpxy-lib/src/message_handle/utils_headers.rs b/rpxy-lib/src/message_handle/utils_headers.rs new file mode 100644 index 0000000..32bc7f3 --- /dev/null +++ b/rpxy-lib/src/message_handle/utils_headers.rs @@ -0,0 +1,292 @@ +use super::canonical_address::ToCanonical; +use crate::{ + backend::{UpstreamCandidates, UpstreamOption}, + log::*, +}; +use anyhow::{anyhow, ensure, Result}; +use bytes::BufMut; +use http::{header, HeaderMap, HeaderName, HeaderValue, Uri}; +use std::{borrow::Cow, net::SocketAddr}; + +#[cfg(feature = "sticky-cookie")] +use crate::backend::{LoadBalanceContext, StickyCookie, StickyCookieValue}; +// use crate::backend::{UpstreamGroup, UpstreamOption}; + +// //////////////////////////////////////////////////// +// // Functions to manipulate headers +#[cfg(feature = "sticky-cookie")] +/// Take sticky cookie header value from request header, +/// and returns LoadBalanceContext to be forwarded to LB if exist and if needed. +/// Removing sticky cookie is needed and it must not be passed to the upstream. +pub(super) fn takeout_sticky_cookie_lb_context( + headers: &mut HeaderMap, + expected_cookie_name: &str, +) -> Result> { + let mut headers_clone = headers.clone(); + + match headers_clone.entry(header::COOKIE) { + header::Entry::Vacant(_) => Ok(None), + header::Entry::Occupied(entry) => { + let cookies_iter = entry + .iter() + .flat_map(|v| v.to_str().unwrap_or("").split(';').map(|v| v.trim())); + let (sticky_cookies, without_sticky_cookies): (Vec<_>, Vec<_>) = cookies_iter + .into_iter() + .partition(|v| v.starts_with(expected_cookie_name)); + if sticky_cookies.is_empty() { + return Ok(None); + } + ensure!( + sticky_cookies.len() == 1, + "Invalid cookie: Multiple sticky cookie values" + ); + + let cookies_passed_to_upstream = without_sticky_cookies.join("; "); + let cookie_passed_to_lb = sticky_cookies.first().unwrap(); + headers.remove(header::COOKIE); + headers.insert(header::COOKIE, cookies_passed_to_upstream.parse()?); + + let sticky_cookie = StickyCookie { + value: StickyCookieValue::try_from(cookie_passed_to_lb, expected_cookie_name)?, + info: None, + }; + Ok(Some(LoadBalanceContext { sticky_cookie })) + } + } +} + +#[cfg(feature = "sticky-cookie")] +/// Set-Cookie if LB Sticky is enabled and if cookie is newly created/updated. +/// Set-Cookie response header could be in multiple lines. +/// https://developer.mozilla.org/ja/docs/Web/HTTP/Headers/Set-Cookie +pub(super) fn set_sticky_cookie_lb_context( + headers: &mut HeaderMap, + context_from_lb: &LoadBalanceContext, +) -> Result<()> { + let sticky_cookie_string: String = context_from_lb.sticky_cookie.clone().try_into()?; + let new_header_val: HeaderValue = sticky_cookie_string.parse()?; + let expected_cookie_name = &context_from_lb.sticky_cookie.value.name; + match headers.entry(header::SET_COOKIE) { + header::Entry::Vacant(entry) => { + entry.insert(new_header_val); + } + header::Entry::Occupied(mut entry) => { + let mut flag = false; + for e in entry.iter_mut() { + if e.to_str().unwrap_or("").starts_with(expected_cookie_name) { + *e = new_header_val.clone(); + flag = true; + } + } + if !flag { + entry.append(new_header_val); + } + } + }; + Ok(()) +} + +/// default: overwrite HOST value with upstream hostname (like 192.168.xx.x seen from rpxy) +pub(super) fn override_host_header(headers: &mut HeaderMap, upstream_base_uri: &Uri) -> Result<()> { + let mut upstream_host = upstream_base_uri + .host() + .ok_or_else(|| anyhow!("No hostname is given"))? + .to_string(); + // add port if it is not default + if let Some(port) = upstream_base_uri.port_u16() { + upstream_host = format!("{}:{}", upstream_host, port); + } + + // overwrite host header, this removes all the HOST header values + headers.insert(header::HOST, HeaderValue::from_str(&upstream_host)?); + Ok(()) +} + +/// Apply options to request header, which are specified in the configuration +pub(super) fn apply_upstream_options_to_header( + headers: &mut HeaderMap, + // _client_addr: &SocketAddr, + upstream: &UpstreamCandidates, + // _upstream_base_uri: &Uri, +) -> Result<()> { + for opt in upstream.options.iter() { + match opt { + UpstreamOption::DisableOverrideHost => { + // simply remove HOST header value + headers + .remove(header::HOST) + .ok_or_else(|| anyhow!("Failed to remove host header in disable_override_host option"))?; + } + UpstreamOption::UpgradeInsecureRequests => { + // add upgrade-insecure-requests in request header if not exist + headers + .entry(header::UPGRADE_INSECURE_REQUESTS) + .or_insert(HeaderValue::from_bytes(&[b'1']).unwrap()); + } + _ => (), + } + } + + Ok(()) +} + +/// Append header entry with comma according to [RFC9110](https://datatracker.ietf.org/doc/html/rfc9110) +pub(super) fn append_header_entry_with_comma(headers: &mut HeaderMap, key: &str, value: &str) -> Result<()> { + match headers.entry(HeaderName::from_bytes(key.as_bytes())?) { + header::Entry::Vacant(entry) => { + entry.insert(value.parse::()?); + } + header::Entry::Occupied(mut entry) => { + // entry.append(value.parse::()?); + let mut new_value = Vec::::with_capacity(entry.get().as_bytes().len() + 2 + value.len()); + new_value.put_slice(entry.get().as_bytes()); + new_value.put_slice(&[b',', b' ']); + new_value.put_slice(value.as_bytes()); + entry.insert(HeaderValue::from_bytes(&new_value)?); + } + } + + Ok(()) +} + +/// Add header entry if not exist +pub(super) fn add_header_entry_if_not_exist( + headers: &mut HeaderMap, + key: impl Into>, + value: impl Into>, +) -> Result<()> { + match headers.entry(HeaderName::from_bytes(key.into().as_bytes())?) { + header::Entry::Vacant(entry) => { + entry.insert(value.into().parse::()?); + } + header::Entry::Occupied(_) => (), + }; + + Ok(()) +} + +/// Overwrite header entry if exist +pub(super) fn add_header_entry_overwrite_if_exist( + headers: &mut HeaderMap, + key: impl Into>, + value: impl Into>, +) -> Result<()> { + match headers.entry(HeaderName::from_bytes(key.into().as_bytes())?) { + header::Entry::Vacant(entry) => { + entry.insert(value.into().parse::()?); + } + header::Entry::Occupied(mut entry) => { + entry.insert(HeaderValue::from_bytes(value.into().as_bytes())?); + } + } + + Ok(()) +} + +/// Align cookie values in single line +/// Sometimes violates [RFC6265](https://www.rfc-editor.org/rfc/rfc6265#section-5.4) (for http/1.1). +/// This is allowed in RFC7540 (for http/2) as mentioned [here](https://stackoverflow.com/questions/4843556/in-http-specification-what-is-the-string-that-separates-cookies). +pub(super) fn make_cookie_single_line(headers: &mut HeaderMap) -> Result<()> { + let cookies = headers + .iter() + .filter(|(k, _)| **k == header::COOKIE) + .map(|(_, v)| v.to_str().unwrap_or("")) + .collect::>() + .join("; "); + if !cookies.is_empty() { + headers.remove(header::COOKIE); + headers.insert(header::COOKIE, HeaderValue::from_bytes(cookies.as_bytes())?); + } + Ok(()) +} + +/// Add forwarding headers like `x-forwarded-for`. +pub(super) fn add_forwarding_header( + headers: &mut HeaderMap, + client_addr: &SocketAddr, + listen_addr: &SocketAddr, + tls: bool, + uri_str: &str, +) -> Result<()> { + // default process + // optional process defined by upstream_option is applied in fn apply_upstream_options + let canonical_client_addr = client_addr.to_canonical().ip().to_string(); + append_header_entry_with_comma(headers, "x-forwarded-for", &canonical_client_addr)?; + + // Single line cookie header + // TODO: This should be only for HTTP/1.1. For 2+, this can be multi-lined. + make_cookie_single_line(headers)?; + + /////////// As Nginx + // If we receive X-Forwarded-Proto, pass it through; otherwise, pass along the + // scheme used to connect to this server + add_header_entry_if_not_exist(headers, "x-forwarded-proto", if tls { "https" } else { "http" })?; + // If we receive X-Forwarded-Port, pass it through; otherwise, pass along the + // server port the client connected to + add_header_entry_if_not_exist(headers, "x-forwarded-port", listen_addr.port().to_string())?; + + /////////// As Nginx-Proxy + // x-real-ip + add_header_entry_overwrite_if_exist(headers, "x-real-ip", canonical_client_addr)?; + // x-forwarded-ssl + add_header_entry_overwrite_if_exist(headers, "x-forwarded-ssl", if tls { "on" } else { "off" })?; + // x-original-uri + add_header_entry_overwrite_if_exist(headers, "x-original-uri", uri_str.to_string())?; + // proxy + add_header_entry_overwrite_if_exist(headers, "proxy", "")?; + + Ok(()) +} + +/// Remove connection header +pub(super) fn remove_connection_header(headers: &mut HeaderMap) { + if let Some(values) = headers.get(header::CONNECTION) { + if let Ok(v) = values.clone().to_str() { + for m in v.split(',') { + if !m.is_empty() { + headers.remove(m.trim()); + } + } + } + } +} + +/// Hop header values which are removed at proxy +const HOP_HEADERS: &[&str] = &[ + "connection", + "te", + "trailer", + "keep-alive", + "proxy-connection", + "proxy-authenticate", + "proxy-authorization", + "transfer-encoding", + "upgrade", +]; + +/// Remove hop headers +pub(super) fn remove_hop_header(headers: &mut HeaderMap) { + HOP_HEADERS.iter().for_each(|key| { + headers.remove(*key); + }); +} + +/// Extract upgrade header value if exist +pub(super) fn extract_upgrade(headers: &HeaderMap) -> Option { + if let Some(c) = headers.get(header::CONNECTION) { + if c + .to_str() + .unwrap_or("") + .split(',') + .any(|w| w.trim().to_ascii_lowercase() == header::UPGRADE.as_str().to_ascii_lowercase()) + { + if let Some(u) = headers.get(header::UPGRADE) { + if let Ok(m) = u.to_str() { + debug!("Upgrade in request header: {}", m); + return Some(m.to_owned()); + } + } + } + } + None +} diff --git a/rpxy-lib/src/message_handle/utils_request.rs b/rpxy-lib/src/message_handle/utils_request.rs index a8f9bd4..37d5e0b 100644 --- a/rpxy-lib/src/message_handle/utils_request.rs +++ b/rpxy-lib/src/message_handle/utils_request.rs @@ -1,43 +1,71 @@ -use super::http_result::*; +use crate::backend::{UpstreamCandidates, UpstreamOption}; +use anyhow::{anyhow, ensure, Result}; use http::{header, Request}; /// Trait defining parser of hostname -pub trait ParseHost { +/// Inspect and extract hostname from either the request HOST header or request line +pub trait InspectParseHost { type Error; - fn parse_host(&self) -> Result<&[u8], Self::Error>; + fn inspect_parse_host(&self) -> Result, Self::Error>; } -impl ParseHost for Request { - type Error = HttpError; - /// Extract hostname from either the request HOST header or request line - fn parse_host(&self) -> HttpResult<&[u8]> { - let headers_host = self.headers().get(header::HOST); - let uri_host = self.uri().host(); +impl InspectParseHost for Request { + type Error = anyhow::Error; + /// Inspect and extract hostname from either the request HOST header or request line + fn inspect_parse_host(&self) -> Result> { + let drop_port = |v: &[u8]| { + if v.starts_with(&[b'[']) { + // v6 address with bracket case. if port is specified, always it is in this case. + let mut iter = v.split(|ptr| ptr == &b'[' || ptr == &b']'); + iter.next().ok_or(anyhow!("Invalid Host header"))?; // first item is always blank + iter.next().ok_or(anyhow!("Invalid Host header")).map(|b| b.to_owned()) + } else if v.len() - v.split(|v| v == &b':').fold(0, |acc, s| acc + s.len()) >= 2 { + // v6 address case, if 2 or more ':' is contained + Ok(v.to_owned()) + } else { + // v4 address or hostname + v.split(|colon| colon == &b':') + .next() + .ok_or(anyhow!("Invalid Host header")) + .map(|v| v.to_ascii_lowercase()) + } + }; + + let headers_host = self.headers().get(header::HOST).map(|v| drop_port(v.as_bytes())); + let uri_host = self.uri().host().map(|v| drop_port(v.as_bytes())); // let uri_port = self.uri().port_u16(); - if !(!(headers_host.is_none() && uri_host.is_none())) { - return Err(HttpError::NoHostInRequestHeader); - } - // prioritize server_name in uri - uri_host.map_or_else( - || { - let m = headers_host.unwrap().as_bytes(); - if m.starts_with(&[b'[']) { - // v6 address with bracket case. if port is specified, always it is in this case. - let mut iter = m.split(|ptr| ptr == &b'[' || ptr == &b']'); - iter.next().ok_or(HttpError::InvalidHostInRequestHeader)?; // first item is always blank - iter.next().ok_or(HttpError::InvalidHostInRequestHeader) - } else if m.len() - m.split(|v| v == &b':').fold(0, |acc, s| acc + s.len()) >= 2 { - // v6 address case, if 2 or more ':' is contained - Ok(m) - } else { - // v4 address or hostname - m.split(|colon| colon == &b':') - .next() - .ok_or(HttpError::InvalidHostInRequestHeader) - } - }, - |v| Ok(v.as_bytes()), - ) + match (headers_host, uri_host) { + (Some(Ok(hh)), Some(Ok(hu))) => { + ensure!(hh == hu, "Host header and uri host mismatch"); + Ok(hh) + } + (Some(Ok(hh)), None) => Ok(hh), + (None, Some(Ok(hu))) => Ok(hu), + _ => Err(anyhow!("Neither Host header nor uri host is valid")), + } } } + +//////////////////////////////////////////////////// +// Functions to manipulate request line + +/// Apply upstream options in request line, specified in the configuration +pub(super) fn apply_upstream_options_to_request_line( + req: &mut Request, + upstream: &UpstreamCandidates, +) -> anyhow::Result<()> { + for opt in upstream.options.iter() { + match opt { + UpstreamOption::ForceHttp11Upstream => *req.version_mut() = hyper::Version::HTTP_11, + UpstreamOption::ForceHttp2Upstream => { + // case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt + // Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required. + *req.version_mut() = hyper::Version::HTTP_2; + } + _ => (), + } + } + + Ok(()) +}