implement native-tls client

This commit is contained in:
Jun Kurihara 2023-11-29 17:24:07 +09:00
commit 48a84a77cb
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
12 changed files with 90 additions and 69 deletions

View file

@ -45,22 +45,25 @@ async-trait = "0.1.74"
anyhow = "1.0.75" anyhow = "1.0.75"
thiserror = "1.0.50" thiserror = "1.0.50"
# http # http for both server and client
http = "1.0.0" http = "1.0.0"
http-body-util = "0.1.0" http-body-util = "0.1.0"
hyper = { version = "1.0.1", default-features = false } hyper = { version = "1.0.1", default-features = false }
hyper-util = { version = "0.1.1", features = ["full"] } hyper-util = { version = "0.1.1", features = ["full"] }
futures-util = { version = "0.3.29", default-features = false } futures-util = { version = "0.3.29", default-features = false }
futures-channel = { version = "0.3.29", default-features = false } futures-channel = { version = "0.3.29", default-features = false }
# http client
hyper-tls = { version = "0.6.0", features = ["alpn"] }
tokio-native-tls = { version = "0.3.1" }
# hyper-rustls = { version = "0.24.2", default-features = false, features = [ # hyper-rustls = { version = "0.24.2", default-features = false, features = [
# "tokio-runtime", # "tokio-runtime",
# "webpki-tokio", # "webpki-tokio",
# "http1", # "http1",
# "http2", # "http2",
# ] } # ] }
hyper-tls = { version = "0.6.0", features = ["alpn"] }
# tls and cert management # tls and cert management for server
hot_reload = "0.1.4" hot_reload = "0.1.4"
rustls = { version = "0.21.9", default-features = false } rustls = { version = "0.21.9", default-features = false }
tokio-rustls = { version = "0.24.1", features = ["early-data"] } tokio-rustls = { version = "0.24.1", features = ["early-data"] }

View file

@ -79,6 +79,8 @@ pub enum RpxyError {
FailedToCopyBidirectional(String), FailedToCopyBidirectional(String),
// Forwarder errors // Forwarder errors
#[error("Failed to build forwarder: {0}")]
FailedToBuildForwarder(String),
#[error("Failed to fetch from upstream: {0}")] #[error("Failed to fetch from upstream: {0}")]
FailedToFetchFromUpstream(String), FailedToFetchFromUpstream(String),

View file

@ -43,6 +43,8 @@ where
type Error = RpxyError; type Error = RpxyError;
async fn request(&self, req: Request<B1>) -> Result<Response<IncomingOr<B2>>, Self::Error> { async fn request(&self, req: Request<B1>) -> Result<Response<IncomingOr<B2>>, Self::Error> {
// TODO: cache handling
self.request_directly(req).await self.request_directly(req).await
} }
} }
@ -64,6 +66,7 @@ where
} }
} }
/// Build forwarder with hyper-tls (native-tls)
impl<B1> Forwarder<HttpsConnector<HttpConnector>, B1> impl<B1> Forwarder<HttpsConnector<HttpConnector>, B1>
where where
B1: Body + Send + Unpin + 'static, B1: Body + Send + Unpin + 'static,
@ -71,16 +74,29 @@ where
<B1 as Body>::Error: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>, <B1 as Body>::Error: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>,
{ {
/// Build forwarder /// Build forwarder
pub async fn new(_globals: &Arc<Globals>) -> Self { pub async fn try_new(_globals: &Arc<Globals>) -> RpxyResult<Self> {
// build hyper client with hyper-tls // build hyper client with hyper-tls
// TODO: Frame size errorが取れない > H2 どうしようもない。。。。 hyper_rustlsのリリース待ち let executor = LocalExecutor::new(_globals.runtime_handle.clone());
let connector = HttpsConnector::new();
let executor = LocalExecutor::new(_globals.runtime_handle.clone().clone()); let try_build_connector = |alpns: &[&str]| {
hyper_tls::native_tls::TlsConnector::builder()
.request_alpns(alpns)
.build()
.map_err(|e| RpxyError::FailedToBuildForwarder(e.to_string()))
.map(|tls| {
let mut http = HttpConnector::new();
http.enforce_http(false);
HttpsConnector::from((http, tls.into()))
})
};
let connector = try_build_connector(&["h2", "http/1.1"])?;
let inner = Client::builder(executor.clone()).build::<_, B1>(connector); let inner = Client::builder(executor.clone()).build::<_, B1>(connector);
let connector = HttpsConnector::new(); let connector_h2 = try_build_connector(&["h2"])?;
let executor = LocalExecutor::new(_globals.runtime_handle.clone()); let inner_h2 = Client::builder(executor.clone())
let inner_h2 = Client::builder(executor).http2_only(true).build::<_, B1>(connector); .http2_only(true)
.build::<_, B1>(connector_h2);
// #[cfg(feature = "native-roots")] // #[cfg(feature = "native-roots")]
// let builder = hyper_rustls::HttpsConnectorBuilder::new().with_native_roots(); // let builder = hyper_rustls::HttpsConnectorBuilder::new().with_native_roots();
@ -108,6 +124,6 @@ where
// Self { inner, inner_h2, cache } // Self { inner, inner_h2, cache }
// } // }
// #[cfg(not(feature = "cache"))] // #[cfg(not(feature = "cache"))]
Self { inner, inner_h2 } Ok(Self { inner, inner_h2 })
} }
} }

View file

@ -1,8 +1,6 @@
mod client; mod client;
use crate::hyper_ext::body::{IncomingLike, IncomingOr}; use crate::hyper_ext::body::{IncomingLike, IncomingOr};
use hyper_tls::HttpsConnector; pub type Forwarder<C> = client::Forwarder<C, IncomingOr<IncomingLike>>;
use hyper_util::client::legacy::connect::HttpConnector;
pub type Forwarder = client::Forwarder<HttpsConnector<HttpConnector>, IncomingOr<IncomingLike>>;
pub use client::ForwardRequest; pub use client::ForwardRequest;

View file

@ -91,7 +91,7 @@ where
}); });
// 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well // 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well
let forwarder = Arc::new(Forwarder::new(&globals).await); let forwarder = Arc::new(Forwarder::try_new(&globals).await?);
let message_handler = Arc::new( let message_handler = Arc::new(
HttpMessageHandlerBuilder::default() HttpMessageHandlerBuilder::default()
.globals(globals.clone()) .globals(globals.clone())

View file

@ -17,7 +17,7 @@ use crate::{
}; };
use derive_builder::Builder; use derive_builder::Builder;
use http::{Request, Response, StatusCode}; use http::{Request, Response, StatusCode};
use hyper_util::rt::TokioIo; use hyper_util::{client::legacy::connect::Connect, rt::TokioIo};
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use tokio::{io::copy_bidirectional, time::timeout}; use tokio::{io::copy_bidirectional, time::timeout};
@ -34,19 +34,19 @@ pub(super) struct HandlerContext {
#[derive(Clone, Builder)] #[derive(Clone, Builder)]
/// HTTP message handler for requests from clients and responses from backend applications, /// HTTP message handler for requests from clients and responses from backend applications,
/// responsible to manipulate and forward messages to upstream backends and downstream clients. /// responsible to manipulate and forward messages to upstream backends and downstream clients.
// pub struct HttpMessageHandler<T, U> pub struct HttpMessageHandler<U, C>
pub struct HttpMessageHandler<U>
where where
C: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone, U: CryptoSource + Clone,
{ {
forwarder: Arc<Forwarder>, forwarder: Arc<Forwarder<C>>,
pub(super) globals: Arc<Globals>, pub(super) globals: Arc<Globals>,
app_manager: Arc<BackendAppManager<U>>, app_manager: Arc<BackendAppManager<U>>,
} }
impl<U> HttpMessageHandler<U> impl<U, C> HttpMessageHandler<U, C>
where where
// T: Connect + Clone + Sync + Send + 'static, C: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone, U: CryptoSource + Clone,
{ {
/// Handle incoming request message from a client. /// Handle incoming request message from a client.

View file

@ -1,7 +1,4 @@
use super::{ use super::{handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line, HttpMessageHandler};
handler_main::HandlerContext, utils_headers::*, utils_request::apply_upstream_options_to_request_line,
HttpMessageHandler,
};
use crate::{ use crate::{
backend::{BackendApp, UpstreamCandidates}, backend::{BackendApp, UpstreamCandidates},
constants::RESPONSE_HEADER_SERVER, constants::RESPONSE_HEADER_SERVER,
@ -9,11 +6,13 @@ use crate::{
CryptoSource, CryptoSource,
}; };
use anyhow::{anyhow, ensure, Result}; use anyhow::{anyhow, ensure, Result};
use http::{header, uri::Scheme, HeaderValue, Request, Response, Uri, Version}; use http::{header, HeaderValue, Request, Response, Uri};
use hyper_util::client::legacy::connect::Connect;
use std::net::SocketAddr; use std::net::SocketAddr;
impl<U> HttpMessageHandler<U> impl<U, C> HttpMessageHandler<U, C>
where where
C: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone, U: CryptoSource + Clone,
{ {
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
@ -177,18 +176,7 @@ where
.insert(header::CONNECTION, HeaderValue::from_static("upgrade")); .insert(header::CONNECTION, HeaderValue::from_static("upgrade"));
} }
// If not specified (force_httpXX_upstream) and https, version is preserved except for http/3 update_request_line(req, upstream_chosen, upstream_candidates)?;
if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) {
// Change version to http/1.1 when destination scheme is http
debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled.");
*req.version_mut() = Version::HTTP_11;
} else if req.version() == Version::HTTP_3 {
// HTTP/3 is always https
debug!("HTTP/3 is currently unsupported for request to upstream.");
*req.version_mut() = Version::HTTP_2;
}
apply_upstream_options_to_request_line(req, upstream_candidates)?;
Ok(context) Ok(context)
} }

View file

@ -1,6 +1,9 @@
use crate::backend::{UpstreamCandidates, UpstreamOption}; use crate::{
backend::{Upstream, UpstreamCandidates, UpstreamOption},
log::*,
};
use anyhow::{anyhow, ensure, Result}; use anyhow::{anyhow, ensure, Result};
use http::{header, Request}; use http::{header, uri::Scheme, Request, Version};
/// Trait defining parser of hostname /// Trait defining parser of hostname
/// Inspect and extract hostname from either the request HOST header or request line /// Inspect and extract hostname from either the request HOST header or request line
@ -50,18 +53,30 @@ impl<B> InspectParseHost for Request<B> {
//////////////////////////////////////////////////// ////////////////////////////////////////////////////
// Functions to manipulate request line // Functions to manipulate request line
/// Apply upstream options in request line, specified in the configuration /// Update request line, e.g., version, and apply upstream options to request line, specified in the configuration
pub(super) fn apply_upstream_options_to_request_line<B>( pub(super) fn update_request_line<B>(
req: &mut Request<B>, req: &mut Request<B>,
upstream: &UpstreamCandidates, upstream_chosen: &Upstream,
upstream_candidates: &UpstreamCandidates,
) -> anyhow::Result<()> { ) -> anyhow::Result<()> {
for opt in upstream.options.iter() { // If not specified (force_httpXX_upstream) and https, version is preserved except for http/3
if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) {
// Change version to http/1.1 when destination scheme is http
debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled.");
*req.version_mut() = Version::HTTP_11;
} else if req.version() == Version::HTTP_3 {
// HTTP/3 is always https
debug!("HTTP/3 is currently unsupported for request to upstream.");
*req.version_mut() = Version::HTTP_2;
}
for opt in upstream_candidates.options.iter() {
match opt { match opt {
UpstreamOption::ForceHttp11Upstream => *req.version_mut() = http::Version::HTTP_11, UpstreamOption::ForceHttp11Upstream => *req.version_mut() = Version::HTTP_11,
UpstreamOption::ForceHttp2Upstream => { UpstreamOption::ForceHttp2Upstream => {
// case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt // case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt
// Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required. // Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required.
*req.version_mut() = http::Version::HTTP_2; *req.version_mut() = Version::HTTP_2;
} }
_ => (), _ => (),
} }

View file

@ -9,6 +9,7 @@ use crate::{
use bytes::{Buf, Bytes}; use bytes::{Buf, Bytes};
use http::{Request, Response}; use http::{Request, Response};
use http_body_util::BodyExt; use http_body_util::BodyExt;
use hyper_util::client::legacy::connect::Connect;
use std::{net::SocketAddr, time::Duration}; use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout; use tokio::time::timeout;
@ -17,12 +18,9 @@ use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestSt
#[cfg(feature = "http3-s2n")] #[cfg(feature = "http3-s2n")]
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
// use futures::Stream; impl<U, T> Proxy<U, T>
// use hyper_util::client::legacy::connect::Connect;
impl<U> Proxy<U>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static,
{ {
pub(super) async fn h3_serve_connection<C>( pub(super) async fn h3_serve_connection<C>(

View file

@ -19,23 +19,22 @@ use hyper::{
rt::{Read, Write}, rt::{Read, Write},
service::service_fn, service::service_fn,
}; };
use hyper_util::{rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder}; use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder};
use std::{net::SocketAddr, sync::Arc, time::Duration}; use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::time::timeout; use tokio::time::timeout;
/// Wrapper function to handle request for HTTP/1.1 and HTTP/2 /// Wrapper function to handle request for HTTP/1.1 and HTTP/2
/// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
async fn serve_request<U>( async fn serve_request<U, T>(
req: Request<Incoming>, req: Request<Incoming>,
// handler: Arc<HttpMessageHandler<T, U>>, handler: Arc<HttpMessageHandler<U, T>>,
handler: Arc<HttpMessageHandler<U>>,
client_addr: SocketAddr, client_addr: SocketAddr,
listen_addr: SocketAddr, listen_addr: SocketAddr,
tls_enabled: bool, tls_enabled: bool,
tls_server_name: Option<ServerName>, tls_server_name: Option<ServerName>,
) -> RpxyResult<Response<IncomingOr<BoxBody>>> ) -> RpxyResult<Response<IncomingOr<BoxBody>>>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Send + Sync + Connect + Clone,
U: CryptoSource + Clone, U: CryptoSource + Clone,
{ {
handler handler
@ -51,9 +50,9 @@ where
#[derive(Clone)] #[derive(Clone)]
/// Proxy main object responsible to serve requests received from clients at the given socket address. /// Proxy main object responsible to serve requests received from clients at the given socket address.
pub(crate) struct Proxy<U, E = LocalExecutor> pub(crate) struct Proxy<U, T, E = LocalExecutor>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static,
{ {
/// global context shared among async tasks /// global context shared among async tasks
@ -65,12 +64,12 @@ where
/// hyper connection builder serving http request /// hyper connection builder serving http request
pub connection_builder: Arc<ConnectionBuilder<E>>, pub connection_builder: Arc<ConnectionBuilder<E>>,
/// message handler serving incoming http request /// message handler serving incoming http request
pub message_handler: Arc<HttpMessageHandler<U>>, pub message_handler: Arc<HttpMessageHandler<U, T>>,
} }
impl<U> Proxy<U> impl<U, T> Proxy<U, T>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static,
{ {
/// Serves requests from clients /// Serves requests from clients

View file

@ -6,14 +6,14 @@ use crate::{
log::*, log::*,
name_exp::ByteName, name_exp::ByteName,
}; };
// use hyper_util::client::legacy::connect::Connect; use hyper_util::client::legacy::connect::Connect;
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig}; use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
use rustls::ServerConfig; use rustls::ServerConfig;
use std::sync::Arc; use std::sync::Arc;
impl<U> Proxy<U> impl<U, T> Proxy<U, T>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Send + Sync + Connect + Clone + 'static,
U: CryptoSource + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static,
{ {
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> { pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {

View file

@ -1,18 +1,20 @@
use super::proxy_main::Proxy; use super::proxy_main::Proxy;
use crate::{ use crate::{
crypto::CryptoSource,
crypto::{ServerCrypto, ServerCryptoBase}, crypto::{ServerCrypto, ServerCryptoBase},
error::*, error::*,
log::*, log::*,
name_exp::ByteName, name_exp::ByteName,
}; };
use anyhow::anyhow;
use hot_reload::ReloaderReceiver; use hot_reload::ReloaderReceiver;
use std::sync::Arc; use hyper_util::client::legacy::connect::Connect;
// use hyper_util::client::legacy::connect::Connect;
use s2n_quic::provider; use s2n_quic::provider;
use std::sync::Arc;
impl<U> Proxy<U> impl<U, T> Proxy<U, T>
where where
// T: Connect + Clone + Sync + Send + 'static, T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send + 'static,
{ {
/// Start UDP proxy serving with HTTP/3 request for configured host names /// Start UDP proxy serving with HTTP/3 request for configured host names