From a40d8a00721246934cfac96d293359ad9e0e6843 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Fri, 28 Jul 2023 15:18:21 +0900 Subject: [PATCH] refactor http handler --- quinn | 2 +- rpxy-bin/Cargo.toml | 4 +- rpxy-lib/src/handler/handler_main.rs | 124 ++++++++++--------- rpxy-lib/src/handler/mod.rs | 1 + rpxy-lib/src/handler/utils_headers.rs | 27 ++-- rpxy-lib/src/handler/utils_request.rs | 3 + rpxy-lib/src/handler/utils_synth_response.rs | 2 + 7 files changed, 91 insertions(+), 72 deletions(-) diff --git a/quinn b/quinn index 0ae7c60..532ba7d 160000 --- a/quinn +++ b/quinn @@ -1 +1 @@ -Subproject commit 0ae7c60b15637d7343410ba1e5cc3151e3814557 +Subproject commit 532ba7d80405ad083fd05546fa71becbe5eff1a4 diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index f4bdc68..0ed54d2 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -20,7 +20,7 @@ rpxy-lib = { path = "../rpxy-lib/", features = ["http3", "sticky-cookie"] } anyhow = "1.0.72" rustc-hash = "1.1.0" -serde = { version = "1.0.174", default-features = false, features = ["derive"] } +serde = { version = "1.0.177", default-features = false, features = ["derive"] } derive_builder = "0.12.0" tokio = { version = "1.29.1", default-features = false, features = [ "net", @@ -43,7 +43,7 @@ tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } [target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5.0" +tikv-jemallocator = "0.5.4" [dev-dependencies] diff --git a/rpxy-lib/src/handler/handler_main.rs b/rpxy-lib/src/handler/handler_main.rs index c23fd24..f6c4dc7 100644 --- a/rpxy-lib/src/handler/handler_main.rs +++ b/rpxy-lib/src/handler/handler_main.rs @@ -19,6 +19,8 @@ use std::{env, net::SocketAddr, sync::Arc}; use tokio::{io::copy_bidirectional, time::timeout}; #[derive(Clone, Builder)] +/// HTTP message handler for requests from clients and responses from backend applications, +/// responsible to manipulate and forward messages to upstream backends and downstream clients. pub struct HttpMessageHandler where T: Connect + Clone + Sync + Send + 'static, @@ -33,11 +35,13 @@ where T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone, { + /// Return with an arbitrary status code of error and log message fn return_with_error_log(&self, status_code: StatusCode, log_data: &mut MessageLog) -> Result> { log_data.status_code(&status_code).output(); http_error(status_code) } + /// Handle incoming request message from a client pub async fn handle_request( self, mut req: Request, @@ -65,13 +69,15 @@ where } } // Find backend application for given server_name, and drop if incoming request is invalid as request. - let backend = if let Some(be) = self.globals.backends.apps.get(&server_name) { - be - } else if let Some(default_server_name) = &self.globals.backends.default_server_name_bytes { - debug!("Serving by default app"); - self.globals.backends.apps.get(default_server_name).unwrap() - } else { - return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); + let backend = match self.globals.backends.apps.get(&server_name) { + Some(be) => be, + None => { + let Some(default_server_name) = &self.globals.backends.default_server_name_bytes else { + return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); + }; + debug!("Serving by default app"); + self.globals.backends.apps.get(default_server_name).unwrap() + } }; // Redirect to https if !tls_enabled and redirect_to_https is true @@ -84,9 +90,8 @@ where // Find reverse proxy for given path and choose one of upstream host // Longest prefix match let path = req.uri().path(); - let upstream_group = match backend.reverse_proxy.get(path) { - Some(ug) => ug, - None => return self.return_with_error_log(StatusCode::NOT_FOUND, &mut log_data), + let Some(upstream_group) = backend.reverse_proxy.get(path) else { + return self.return_with_error_log(StatusCode::NOT_FOUND, &mut log_data) }; // Upgrade in request header @@ -113,19 +118,17 @@ where log_data.upstream(req.uri()); ////// - // Forward request to + // Forward request to a chosen backend let mut res_backend = { - match timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await { - Err(_) => { - return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data); + let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { + return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data); + }; + match result { + Ok(res) => res, + Err(e) => { + error!("Failed to get response from backend: {}", e); + return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); } - Ok(x) => match x { - Ok(res) => res, - Err(e) => { - error!("Failed to get response from backend: {}", e); - return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); - } - }, } }; @@ -141,62 +144,63 @@ where if res_backend.status() != StatusCode::SWITCHING_PROTOCOLS { // Generate response to client - if self.generate_response_forwarded(&mut res_backend, backend).is_ok() { - log_data.status_code(&res_backend.status()).output(); - return Ok(res_backend); - } else { + if self.generate_response_forwarded(&mut res_backend, backend).is_err() { return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); } + log_data.status_code(&res_backend.status()).output(); + return Ok(res_backend); } // Handle StatusCode::SWITCHING_PROTOCOLS in response let upgrade_in_response = extract_upgrade(res_backend.headers()); - if if let (Some(u_req), Some(u_res)) = (upgrade_in_request.as_ref(), upgrade_in_response.as_ref()) { + let should_upgrade = if let (Some(u_req), Some(u_res)) = (upgrade_in_request.as_ref(), upgrade_in_response.as_ref()) + { u_req.to_ascii_lowercase() == u_res.to_ascii_lowercase() } else { false - } { - if let Some(request_upgraded) = request_upgraded { - let Some(onupgrade) = res_backend.extensions_mut().remove::() else { - error!("Response does not have an upgrade extension"); - return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); - }; - - self.globals.runtime_handle.spawn(async move { - let mut response_upgraded = onupgrade.await.map_err(|e| { - error!("Failed to upgrade response: {}", e); - RpxyError::Hyper(e) - })?; - let mut request_upgraded = request_upgraded.await.map_err(|e| { - error!("Failed to upgrade request: {}", e); - RpxyError::Hyper(e) - })?; - copy_bidirectional(&mut response_upgraded, &mut request_upgraded) - .await - .map_err(|e| { - error!("Coping between upgraded connections failed: {}", e); - RpxyError::Io(e) - })?; - Ok(()) as Result<()> - }); - log_data.status_code(&res_backend.status()).output(); - Ok(res_backend) - } else { - error!("Request does not have an upgrade extension"); - self.return_with_error_log(StatusCode::BAD_REQUEST, &mut log_data) - } - } else { + }; + if !should_upgrade { error!( "Backend tried to switch to protocol {:?} when {:?} was requested", upgrade_in_response, upgrade_in_request ); - self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data) + return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); } + let Some(request_upgraded) = request_upgraded else { + error!("Request does not have an upgrade extension"); + return self.return_with_error_log(StatusCode::BAD_REQUEST, &mut log_data); + }; + let Some(onupgrade) = res_backend.extensions_mut().remove::() else { + error!("Response does not have an upgrade extension"); + return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data); + }; + + self.globals.runtime_handle.spawn(async move { + let mut response_upgraded = onupgrade.await.map_err(|e| { + error!("Failed to upgrade response: {}", e); + RpxyError::Hyper(e) + })?; + let mut request_upgraded = request_upgraded.await.map_err(|e| { + error!("Failed to upgrade request: {}", e); + RpxyError::Hyper(e) + })?; + copy_bidirectional(&mut response_upgraded, &mut request_upgraded) + .await + .map_err(|e| { + error!("Coping between upgraded connections failed: {}", e); + RpxyError::Io(e) + })?; + Ok(()) as Result<()> + }); + log_data.status_code(&res_backend.status()).output(); + Ok(res_backend) } //////////////////////////////////////////////////// // Functions to generate messages + //////////////////////////////////////////////////// + /// Manipulate a response message sent from a backend application to forward downstream to a client. fn generate_response_forwarded(&self, response: &mut Response, chosen_backend: &Backend) -> Result<()> where B: core::fmt::Debug, @@ -208,7 +212,8 @@ where #[cfg(feature = "http3")] { - // TODO: Workaround for avoid h3 for client authentication + // Manipulate ALT_SVC allowing h3 in response message only when mutual TLS is not enabled + // TODO: This is a workaround for avoiding a client authentication in HTTP/3 if self.globals.proxy_config.http3 && chosen_backend .crypto_source @@ -241,6 +246,7 @@ where } #[allow(clippy::too_many_arguments)] + /// Manipulate a request message sent from a client to forward upstream to a backend application fn generate_request_forwarded( &self, client_addr: &SocketAddr, diff --git a/rpxy-lib/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs index 8bec011..aed9831 100644 --- a/rpxy-lib/src/handler/mod.rs +++ b/rpxy-lib/src/handler/mod.rs @@ -9,6 +9,7 @@ pub use handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessag #[allow(dead_code)] #[derive(Debug)] +/// Context object to handle sticky cookies at HTTP message handler struct HandlerContext { #[cfg(feature = "sticky-cookie")] context_lb: Option, diff --git a/rpxy-lib/src/handler/utils_headers.rs b/rpxy-lib/src/handler/utils_headers.rs index 944d4d9..d09df79 100644 --- a/rpxy-lib/src/handler/utils_headers.rs +++ b/rpxy-lib/src/handler/utils_headers.rs @@ -8,7 +8,7 @@ use hyper::{ header::{self, HeaderMap, HeaderName, HeaderValue}, Uri, }; -use std::net::SocketAddr; +use std::{borrow::Cow, net::SocketAddr}; //////////////////////////////////////////////////// // Functions to manipulate headers @@ -83,6 +83,7 @@ pub(super) fn set_sticky_cookie_lb_context(headers: &mut HeaderMap, context_from Ok(()) } +/// Apply options to request header, which are specified in the configuration pub(super) fn apply_upstream_options_to_header( headers: &mut HeaderMap, _client_addr: &SocketAddr, @@ -113,7 +114,7 @@ pub(super) fn apply_upstream_options_to_header( Ok(()) } -// https://datatracker.ietf.org/doc/html/rfc9110 +/// Append header entry with comma according to [RFC9110](https://datatracker.ietf.org/doc/html/rfc9110) pub(super) fn append_header_entry_with_comma(headers: &mut HeaderMap, key: &str, value: &str) -> Result<()> { match headers.entry(HeaderName::from_bytes(key.as_bytes())?) { header::Entry::Vacant(entry) => { @@ -132,10 +133,11 @@ pub(super) fn append_header_entry_with_comma(headers: &mut HeaderMap, key: &str, Ok(()) } +/// Add header entry if not exist pub(super) fn add_header_entry_if_not_exist( headers: &mut HeaderMap, - key: impl Into>, - value: impl Into>, + key: impl Into>, + value: impl Into>, ) -> Result<()> { match headers.entry(HeaderName::from_bytes(key.into().as_bytes())?) { header::Entry::Vacant(entry) => { @@ -147,10 +149,11 @@ pub(super) fn add_header_entry_if_not_exist( Ok(()) } +/// Overwrite header entry if exist pub(super) fn add_header_entry_overwrite_if_exist( headers: &mut HeaderMap, - key: impl Into>, - value: impl Into>, + key: impl Into>, + value: impl Into>, ) -> Result<()> { match headers.entry(HeaderName::from_bytes(key.into().as_bytes())?) { header::Entry::Vacant(entry) => { @@ -164,11 +167,10 @@ pub(super) fn add_header_entry_overwrite_if_exist( Ok(()) } +/// Align cookie values in single line +/// Sometimes violates [RFC6265](https://www.rfc-editor.org/rfc/rfc6265#section-5.4) (for http/1.1). +/// This is allowed in RFC7540 (for http/2) as mentioned [here](https://stackoverflow.com/questions/4843556/in-http-specification-what-is-the-string-that-separates-cookies). pub(super) fn make_cookie_single_line(headers: &mut HeaderMap) -> Result<()> { - // Sometimes violates RFC6265 (for http/1.1). - // https://www.rfc-editor.org/rfc/rfc6265#section-5.4 - // This is allowed in RFC7540 (for http/2). - // https://stackoverflow.com/questions/4843556/in-http-specification-what-is-the-string-that-separates-cookies let cookies = headers .iter() .filter(|(k, _)| **k == hyper::header::COOKIE) @@ -182,6 +184,7 @@ pub(super) fn make_cookie_single_line(headers: &mut HeaderMap) -> Result<()> { Ok(()) } +/// Add forwarding headers like `x-forwarded-for`. pub(super) fn add_forwarding_header( headers: &mut HeaderMap, client_addr: &SocketAddr, @@ -219,6 +222,7 @@ pub(super) fn add_forwarding_header( Ok(()) } +/// Remove connection header pub(super) fn remove_connection_header(headers: &mut HeaderMap) { if let Some(values) = headers.get(header::CONNECTION) { if let Ok(v) = values.clone().to_str() { @@ -231,6 +235,7 @@ pub(super) fn remove_connection_header(headers: &mut HeaderMap) { } } +/// Hop header values which are removed at proxy const HOP_HEADERS: &[&str] = &[ "connection", "te", @@ -243,12 +248,14 @@ const HOP_HEADERS: &[&str] = &[ "upgrade", ]; +/// Remove hop headers pub(super) fn remove_hop_header(headers: &mut HeaderMap) { HOP_HEADERS.iter().for_each(|key| { headers.remove(*key); }); } +/// Extract upgrade header value if exist pub(super) fn extract_upgrade(headers: &HeaderMap) -> Option { if let Some(c) = headers.get(header::CONNECTION) { if c diff --git a/rpxy-lib/src/handler/utils_request.rs b/rpxy-lib/src/handler/utils_request.rs index 74e7be7..03e36a1 100644 --- a/rpxy-lib/src/handler/utils_request.rs +++ b/rpxy-lib/src/handler/utils_request.rs @@ -7,6 +7,7 @@ use hyper::{header, Request}; //////////////////////////////////////////////////// // Functions to manipulate request line +/// Apply upstream options in request line, specified in the configuration pub(super) fn apply_upstream_options_to_request_line(req: &mut Request, upstream: &UpstreamGroup) -> Result<()> { for opt in upstream.opts.iter() { match opt { @@ -19,10 +20,12 @@ pub(super) fn apply_upstream_options_to_request_line(req: &mut Request, up Ok(()) } +/// Trait defining parser of hostname pub trait ParseHost { fn parse_host(&self) -> Result<&[u8]>; } impl ParseHost for Request { + /// Extract hostname from either the request HOST header or request line fn parse_host(&self) -> Result<&[u8]> { let headers_host = self.headers().get(header::HOST); let uri_host = self.uri().host(); diff --git a/rpxy-lib/src/handler/utils_synth_response.rs b/rpxy-lib/src/handler/utils_synth_response.rs index e1977f8..baa6987 100644 --- a/rpxy-lib/src/handler/utils_synth_response.rs +++ b/rpxy-lib/src/handler/utils_synth_response.rs @@ -5,11 +5,13 @@ use hyper::{Body, Request, Response, StatusCode, Uri}; //////////////////////////////////////////////////// // Functions to create response (error or redirect) +/// Generate a synthetic response message of a certain error status code pub(super) fn http_error(status_code: StatusCode) -> Result> { let response = Response::builder().status(status_code).body(Body::empty())?; Ok(response) } +/// Generate synthetic response message of a redirection to https host with 301 pub(super) fn secure_redirection( server_name: &str, tls_port: Option,