diff --git a/CHANGELOG.md b/CHANGELOG.md index 20ac679..de8871f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,9 @@ ## 0.7.0 (unreleased) +- Breaking: `hyper`-1.0 for both server and client modules. +- Breaking: Remove `override_host` option in upstream options. Add a reverse option, i.e., `disable_override_host`. That is, `rpxy` always override the host header by the upstream hostname by default. + ## 0.6.2 ### Improvement diff --git a/rpxy-bin/src/log.rs b/rpxy-bin/src/log.rs index 3fcf694..fd7b5cb 100644 --- a/rpxy-bin/src/log.rs +++ b/rpxy-bin/src/log.rs @@ -13,9 +13,9 @@ pub fn init_logger() { .compact(); // This limits the logger to emits only rpxy crate - let level_string = std::env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string()); - let filter_layer = EnvFilter::new(format!("{}={}", env!("CARGO_PKG_NAME"), level_string)); - // let filter_layer = EnvFilter::from_default_env(); + // let level_string = std::env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string()); + // let filter_layer = EnvFilter::new(format!("{}={}", env!("CARGO_PKG_NAME"), level_string)); + let filter_layer = EnvFilter::from_default_env(); tracing_subscriber::registry() .with(format_layer) diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 51631af..5df1060 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -58,6 +58,7 @@ futures-channel = { version = "0.3.29", default-features = false } # "http1", # "http2", # ] } +hyper-tls = { version = "0.6.0", features = ["alpn"] } # tls and cert management hot_reload = "0.1.4" diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index ee47e4f..438e7bb 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -78,6 +78,10 @@ pub enum RpxyError { #[error("Failed to copy bidirectional for upgraded connections: {0}")] FailedToCopyBidirectional(String), + // Forwarder errors + #[error("Failed to fetch from upstream: {0}")] + FailedToFetchFromUpstream(String), + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs new file mode 100644 index 0000000..57538fa --- /dev/null +++ b/rpxy-lib/src/forwarder/client.rs @@ -0,0 +1,117 @@ +use crate::{ + error::{RpxyError, RpxyResult}, + globals::Globals, + hyper_ext::{ + body::{wrap_incoming_body_response, IncomingOr}, + rt::LocalExecutor, + }, +}; +use async_trait::async_trait; +use http::{Request, Response, Version}; +use hyper::body::Body; +use hyper_tls::HttpsConnector; +use hyper_util::client::legacy::{ + connect::{Connect, HttpConnector}, + Client, +}; +use std::sync::Arc; + +#[async_trait] +/// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. +pub trait ForwardRequest { + type Error; + async fn request(&self, req: Request) -> Result, Self::Error>; +} + +/// Forwarder http client struct responsible to cache handling +pub struct Forwarder { + // #[cfg(feature = "cache")] + // cache: Option, + inner: Client, + inner_h2: Client, // `h2c` or http/2-only client is defined separately +} + +#[async_trait] +impl ForwardRequest> for Forwarder +where + C: Send + Sync + Connect + Clone + 'static, + B1: Body + Send + Unpin + 'static, + ::Data: Send, + ::Error: Into>, + B2: Body, +{ + type Error = RpxyError; + + async fn request(&self, req: Request) -> Result>, Self::Error> { + self.request_directly(req).await + } +} + +impl Forwarder +where + C: Send + Sync + Connect + Clone + 'static, + B1: Body + Send + Unpin + 'static, + ::Data: Send, + ::Error: Into>, +{ + async fn request_directly(&self, req: Request) -> RpxyResult>> { + match req.version() { + Version::HTTP_2 => self.inner_h2.request(req).await, // handles `h2c` requests + _ => self.inner.request(req).await, + } + .map_err(|e| RpxyError::FailedToFetchFromUpstream(e.to_string())) + .map(wrap_incoming_body_response::) + } +} + +impl Forwarder, B1> +where + B1: Body + Send + Unpin + 'static, + ::Data: Send, + ::Error: Into>, +{ + /// Build forwarder + pub async fn new(_globals: &Arc) -> Self { + // build hyper client with hyper-tls + // TODO: Frame size errorが取れない > H2 どうしようもない。。。。 hyper_rustlsのリリース待ち? + let connector = HttpsConnector::new(); + let executor = LocalExecutor::new(_globals.runtime_handle.clone().clone()); + let inner = Client::builder(executor.clone()).build::<_, B1>(connector); + + let connector = HttpsConnector::new(); + let executor = LocalExecutor::new(_globals.runtime_handle.clone()); + let inner_h2 = Client::builder(executor) + .http2_adaptive_window(true) + .http2_only(true) + .set_host(true) + .build::<_, B1>(connector); + + // #[cfg(feature = "native-roots")] + // let builder = hyper_rustls::HttpsConnectorBuilder::new().with_native_roots(); + // #[cfg(feature = "native-roots")] + // let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_native_roots(); + // #[cfg(feature = "native-roots")] + // info!("Native cert store is used for the connection to backend applications"); + + // #[cfg(not(feature = "native-roots"))] + // let builder = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots(); + // #[cfg(not(feature = "native-roots"))] + // let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots(); + // #[cfg(not(feature = "native-roots"))] + // info!("Mozilla WebPKI root certs is used for the connection to backend applications"); + + // let connector = builder.https_or_http().enable_http1().enable_http2().build(); + // let connector_h2 = builder_h2.https_or_http().enable_http2().build(); + + // let inner = Client::builder().build::<_, Body>(connector); + // let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2); + + // #[cfg(feature = "cache")] + // { + // let cache = RpxyCache::new(_globals).await; + // Self { inner, inner_h2, cache } + // } + // #[cfg(not(feature = "cache"))] + Self { inner, inner_h2 } + } +} diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs new file mode 100644 index 0000000..1cb67fd --- /dev/null +++ b/rpxy-lib/src/forwarder/mod.rs @@ -0,0 +1,8 @@ +mod client; + +use crate::hyper_ext::body::{IncomingLike, IncomingOr}; +use hyper_tls::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +pub type Forwarder = client::Forwarder, IncomingOr>; + +pub use client::ForwardRequest; diff --git a/rpxy-lib/src/hyper_ext/body_type.rs b/rpxy-lib/src/hyper_ext/body_type.rs index 516e569..9616306 100644 --- a/rpxy-lib/src/hyper_ext/body_type.rs +++ b/rpxy-lib/src/hyper_ext/body_type.rs @@ -1,3 +1,4 @@ +use http::Response; use http_body_util::{combinators, BodyExt, Either, Empty, Full}; use hyper::body::{Bytes, Incoming}; @@ -6,6 +7,19 @@ pub(crate) type BoxBody = combinators::BoxBody; /// Type for either passthrough body or given body type, specifically synthetic boxed body pub(crate) type IncomingOr = Either; +/// helper function to build http response with passthrough body +pub(crate) fn wrap_incoming_body_response(response: Response) -> Response> +where + B: hyper::body::Body, +{ + response.map(IncomingOr::Left) +} + +/// helper function to build http response with synthetic body +pub(crate) fn wrap_synthetic_body_response(response: Response) -> Response> { + response.map(IncomingOr::Right) +} + /// helper function to build a empty body pub(crate) fn empty() -> BoxBody { Empty::::new().map_err(|never| match never {}).boxed() diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index a39aef9..e1b5ae8 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -8,5 +8,7 @@ pub(crate) mod rt { } pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr}; + pub(crate) use super::body_type::{ + empty, full, wrap_incoming_body_response, wrap_synthetic_body_response, BoxBody, IncomingOr, + }; } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index d13ade1..c78e8c8 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -3,6 +3,7 @@ mod constants; mod count; mod crypto; mod error; +mod forwarder; mod globals; mod hyper_ext; mod log; @@ -11,8 +12,8 @@ mod name_exp; mod proxy; use crate::{ - crypto::build_cert_reloader, error::*, globals::Globals, log::*, message_handler::HttpMessageHandlerBuilder, - proxy::Proxy, + crypto::build_cert_reloader, error::*, forwarder::Forwarder, globals::Globals, log::*, + message_handler::HttpMessageHandlerBuilder, proxy::Proxy, }; use futures::future::select_all; use std::sync::Arc; @@ -90,10 +91,12 @@ where }); // 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well + let forwarder = Arc::new(Forwarder::new(&globals).await); let message_handler = Arc::new( HttpMessageHandlerBuilder::default() .globals(globals.clone()) .app_manager(app_manager.clone()) + .forwarder(forwarder) .build()?, ); diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index 922e024..d94d2c9 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -9,6 +9,7 @@ use crate::{ backend::{BackendAppManager, LoadBalanceContext}, crypto::CryptoSource, error::*, + forwarder::{ForwardRequest, Forwarder}, globals::Globals, hyper_ext::body::{BoxBody, IncomingLike, IncomingOr}, log::*, @@ -18,7 +19,7 @@ use derive_builder::Builder; use http::{Request, Response, StatusCode}; use hyper_util::rt::TokioIo; use std::{net::SocketAddr, sync::Arc}; -use tokio::io::copy_bidirectional; +use tokio::{io::copy_bidirectional, time::timeout}; #[allow(dead_code)] #[derive(Debug)] @@ -36,10 +37,9 @@ pub(super) struct HandlerContext { // pub struct HttpMessageHandler pub struct HttpMessageHandler where - // T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone, { - // forwarder: Arc>, + forwarder: Arc, pub(super) globals: Arc, app_manager: Arc>, } @@ -81,7 +81,7 @@ where Ok(v) } Err(e) => { - debug!("{e}"); + error!("{e}"); let code = StatusCode::from(e); log_data.status_code(&code).output(); synthetic_error_response(code) @@ -155,14 +155,14 @@ where tls_enabled, ) { Err(e) => { - error!("Failed to generate upstream request for backend application: {}", e); return Err(HttpError::FailedToGenerateUpstreamRequest(e.to_string())); } Ok(v) => v, }; debug!( - "Request to be forwarded: uri {}, version {:?}, headers {:?}", + "Request to be forwarded: [uri {}, method: {}, version {:?}, headers {:?}]", req.uri(), + req.method(), req.version(), req.headers() ); @@ -171,37 +171,31 @@ where ////// ////////////// - // // TODO: remove later - let body = crate::hyper_ext::body::full(hyper::body::Bytes::from("not yet implemented")); - let mut res_backend = super::synthetic_response::synthetic_response(Response::builder().body(body).unwrap()); - // // Forward request to a chosen backend - // let mut res_backend = { - // let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { - // return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data); - // }; - // match result { - // Ok(res) => res, - // Err(e) => { - // error!("Failed to get response from backend: {}", e); - // return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data); - // } - // } - // }; + // Forward request to a chosen backend + let mut res_backend = { + let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { + return Err(HttpError::TimeoutUpstreamRequest); + }; + match result { + Ok(res) => res, + Err(e) => { + return Err(HttpError::FailedToGetResponseFromBackend(e.to_string())); + } + } + }; ////////////// // Process reverse proxy context generated during the forwarding request generation. #[cfg(feature = "sticky-cookie")] if let Some(context_from_lb) = _context.context_lb { let res_headers = res_backend.headers_mut(); if let Err(e) = set_sticky_cookie_lb_context(res_headers, &context_from_lb) { - error!("Failed to append context to the response given from backend: {}", e); - return Err(HttpError::FailedToAddSetCookeInResponse); + return Err(HttpError::FailedToAddSetCookeInResponse(e.to_string())); } } if res_backend.status() != StatusCode::SWITCHING_PROTOCOLS { // Generate response to client if let Err(e) = self.generate_response_forwarded(&mut res_backend, backend_app) { - error!("Failed to generate downstream response for clients: {}", e); return Err(HttpError::FailedToGenerateDownstreamResponse(e.to_string())); } return Ok(res_backend); @@ -215,18 +209,15 @@ where }; if !should_upgrade { - error!( + return Err(HttpError::FailedToUpgrade(format!( "Backend tried to switch to protocol {:?} when {:?} was requested", upgrade_in_response, upgrade_in_request - ); - return Err(HttpError::FailedToUpgrade); + ))); } let Some(request_upgraded) = request_upgraded else { - error!("Request does not have an upgrade extension"); return Err(HttpError::NoUpgradeExtensionInRequest); }; let Some(onupgrade) = res_backend.extensions_mut().remove::() else { - error!("Response does not have an upgrade extension"); return Err(HttpError::NoUpgradeExtensionInResponse); }; diff --git a/rpxy-lib/src/message_handler/http_result.rs b/rpxy-lib/src/message_handler/http_result.rs index dc77565..857ab55 100644 --- a/rpxy-lib/src/message_handler/http_result.rs +++ b/rpxy-lib/src/message_handler/http_result.rs @@ -20,16 +20,20 @@ pub enum HttpError { FailedToRedirect(String), #[error("No upstream candidates")] NoUpstreamCandidates, - #[error("Failed to generate upstream request: {0}")] + #[error("Failed to generate upstream request for backend application: {0}")] FailedToGenerateUpstreamRequest(String), + #[error("Timeout in upstream request")] + TimeoutUpstreamRequest, + #[error("Failed to get response from backend: {0}")] + FailedToGetResponseFromBackend(String), - #[error("Failed to add set-cookie header in response")] - FailedToAddSetCookeInResponse, - #[error("Failed to generated downstream response: {0}")] + #[error("Failed to add set-cookie header in response {0}")] + FailedToAddSetCookeInResponse(String), + #[error("Failed to generated downstream response for clients: {0}")] FailedToGenerateDownstreamResponse(String), - #[error("Failed to upgrade connection")] - FailedToUpgrade, + #[error("Failed to upgrade connection: {0}")] + FailedToUpgrade(String), #[error("Request does not have an upgrade extension")] NoUpgradeExtensionInRequest, #[error("Response does not have an upgrade extension")] @@ -49,9 +53,10 @@ impl From for StatusCode { HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND, HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR, - HttpError::FailedToAddSetCookeInResponse => StatusCode::INTERNAL_SERVER_ERROR, + HttpError::TimeoutUpstreamRequest => StatusCode::GATEWAY_TIMEOUT, + HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, - HttpError::FailedToUpgrade => StatusCode::INTERNAL_SERVER_ERROR, + HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::NoUpgradeExtensionInRequest => StatusCode::BAD_REQUEST, HttpError::NoUpgradeExtensionInResponse => StatusCode::BAD_GATEWAY, _ => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/rpxy-lib/src/message_handler/synthetic_response.rs b/rpxy-lib/src/message_handler/synthetic_response.rs index 0038997..60aeeec 100644 --- a/rpxy-lib/src/message_handler/synthetic_response.rs +++ b/rpxy-lib/src/message_handler/synthetic_response.rs @@ -1,25 +1,10 @@ +use super::http_result::{HttpError, HttpResult}; use crate::{ error::*, hyper_ext::body::{empty, BoxBody, IncomingOr}, name_exp::ServerName, }; use http::{Request, Response, StatusCode, Uri}; -use hyper::body::Incoming; - -use super::http_result::{HttpError, HttpResult}; - -/// helper function to build http response with passthrough body -pub(crate) fn passthrough_response(response: Response) -> Response> -where - B: hyper::body::Body, -{ - response.map(IncomingOr::Left) -} - -/// helper function to build http response with synthetic body -pub(crate) fn synthetic_response(response: Response) -> Response> { - response.map(IncomingOr::Right) -} /// build http response with status code of 4xx and 5xx pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult>> { diff --git a/rpxy-lib/src/message_handler/utils_request.rs b/rpxy-lib/src/message_handler/utils_request.rs index 37d5e0b..aa4ce42 100644 --- a/rpxy-lib/src/message_handler/utils_request.rs +++ b/rpxy-lib/src/message_handler/utils_request.rs @@ -57,11 +57,11 @@ pub(super) fn apply_upstream_options_to_request_line( ) -> anyhow::Result<()> { for opt in upstream.options.iter() { match opt { - UpstreamOption::ForceHttp11Upstream => *req.version_mut() = hyper::Version::HTTP_11, + UpstreamOption::ForceHttp11Upstream => *req.version_mut() = http::Version::HTTP_11, UpstreamOption::ForceHttp2Upstream => { // case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt // Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required. - *req.version_mut() = hyper::Version::HTTP_2; + *req.version_mut() = http::Version::HTTP_2; } _ => (), } diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 922b857..5b77263 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -92,13 +92,9 @@ where } /// Serves a request stream from a client - /// TODO: TODO: TODO: TODO: - /// TODO: Body in hyper-0.14 was changed to Incoming in hyper-1.0, and it is not accessible from outside. - /// Thus, we need to implement IncomingLike trait using channel. Also, the backend handler must feed the body in the form of + /// Body in hyper-0.14 was changed to Incoming in hyper-1.0, and it is not accessible from outside. + /// Thus, we needed to implement IncomingLike trait using channel. Also, the backend handler must feed the body in the form of /// Either as body. - /// Also, the downstream from the backend handler could be Incoming, but will be wrapped as Either as well due to H3. - /// Result, E> type includes E as HttpError to generate the status code and related Response. - /// Thus to handle synthetic error messages in BoxBody, the serve() function outputs Response, BoxBody>>>. async fn h3_serve_stream( &self, req: Request<()>, @@ -146,7 +142,7 @@ where Ok(()) as RpxyResult<()> }); - let mut new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); + let new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); // Response> wrapped by RpxyResult let res = self .message_handler diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index f8f04c0..28ad76e 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -26,7 +26,7 @@ use tokio::time::timeout; /// Wrapper function to handle request for HTTP/1.1 and HTTP/2 /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler async fn serve_request( - mut req: Request, + req: Request, // handler: Arc>, handler: Arc>, client_addr: SocketAddr,