diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 5df1060..dd21b39 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -45,22 +45,25 @@ async-trait = "0.1.74" anyhow = "1.0.75" thiserror = "1.0.50" -# http +# http for both server and client http = "1.0.0" http-body-util = "0.1.0" hyper = { version = "1.0.1", default-features = false } hyper-util = { version = "0.1.1", features = ["full"] } futures-util = { version = "0.3.29", default-features = false } futures-channel = { version = "0.3.29", default-features = false } + +# http client +hyper-tls = { version = "0.6.0", features = ["alpn"] } +tokio-native-tls = { version = "0.3.1" } # hyper-rustls = { version = "0.24.2", default-features = false, features = [ # "tokio-runtime", # "webpki-tokio", # "http1", # "http2", # ] } -hyper-tls = { version = "0.6.0", features = ["alpn"] } -# tls and cert management +# tls and cert management for server hot_reload = "0.1.4" rustls = { version = "0.21.9", default-features = false } tokio-rustls = { version = "0.24.1", features = ["early-data"] } diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 438e7bb..a19ca2c 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -79,6 +79,8 @@ pub enum RpxyError { FailedToCopyBidirectional(String), // Forwarder errors + #[error("Failed to build forwarder: {0}")] + FailedToBuildForwarder(String), #[error("Failed to fetch from upstream: {0}")] FailedToFetchFromUpstream(String), diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 576999b..aa89749 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -43,6 +43,8 @@ where type Error = RpxyError; async fn request(&self, req: Request) -> Result>, Self::Error> { + // TODO: cache handling + self.request_directly(req).await } } @@ -64,6 +66,7 @@ where } } +/// Build forwarder with hyper-tls (native-tls) impl Forwarder, B1> where B1: Body + Send + Unpin + 'static, @@ -71,16 +74,29 @@ where ::Error: Into>, { /// Build forwarder - pub async fn new(_globals: &Arc) -> Self { + pub async fn try_new(_globals: &Arc) -> RpxyResult { // build hyper client with hyper-tls - // TODO: Frame size errorが取れない > H2 どうしようもない。。。。 hyper_rustlsのリリース待ち? - let connector = HttpsConnector::new(); - let executor = LocalExecutor::new(_globals.runtime_handle.clone().clone()); + let executor = LocalExecutor::new(_globals.runtime_handle.clone()); + + let try_build_connector = |alpns: &[&str]| { + hyper_tls::native_tls::TlsConnector::builder() + .request_alpns(alpns) + .build() + .map_err(|e| RpxyError::FailedToBuildForwarder(e.to_string())) + .map(|tls| { + let mut http = HttpConnector::new(); + http.enforce_http(false); + HttpsConnector::from((http, tls.into())) + }) + }; + + let connector = try_build_connector(&["h2", "http/1.1"])?; let inner = Client::builder(executor.clone()).build::<_, B1>(connector); - let connector = HttpsConnector::new(); - let executor = LocalExecutor::new(_globals.runtime_handle.clone()); - let inner_h2 = Client::builder(executor).http2_only(true).build::<_, B1>(connector); + let connector_h2 = try_build_connector(&["h2"])?; + let inner_h2 = Client::builder(executor.clone()) + .http2_only(true) + .build::<_, B1>(connector_h2); // #[cfg(feature = "native-roots")] // let builder = hyper_rustls::HttpsConnectorBuilder::new().with_native_roots(); @@ -108,6 +124,6 @@ where // Self { inner, inner_h2, cache } // } // #[cfg(not(feature = "cache"))] - Self { inner, inner_h2 } + Ok(Self { inner, inner_h2 }) } } diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index 1cb67fd..13d37eb 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -1,8 +1,6 @@ mod client; use crate::hyper_ext::body::{IncomingLike, IncomingOr}; -use hyper_tls::HttpsConnector; -use hyper_util::client::legacy::connect::HttpConnector; -pub type Forwarder = client::Forwarder, IncomingOr>; +pub type Forwarder = client::Forwarder>; pub use client::ForwardRequest; diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index c78e8c8..da2cabc 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -91,7 +91,7 @@ where }); // 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well - let forwarder = Arc::new(Forwarder::new(&globals).await); + let forwarder = Arc::new(Forwarder::try_new(&globals).await?); let message_handler = Arc::new( HttpMessageHandlerBuilder::default() .globals(globals.clone()) diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index d94d2c9..a9fae01 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -17,7 +17,7 @@ use crate::{ }; use derive_builder::Builder; use http::{Request, Response, StatusCode}; -use hyper_util::rt::TokioIo; +use hyper_util::{client::legacy::connect::Connect, rt::TokioIo}; use std::{net::SocketAddr, sync::Arc}; use tokio::{io::copy_bidirectional, time::timeout}; @@ -34,19 +34,19 @@ pub(super) struct HandlerContext { #[derive(Clone, Builder)] /// HTTP message handler for requests from clients and responses from backend applications, /// responsible to manipulate and forward messages to upstream backends and downstream clients. -// pub struct HttpMessageHandler -pub struct HttpMessageHandler +pub struct HttpMessageHandler where + C: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone, { - forwarder: Arc, + forwarder: Arc>, pub(super) globals: Arc, app_manager: Arc>, } -impl HttpMessageHandler +impl HttpMessageHandler where - // T: Connect + Clone + Sync + Send + 'static, + C: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone, { /// Handle incoming request message from a client. diff --git a/rpxy-lib/src/message_handler/handler_manipulate_messages.rs b/rpxy-lib/src/message_handler/handler_manipulate_messages.rs index a33e58f..46e572c 100644 --- a/rpxy-lib/src/message_handler/handler_manipulate_messages.rs +++ b/rpxy-lib/src/message_handler/handler_manipulate_messages.rs @@ -1,7 +1,4 @@ -use super::{ - handler_main::HandlerContext, utils_headers::*, utils_request::apply_upstream_options_to_request_line, - HttpMessageHandler, -}; +use super::{handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line, HttpMessageHandler}; use crate::{ backend::{BackendApp, UpstreamCandidates}, constants::RESPONSE_HEADER_SERVER, @@ -9,11 +6,13 @@ use crate::{ CryptoSource, }; use anyhow::{anyhow, ensure, Result}; -use http::{header, uri::Scheme, HeaderValue, Request, Response, Uri, Version}; +use http::{header, HeaderValue, Request, Response, Uri}; +use hyper_util::client::legacy::connect::Connect; use std::net::SocketAddr; -impl HttpMessageHandler +impl HttpMessageHandler where + C: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone, { //////////////////////////////////////////////////// @@ -177,18 +176,7 @@ where .insert(header::CONNECTION, HeaderValue::from_static("upgrade")); } - // If not specified (force_httpXX_upstream) and https, version is preserved except for http/3 - if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) { - // Change version to http/1.1 when destination scheme is http - debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled."); - *req.version_mut() = Version::HTTP_11; - } else if req.version() == Version::HTTP_3 { - // HTTP/3 is always https - debug!("HTTP/3 is currently unsupported for request to upstream."); - *req.version_mut() = Version::HTTP_2; - } - - apply_upstream_options_to_request_line(req, upstream_candidates)?; + update_request_line(req, upstream_chosen, upstream_candidates)?; Ok(context) } diff --git a/rpxy-lib/src/message_handler/utils_request.rs b/rpxy-lib/src/message_handler/utils_request.rs index aa4ce42..8939433 100644 --- a/rpxy-lib/src/message_handler/utils_request.rs +++ b/rpxy-lib/src/message_handler/utils_request.rs @@ -1,6 +1,9 @@ -use crate::backend::{UpstreamCandidates, UpstreamOption}; +use crate::{ + backend::{Upstream, UpstreamCandidates, UpstreamOption}, + log::*, +}; use anyhow::{anyhow, ensure, Result}; -use http::{header, Request}; +use http::{header, uri::Scheme, Request, Version}; /// Trait defining parser of hostname /// Inspect and extract hostname from either the request HOST header or request line @@ -50,18 +53,30 @@ impl InspectParseHost for Request { //////////////////////////////////////////////////// // Functions to manipulate request line -/// Apply upstream options in request line, specified in the configuration -pub(super) fn apply_upstream_options_to_request_line( +/// Update request line, e.g., version, and apply upstream options to request line, specified in the configuration +pub(super) fn update_request_line( req: &mut Request, - upstream: &UpstreamCandidates, + upstream_chosen: &Upstream, + upstream_candidates: &UpstreamCandidates, ) -> anyhow::Result<()> { - for opt in upstream.options.iter() { + // If not specified (force_httpXX_upstream) and https, version is preserved except for http/3 + if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) { + // Change version to http/1.1 when destination scheme is http + debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled."); + *req.version_mut() = Version::HTTP_11; + } else if req.version() == Version::HTTP_3 { + // HTTP/3 is always https + debug!("HTTP/3 is currently unsupported for request to upstream."); + *req.version_mut() = Version::HTTP_2; + } + + for opt in upstream_candidates.options.iter() { match opt { - UpstreamOption::ForceHttp11Upstream => *req.version_mut() = http::Version::HTTP_11, + UpstreamOption::ForceHttp11Upstream => *req.version_mut() = Version::HTTP_11, UpstreamOption::ForceHttp2Upstream => { // case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt // Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required. - *req.version_mut() = http::Version::HTTP_2; + *req.version_mut() = Version::HTTP_2; } _ => (), } diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 5b77263..813eaa8 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -9,6 +9,7 @@ use crate::{ use bytes::{Buf, Bytes}; use http::{Request, Response}; use http_body_util::BodyExt; +use hyper_util::client::legacy::connect::Connect; use std::{net::SocketAddr, time::Duration}; use tokio::time::timeout; @@ -17,12 +18,9 @@ use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestSt #[cfg(feature = "http3-s2n")] use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; -// use futures::Stream; -// use hyper_util::client::legacy::connect::Connect; - -impl Proxy +impl Proxy where - // T: Connect + Clone + Sync + Send + 'static, + T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static, { pub(super) async fn h3_serve_connection( diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 28ad76e..9c8e3a5 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -19,23 +19,22 @@ use hyper::{ rt::{Read, Write}, service::service_fn, }; -use hyper_util::{rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder}; +use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder}; use std::{net::SocketAddr, sync::Arc, time::Duration}; use tokio::time::timeout; /// Wrapper function to handle request for HTTP/1.1 and HTTP/2 /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler -async fn serve_request( +async fn serve_request( req: Request, - // handler: Arc>, - handler: Arc>, + handler: Arc>, client_addr: SocketAddr, listen_addr: SocketAddr, tls_enabled: bool, tls_server_name: Option, ) -> RpxyResult>> where - // T: Connect + Clone + Sync + Send + 'static, + T: Send + Sync + Connect + Clone, U: CryptoSource + Clone, { handler @@ -51,9 +50,9 @@ where #[derive(Clone)] /// Proxy main object responsible to serve requests received from clients at the given socket address. -pub(crate) struct Proxy +pub(crate) struct Proxy where - // T: Connect + Clone + Sync + Send + 'static, + T: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone + Sync + Send + 'static, { /// global context shared among async tasks @@ -65,12 +64,12 @@ where /// hyper connection builder serving http request pub connection_builder: Arc>, /// message handler serving incoming http request - pub message_handler: Arc>, + pub message_handler: Arc>, } -impl Proxy +impl Proxy where - // T: Connect + Clone + Sync + Send + 'static, + T: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone + Sync + Send + 'static, { /// Serves requests from clients diff --git a/rpxy-lib/src/proxy/proxy_quic_quinn.rs b/rpxy-lib/src/proxy/proxy_quic_quinn.rs index 8380f6e..9c4bf4e 100644 --- a/rpxy-lib/src/proxy/proxy_quic_quinn.rs +++ b/rpxy-lib/src/proxy/proxy_quic_quinn.rs @@ -6,14 +6,14 @@ use crate::{ log::*, name_exp::ByteName, }; -// use hyper_util::client::legacy::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig}; use rustls::ServerConfig; use std::sync::Arc; -impl Proxy +impl Proxy where - // T: Connect + Clone + Sync + Send + 'static, + T: Send + Sync + Connect + Clone + 'static, U: CryptoSource + Clone + Sync + Send + 'static, { pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { diff --git a/rpxy-lib/src/proxy/proxy_quic_s2n.rs b/rpxy-lib/src/proxy/proxy_quic_s2n.rs index 3ab41d0..13a8802 100644 --- a/rpxy-lib/src/proxy/proxy_quic_s2n.rs +++ b/rpxy-lib/src/proxy/proxy_quic_s2n.rs @@ -1,18 +1,20 @@ use super::proxy_main::Proxy; use crate::{ + crypto::CryptoSource, crypto::{ServerCrypto, ServerCryptoBase}, error::*, log::*, name_exp::ByteName, }; +use anyhow::anyhow; use hot_reload::ReloaderReceiver; -use std::sync::Arc; -// use hyper_util::client::legacy::connect::Connect; +use hyper_util::client::legacy::connect::Connect; use s2n_quic::provider; +use std::sync::Arc; -impl Proxy +impl Proxy where - // T: Connect + Clone + Sync + Send + 'static, + T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static, { /// Start UDP proxy serving with HTTP/3 request for configured host names