wip: tested with synthetic echo response from h3

This commit is contained in:
Jun Kurihara 2023-11-24 22:23:57 +09:00
commit 1dc88ce056
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
11 changed files with 732 additions and 37 deletions

View file

@ -12,7 +12,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["http3-s2n", "sticky-cookie", "cache"] default = ["http3-quinn", "sticky-cookie", "cache"]
http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"]
http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"]
sticky-cookie = ["base64", "sha2", "chrono"] sticky-cookie = ["base64", "sha2", "chrono"]

View file

@ -1,6 +1,6 @@
[package] [package]
name = "rpxy" name = "rpxy"
version = "0.6.2" version = "0.7.0"
authors = ["Jun Kurihara"] authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy" homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy"
@ -12,7 +12,7 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["http3-s2n", "cache"] default = ["http3-quinn", "cache"]
http3-quinn = ["rpxy-lib/http3-quinn"] http3-quinn = ["rpxy-lib/http3-quinn"]
http3-s2n = ["rpxy-lib/http3-s2n"] http3-s2n = ["rpxy-lib/http3-s2n"]
cache = ["rpxy-lib/cache"] cache = ["rpxy-lib/cache"]
@ -20,7 +20,7 @@ native-roots = ["rpxy-lib/native-roots"]
[dependencies] [dependencies]
rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [
# "sticky-cookie", "sticky-cookie",
] } ] }
anyhow = "1.0.75" anyhow = "1.0.75"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "rpxy-lib" name = "rpxy-lib"
version = "0.6.2" version = "0.7.0"
authors = ["Jun Kurihara"] authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy" homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy"
@ -12,9 +12,15 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["http3-s2n", "sticky-cookie", "cache"] default = ["http3-quinn", "sticky-cookie", "cache"]
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn"] http3-quinn = ["socket2", "quinn", "h3", "h3-quinn"]
http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] http3-s2n = [
"h3",
"s2n-quic",
"s2n-quic-core",
"s2n-quic-rustls",
"s2n-quic-h3",
]
sticky-cookie = ["base64", "sha2", "chrono"] sticky-cookie = ["base64", "sha2", "chrono"]
cache = [] #"http-cache-semantics", "lru"] cache = [] #"http-cache-semantics", "lru"]
native-roots = [] #"hyper-rustls/native-tokio"] native-roots = [] #"hyper-rustls/native-tokio"]
@ -22,7 +28,7 @@ native-roots = [] #"hyper-rustls/
[dependencies] [dependencies]
rand = "0.8.5" rand = "0.8.5"
rustc-hash = "1.1.0" rustc-hash = "1.1.0"
# bytes = "1.5.0" bytes = "1.5.0"
derive_builder = "0.12.0" derive_builder = "0.12.0"
futures = { version = "0.3.29", features = ["alloc", "async-await"] } futures = { version = "0.3.29", features = ["alloc", "async-await"] }
tokio = { version = "1.34.0", default-features = false, features = [ tokio = { version = "1.34.0", default-features = false, features = [
@ -41,7 +47,7 @@ thiserror = "1.0.50"
# http # http
http = "1.0.0" http = "1.0.0"
# http-body-util = "0.1.0" http-body-util = "0.1.0"
hyper = { version = "1.0.1", default-features = false } hyper = { version = "1.0.1", default-features = false }
hyper-util = { version = "0.1.1", features = ["full"] } hyper-util = { version = "0.1.1", features = ["full"] }
# hyper-rustls = { version = "0.24.2", default-features = false, features = [ # hyper-rustls = { version = "0.24.2", default-features = false, features = [
@ -50,11 +56,11 @@ hyper-util = { version = "0.1.1", features = ["full"] }
# "http1", # "http1",
# "http2", # "http2",
# ] } # ] }
# tokio-rustls = { version = "0.24.1", features = ["early-data"] }
# tls and cert management # tls and cert management
hot_reload = "0.1.4" hot_reload = "0.1.4"
rustls = { version = "0.21.9", default-features = false } rustls = { version = "0.21.9", default-features = false }
tokio-rustls = { version = "0.24.1", features = ["early-data"] }
webpki = "0.22.4" webpki = "0.22.4"
x509-parser = "0.15.1" x509-parser = "0.15.1"
@ -68,6 +74,7 @@ h3-quinn = { path = "../submodules/h3/h3-quinn/", optional = true }
s2n-quic = { version = "1.31.0", default-features = false, features = [ s2n-quic = { version = "1.31.0", default-features = false, features = [
"provider-tls-rustls", "provider-tls-rustls",
], optional = true } ], optional = true }
s2n-quic-core = { version = "0.31.0", default-features = false, optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true }
s2n-quic-rustls = { version = "0.31.0", optional = true } s2n-quic-rustls = { version = "0.31.0", optional = true }
# for UDP socket wit SO_REUSEADDR when h3 with quinn # for UDP socket wit SO_REUSEADDR when h3 with quinn

View file

@ -11,7 +11,7 @@ use service::CryptoReloader;
use std::sync::Arc; use std::sync::Arc;
pub use certs::{CertsAndKeys, CryptoSource}; pub use certs::{CertsAndKeys, CryptoSource};
pub use service::ServerCryptoBase; pub use service::{ServerCrypto, ServerCryptoBase, SniServerCryptoMap};
/// Result type inner of certificate reloader service /// Result type inner of certificate reloader service
type ReloaderServiceResultInner<T> = ( type ReloaderServiceResultInner<T> = (

View file

@ -6,9 +6,51 @@ pub type RpxyResult<T> = std::result::Result<T, RpxyError>;
/// Describes things that can go wrong in the Rpxy /// Describes things that can go wrong in the Rpxy
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum RpxyError { pub enum RpxyError {
// general errors
#[error("IO error: {0}")] #[error("IO error: {0}")]
Io(#[from] std::io::Error), Io(#[from] std::io::Error),
// TLS errors
#[error("Failed to build TLS acceptor: {0}")]
FailedToTlsHandshake(String),
#[error("No server name in ClientHello")]
NoServerNameInClientHello,
#[error("No TLS serving app: {0}")]
NoTlsServingApp(String),
#[error("Failed to update server crypto: {0}")]
FailedToUpdateServerCrypto(String),
#[error("No server crypto: {0}")]
NoServerCrypto(String),
// hyper errors
#[error("hyper body manipulation error: {0}")]
HyperBodyManipulationError(String),
// http/3 errors
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
#[error("H3 error: {0}")]
H3Error(#[from] h3::Error),
#[cfg(feature = "http3-quinn")]
#[error("Invalid rustls TLS version: {0}")]
QuinnInvalidTlsProtocolVersion(String),
#[cfg(feature = "http3-quinn")]
#[error("Quinn connection error: {0}")]
QuinnConnectionFailed(#[from] quinn::ConnectionError),
#[cfg(feature = "http3-s2n")]
#[error("s2n-quic validation error: {0}")]
S2nQuicValidationError(#[from] s2n_quic_core::transport::parameters::ValidationError),
#[cfg(feature = "http3-s2n")]
#[error("s2n-quic connection error: {0}")]
S2nQuicConnectionError(#[from] s2n_quic_core::connection::Error),
#[cfg(feature = "http3-s2n")]
#[error("s2n-quic start error: {0}")]
S2nQuicStartError(#[from] s2n_quic::provider::StartError),
// certificate reloader errors
#[error("No certificate reloader when building a proxy for TLS")]
NoCertificateReloader,
#[error("Certificate reload error: {0}")] #[error("Certificate reload error: {0}")]
CertificateReloadError(#[from] hot_reload::ReloaderError<crate::crypto::ServerCryptoBase>), CertificateReloadError(#[from] hot_reload::ReloaderError<crate::crypto::ServerCryptoBase>),
@ -20,6 +62,11 @@ pub enum RpxyError {
#[error("Failed to build backend app: {0}")] #[error("Failed to build backend app: {0}")]
FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError), FailedToBuildBackendApp(#[from] crate::backend::BackendAppBuilderError),
// Upstream connection setting errors
#[error("Unsupported upstream option")] #[error("Unsupported upstream option")]
UnsupportedUpstreamOption, UnsupportedUpstreamOption,
// Others
#[error("Infallible")]
Infallible(#[from] std::convert::Infallible),
} }

View file

@ -1,5 +1,9 @@
mod proxy_h3;
mod proxy_main; mod proxy_main;
mod proxy_tls; #[cfg(feature = "http3-quinn")]
mod proxy_quic_quinn;
#[cfg(feature = "http3-s2n")]
mod proxy_quic_s2n;
mod socket; mod socket;
use crate::{globals::Globals, hyper_executor::LocalExecutor}; use crate::{globals::Globals, hyper_executor::LocalExecutor};

View file

@ -0,0 +1,205 @@
use super::proxy_main::Proxy;
use crate::{error::*, log::*, name_exp::ServerName};
use bytes::Bytes;
use http::{Request, Response};
use http_body_util::BodyExt;
use std::{net::SocketAddr, time::Duration};
use tokio::time::timeout;
#[cfg(feature = "http3-quinn")]
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
#[cfg(feature = "http3-s2n")]
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
// use crate::{certs::CryptoSource, error::*, log::*, utils::ServerNameBytesExp};
// use futures::Stream;
// use hyper_util::client::legacy::connect::Connect;
// impl<U> Proxy<U>
// where
// // T: Connect + Clone + Sync + Send + 'static,
// U: CryptoSource + Clone + Sync + Send + 'static,
// {
impl Proxy {
pub(super) async fn h3_serve_connection<C>(
&self,
quic_connection: C,
tls_server_name: ServerName,
client_addr: SocketAddr,
) -> RpxyResult<()>
where
C: ConnectionQuic<Bytes>,
<C as ConnectionQuic<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
{
let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?;
info!(
"QUIC/HTTP3 connection established from {:?} {}",
client_addr,
<&ServerName as TryInto<String>>::try_into(&tls_server_name).unwrap_or_default()
);
// TODO: Is here enough to fetch server_name from NewConnection?
// to avoid deep nested call from listener_service_h3
loop {
// this routine follows hyperium/h3 examples https://github.com/hyperium/h3/blob/master/examples/server.rs
match h3_conn.accept().await {
Ok(None) => {
break;
}
Err(e) => {
warn!("HTTP/3 error on accept incoming connection: {}", e);
match e.get_error_level() {
h3::error::ErrorLevel::ConnectionError => break,
h3::error::ErrorLevel::StreamError => continue,
}
}
Ok(Some((req, stream))) => {
// We consider the connection count separately from the stream count.
// Max clients for h1/h2 = max 'stream' for h3.
let request_count = self.globals.request_count.clone();
if request_count.increment() > self.globals.proxy_config.max_clients {
request_count.decrement();
h3_conn.shutdown(0).await?;
break;
}
debug!("Request incoming: current # {}", request_count.current());
let self_inner = self.clone();
let tls_server_name_inner = tls_server_name.clone();
self.globals.runtime_handle.spawn(async move {
if let Err(e) = timeout(
self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2
self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner),
)
.await
{
error!("HTTP/3 failed to process stream: {}", e);
}
request_count.decrement();
debug!("Request processed: current # {}", request_count.current());
});
}
}
}
Ok(())
}
/// Serves a request stream from a client
/// TODO: TODO: TODO: TODO:
/// TODO: Body in hyper-0.14 was changed to Incoming in hyper-1.0, and it is not accessible from outside.
/// Thus, we need to implement IncomingLike trait using channel. Also, the backend handler must feed the body in the form of
/// Either<Incoming, IncomingLike> as body.
/// Also, the downstream from the backend handler could be Incoming, but will be wrapped as Either<Incoming, ()/Empty> as well due to H3.
/// Result<Either<_,_>, E> type includes E as HttpError to generate the status code and related Response<BoxBody>.
/// Thus to handle synthetic error messages in BoxBody, the serve() function outputs Response<Either<Either<Incoming, ()/Empty>, BoxBody>>>.
async fn h3_serve_stream<S>(
&self,
req: Request<()>,
stream: RequestStream<S, Bytes>,
client_addr: SocketAddr,
tls_server_name: ServerName,
) -> RpxyResult<()>
where
S: BidiStream<Bytes> + Send + 'static,
<S as BidiStream<Bytes>>::RecvStream: Send,
{
let (req_parts, _) = req.into_parts();
// split stream and async body handling
let (mut send_stream, mut recv_stream) = stream.split();
// let max_body_size = self.globals.proxy_config.h3_request_max_body_size;
// // let max = body_stream.size_hint().upper().unwrap_or(u64::MAX);
// // if max > max_body_size as u64 {
// // return Err(HttpError::TooLargeRequestBody);
// // }
// let new_req = Request::from_parts(req_parts, body_stream);
// // generate streamed body with trailers using channel
// let (body_sender, req_body) = Incoming::channel();
// // Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1
// // The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn.
// let max_body_size = self.globals.proxy_config.h3_request_max_body_size;
// self.globals.runtime_handle.spawn(async move {
// // let mut sender = body_sender;
// let mut size = 0usize;
// while let Some(mut body) = recv_stream.recv_data().await? {
// debug!("HTTP/3 incoming request body: remaining {}", body.remaining());
// size += body.remaining();
// if size > max_body_size {
// error!(
// "Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}",
// size, max_body_size
// );
// return Err(RpxyError::Proxy("Exceeds max request body size for HTTP/3".to_string()));
// }
// // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
// // sender.send_data(body.copy_to_bytes(body.remaining())).await?;
// }
// // trailers: use inner for work around. (directly get trailer)
// let trailers = recv_stream.as_mut().recv_trailers().await?;
// if trailers.is_some() {
// debug!("HTTP/3 incoming request trailers");
// // sender.send_trailers(trailers.unwrap()).await?;
// }
// Ok(())
// });
// let new_req: Request<Incoming> = Request::from_parts(req_parts, req_body);
// let res = self
// .msg_handler
// .clone()
// .handle_request(
// new_req,
// client_addr,
// self.listening_on,
// self.tls_enabled,
// Some(tls_server_name),
// )
// .await?;
// TODO: TODO: TODO: remove later
let body = full(hyper::body::Bytes::from("hello h3 echo"));
let res = Response::builder().body(body).unwrap();
/////////////////
let (new_res_parts, new_body) = res.into_parts();
let new_res = Response::from_parts(new_res_parts, ());
match send_stream.send_response(new_res).await {
Ok(_) => {
debug!("HTTP/3 response to connection successful");
// aggregate body without copying
let body_data = new_body
.collect()
.await
.map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
send_stream.send_data(body_data.to_bytes()).await?;
// TODO: needs handling trailer? should be included in body from handler.
}
Err(err) => {
error!("Unable to send response to connection peer: {:?}", err);
}
}
Ok(send_stream.finish().await?)
}
}
//////////////
/// TODO: remove later
/// helper function to build a full body
use http_body_util::Full;
pub(crate) type BoxBody = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;
pub fn full(body: hyper::body::Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}
//////////////

View file

@ -1,10 +1,59 @@
use super::socket::bind_tcp_socket; use super::socket::bind_tcp_socket;
use crate::{error::RpxyResult, globals::Globals, log::*}; use crate::{
use hyper_util::server::conn::auto::Builder as ConnectionBuilder; constants::TLS_HANDSHAKE_TIMEOUT_SEC,
use std::{net::SocketAddr, sync::Arc}; crypto::{ServerCrypto, SniServerCryptoMap},
error::*,
globals::Globals,
hyper_executor::LocalExecutor,
log::*,
name_exp::ServerName,
};
use futures::{select, FutureExt};
use http::{Request, Response};
use hyper::{
body::Incoming,
rt::{Read, Write},
service::service_fn,
};
use hyper_util::{rt::TokioIo, server::conn::auto::Builder as ConnectionBuilder};
use std::{net::SocketAddr, sync::Arc, time::Duration};
use tokio::time::timeout;
/// Wrapper function to handle request for HTTP/1.1 and HTTP/2
/// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
async fn serve_request(
req: Request<Incoming>,
// handler: Arc<HttpMessageHandler<T, U>>,
// handler: Arc<HttpMessageHandler<U>>,
client_addr: SocketAddr,
listen_addr: SocketAddr,
tls_enabled: bool,
tls_server_name: Option<ServerName>,
) -> RpxyResult<Response<BoxBody>> {
// match handler
// .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name)
// .await?
// {
// Ok(res) => passthrough_response(res),
// Err(e) => synthetic_error_response(StatusCode::from(e)),
// }
let body = full(hyper::body::Bytes::from("hello"));
let res = Response::builder().body(body).unwrap();
Ok(res)
}
//////////////
/// TODO: remove later
/// helper function to build a full body
use http_body_util::{BodyExt, Full};
pub(crate) type BoxBody = http_body_util::combinators::BoxBody<hyper::body::Bytes, hyper::Error>;
pub fn full(body: hyper::body::Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed()
}
//////////////
#[derive(Clone)]
/// Proxy main object responsible to serve requests received from clients at the given socket address. /// Proxy main object responsible to serve requests received from clients at the given socket address.
pub(crate) struct Proxy<E> { pub(crate) struct Proxy<E = LocalExecutor> {
/// global context shared among async tasks /// global context shared among async tasks
pub globals: Arc<Globals>, pub globals: Arc<Globals>,
/// listen socket address /// listen socket address
@ -15,7 +64,49 @@ pub(crate) struct Proxy<E> {
pub connection_builder: Arc<ConnectionBuilder<E>>, pub connection_builder: Arc<ConnectionBuilder<E>>,
} }
impl<E> Proxy<E> { impl Proxy {
/// Serves requests from clients
fn serve_connection<I>(&self, stream: I, peer_addr: SocketAddr, tls_server_name: Option<ServerName>)
where
I: Read + Write + Send + Unpin + 'static,
{
let request_count = self.globals.request_count.clone();
if request_count.increment() > self.globals.proxy_config.max_clients {
request_count.decrement();
return;
}
debug!("Request incoming: current # {}", request_count.current());
let server_clone = self.connection_builder.clone();
// let msg_handler_clone = self.msg_handler.clone();
let timeout_sec = self.globals.proxy_config.proxy_timeout;
let tls_enabled = self.tls_enabled;
let listening_on = self.listening_on;
self.globals.runtime_handle.clone().spawn(async move {
timeout(
timeout_sec + Duration::from_secs(1),
server_clone.serve_connection_with_upgrades(
stream,
service_fn(move |req: Request<Incoming>| {
serve_request(
req,
// msg_handler_clone.clone(),
peer_addr,
listening_on,
tls_enabled,
tls_server_name.clone(),
)
}),
),
)
.await
.ok();
request_count.decrement();
debug!("Request processed: current # {}", request_count.current());
});
}
/// Start without TLS (HTTP cleartext) /// Start without TLS (HTTP cleartext)
async fn start_without_tls(&self) -> RpxyResult<()> { async fn start_without_tls(&self) -> RpxyResult<()> {
let listener_service = async { let listener_service = async {
@ -23,7 +114,7 @@ impl<E> Proxy<E> {
let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?;
info!("Start TCP proxy serving with HTTP request for configured host names"); info!("Start TCP proxy serving with HTTP request for configured host names");
while let Ok((stream, client_addr)) = tcp_listener.accept().await { while let Ok((stream, client_addr)) = tcp_listener.accept().await {
// self.serve_connection(TokioIo::new(stream), client_addr, None); self.serve_connection(TokioIo::new(stream), client_addr, None);
} }
Ok(()) as RpxyResult<()> Ok(()) as RpxyResult<()>
}; };
@ -33,14 +124,108 @@ impl<E> Proxy<E> {
/// Start with TLS (HTTPS) /// Start with TLS (HTTPS)
pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { pub(super) async fn start_with_tls(&self) -> RpxyResult<()> {
// let (cert_reloader_service, cert_reloader_rx) = ReloaderService::<CryptoReloader<U>, ServerCryptoBase>::new( #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))]
// &self.globals.clone(), {
// CERTS_WATCH_DELAY_SECS, self.tls_listener_service().await?;
// !LOAD_CERTS_ONLY_WHEN_UPDATED, error!("TCP proxy service for TLS exited");
// ) Ok(())
// .await }
// .map_err(|e| anyhow::anyhow!(e))?; #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
loop {} {
if self.globals.proxy_config.http3 {
select! {
_ = self.tls_listener_service().fuse() => {
error!("TCP proxy service for TLS exited");
},
_ = self.h3_listener_service().fuse() => {
error!("UDP proxy service for QUIC exited");
}
};
Ok(())
} else {
self.tls_listener_service().await?;
error!("TCP proxy service for TLS exited");
Ok(())
}
}
}
// TCP Listener Service, i.e., http/2 and http/1.1
async fn tls_listener_service(&self) -> RpxyResult<()> {
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {
return Err(RpxyError::NoCertificateReloader);
};
let tcp_socket = bind_tcp_socket(&self.listening_on)?;
let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?;
info!("Start TCP proxy serving with HTTPS request for configured host names");
let mut server_crypto_map: Option<Arc<SniServerCryptoMap>> = None;
loop {
select! {
tcp_cnx = tcp_listener.accept().fuse() => {
if tcp_cnx.is_err() || server_crypto_map.is_none() {
continue;
}
let (raw_stream, client_addr) = tcp_cnx.unwrap();
let sc_map_inner = server_crypto_map.clone();
let self_inner = self.clone();
// spawns async handshake to avoid blocking thread by sequential handshake.
let handshake_fut = async move {
let acceptor = tokio_rustls::LazyConfigAcceptor::new(tokio_rustls::rustls::server::Acceptor::default(), raw_stream).await;
if let Err(e) = acceptor {
return Err(RpxyError::FailedToTlsHandshake(e.to_string()));
}
let start = acceptor.unwrap();
let client_hello = start.client_hello();
let sni = client_hello.server_name();
debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", sni.unwrap_or("None"));
let server_name = sni.map(ServerName::from);
if server_name.is_none(){
return Err(RpxyError::NoServerNameInClientHello);
}
let server_crypto = sc_map_inner.as_ref().unwrap().get(server_name.as_ref().unwrap());
if server_crypto.is_none() {
return Err(RpxyError::NoTlsServingApp(server_name.as_ref().unwrap().try_into().unwrap_or_default()));
}
let stream = match start.into_stream(server_crypto.unwrap().clone()).await {
Ok(s) => TokioIo::new(s),
Err(e) => {
return Err(RpxyError::FailedToTlsHandshake(e.to_string()));
}
};
self_inner.serve_connection(stream, client_addr, server_name);
Ok(()) as RpxyResult<()>
};
self.globals.runtime_handle.spawn( async move {
// timeout is introduced to avoid get stuck here.
let Ok(v) = timeout(
Duration::from_secs(TLS_HANDSHAKE_TIMEOUT_SEC),
handshake_fut
).await else {
error!("Timeout to handshake TLS");
return;
};
if let Err(e) = v {
error!("{}", e);
}
});
}
_ = server_crypto_rx.changed().fuse() => {
if server_crypto_rx.borrow().is_none() {
error!("Reloader is broken");
break;
}
let cert_keys_map = server_crypto_rx.borrow().clone().unwrap();
let Some(server_crypto): Option<Arc<ServerCrypto>> = (&cert_keys_map).try_into().ok() else {
error!("Failed to update server crypto");
break;
};
server_crypto_map = Some(server_crypto.inner_local_map.clone());
}
}
}
Ok(()) Ok(())
} }
@ -56,11 +241,11 @@ impl<E> Proxy<E> {
match &self.globals.term_notify { match &self.globals.term_notify {
Some(term) => { Some(term) => {
tokio::select! { select! {
_ = proxy_service => { _ = proxy_service.fuse() => {
warn!("Proxy service got down"); warn!("Proxy service got down");
} }
_ = term.notified() => { _ = term.notified().fuse() => {
info!("Proxy service listening on {} receives term signal", self.listening_on); info!("Proxy service listening on {} receives term signal", self.listening_on);
} }
} }

View file

@ -0,0 +1,121 @@
use super::proxy_main::Proxy;
use super::socket::bind_udp_socket;
use crate::{crypto::ServerCrypto, error::*, log::*, name_exp::ByteName};
// use hyper_util::client::legacy::connect::Connect;
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
use rustls::ServerConfig;
use std::sync::Arc;
impl Proxy
// where
// // T: Connect + Clone + Sync + Send + 'static,
// U: CryptoSource + Clone + Sync + Send + 'static,
{
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {
return Err(RpxyError::NoCertificateReloader);
};
info!("Start UDP proxy serving with HTTP/3 request for configured host names [quinn]");
// first set as null config server
let rustls_server_config = ServerConfig::builder()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_protocol_versions(&[&rustls::version::TLS13])
.map_err(|e| RpxyError::QuinnInvalidTlsProtocolVersion(e.to_string()))?
.with_no_client_auth()
.with_cert_resolver(Arc::new(rustls::server::ResolvesServerCertUsingSni::new()));
let mut transport_config_quic = TransportConfig::default();
transport_config_quic
.max_concurrent_bidi_streams(self.globals.proxy_config.h3_max_concurrent_bidistream.into())
.max_concurrent_uni_streams(self.globals.proxy_config.h3_max_concurrent_unistream.into())
.max_idle_timeout(
self
.globals
.proxy_config
.h3_max_idle_timeout
.map(|v| quinn::IdleTimeout::try_from(v).unwrap()),
);
let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config));
server_config_h3.transport = Arc::new(transport_config_quic);
server_config_h3.concurrent_connections(self.globals.proxy_config.h3_max_concurrent_connections);
// To reuse address
let udp_socket = bind_udp_socket(&self.listening_on)?;
let runtime = quinn::default_runtime()
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?;
let endpoint = Endpoint::new(
quinn::EndpointConfig::default(),
Some(server_config_h3),
udp_socket,
runtime,
)?;
let mut server_crypto: Option<Arc<ServerCrypto>> = None;
loop {
tokio::select! {
new_conn = endpoint.accept() => {
if server_crypto.is_none() || new_conn.is_none() {
continue;
}
let mut conn: quinn::Connecting = new_conn.unwrap();
let Ok(hsd) = conn.handshake_data().await else {
continue
};
let Ok(hsd_downcast) = hsd.downcast::<HandshakeData>() else {
continue
};
let Some(new_server_name) = hsd_downcast.server_name else {
warn!("HTTP/3 no SNI is given");
continue;
};
debug!(
"HTTP/3 connection incoming (SNI {:?})",
new_server_name
);
// TODO: server_nameをここで出してどんどん深く投げていくのは効率が悪い。connecting -> connectionsの後でいいのでは
// TODO: 通常のTLSと同じenumか何かにまとめたい
let self_clone = self.clone();
self.globals.runtime_handle.spawn(async move {
let client_addr = conn.remote_address();
let quic_connection = match conn.await {
Ok(new_conn) => {
info!("New connection established");
h3_quinn::Connection::new(new_conn)
},
Err(e) => {
warn!("QUIC accepting connection failed: {:?}", e);
return Err(RpxyError::QuinnConnectionFailed(e));
}
};
// Timeout is based on underlying quic
if let Err(e) = self_clone.h3_serve_connection(quic_connection, new_server_name.to_server_name(), client_addr).await {
warn!("QUIC or HTTP/3 connection failed: {}", e);
};
Ok(())
});
}
_ = server_crypto_rx.changed() => {
if server_crypto_rx.borrow().is_none() {
error!("Reloader is broken");
break;
}
let cert_keys_map = server_crypto_rx.borrow().clone().unwrap();
server_crypto = (&cert_keys_map).try_into().ok();
let Some(inner) = server_crypto.clone() else {
error!("Failed to update server crypto for h3");
break;
};
endpoint.set_server_config(Some(QuicServerConfig::with_crypto(inner.clone().inner_global_no_client_auth.clone())));
}
else => break
}
}
endpoint.wait_idle().await;
Ok(()) as RpxyResult<()>
}
}

View file

@ -0,0 +1,132 @@
use super::proxy_main::Proxy;
use crate::{
crypto::{ServerCrypto, ServerCryptoBase},
error::*,
log::*,
name_exp::ByteName,
};
use hot_reload::ReloaderReceiver;
use std::sync::Arc;
// use hyper_util::client::legacy::connect::Connect;
use s2n_quic::provider;
impl Proxy {
/// Start UDP proxy serving with HTTP/3 request for configured host names
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {
return Err(RpxyError::NoCertificateReloader);
};
info!("Start UDP proxy serving with HTTP/3 request for configured host names [s2n-quic]");
// initially wait for receipt
let mut server_crypto: Option<Arc<ServerCrypto>> = {
let _ = server_crypto_rx.changed().await;
let sc = self.receive_server_crypto(server_crypto_rx.clone())?;
Some(sc)
};
// event loop
loop {
tokio::select! {
v = self.h3_listener_service_inner(&server_crypto) => {
if let Err(e) = v {
error!("Quic connection event loop illegally shutdown [s2n-quic] {e}");
break;
}
}
_ = server_crypto_rx.changed() => {
server_crypto = match self.receive_server_crypto(server_crypto_rx.clone()) {
Ok(sc) => Some(sc),
Err(e) => {
error!("{e}");
break;
}
};
}
else => break
}
}
Ok(())
}
/// Receive server crypto from reloader
fn receive_server_crypto(
&self,
server_crypto_rx: ReloaderReceiver<ServerCryptoBase>,
) -> RpxyResult<Arc<ServerCrypto>> {
let cert_keys_map = server_crypto_rx.borrow().clone().ok_or_else(|| {
error!("Reloader is broken");
RpxyError::CertificateReloadError(anyhow!("Reloader is broken").into())
})?;
let server_crypto: Option<Arc<ServerCrypto>> = (&cert_keys_map).try_into().ok();
server_crypto.ok_or_else(|| {
error!("Failed to update server crypto for h3 [s2n-quic]");
RpxyError::FailedToUpdateServerCrypto("Failed to update server crypto for h3 [s2n-quic]".to_string())
})
}
/// Event loop for UDP proxy serving with HTTP/3 request for configured host names
async fn h3_listener_service_inner(&self, server_crypto: &Option<Arc<ServerCrypto>>) -> RpxyResult<()> {
// setup UDP socket
let io = provider::io::tokio::Builder::default()
.with_receive_address(self.listening_on)?
.with_reuse_port()?
.build()?;
// setup limits
let mut limits = provider::limits::Limits::default()
.with_max_open_local_bidirectional_streams(self.globals.proxy_config.h3_max_concurrent_bidistream as u64)?
.with_max_open_remote_bidirectional_streams(self.globals.proxy_config.h3_max_concurrent_bidistream as u64)?
.with_max_open_local_unidirectional_streams(self.globals.proxy_config.h3_max_concurrent_unistream as u64)?
.with_max_open_remote_unidirectional_streams(self.globals.proxy_config.h3_max_concurrent_unistream as u64)?
.with_max_active_connection_ids(self.globals.proxy_config.h3_max_concurrent_connections as u64)?;
limits = if let Some(v) = self.globals.proxy_config.h3_max_idle_timeout {
limits.with_max_idle_timeout(v)?
} else {
limits
};
// setup tls
let Some(server_crypto) = server_crypto else {
warn!("No server crypto is given [s2n-quic]");
return Err(RpxyError::NoServerCrypto(
"No server crypto is given [s2n-quic]".to_string(),
));
};
let tls = server_crypto.inner_global_no_client_auth.clone();
let mut server = s2n_quic::Server::builder()
.with_tls(tls)?
.with_io(io)?
.with_limits(limits)?
.start()?;
// quic event loop. this immediately cancels when crypto is updated by tokio::select!
while let Some(new_conn) = server.accept().await {
debug!("New QUIC connection established");
let Ok(Some(new_server_name)) = new_conn.server_name() else {
warn!("HTTP/3 no SNI is given");
continue;
};
debug!("HTTP/3 connection incoming (SNI {:?})", new_server_name);
let self_clone = self.clone();
self.globals.runtime_handle.spawn(async move {
let client_addr = new_conn.remote_addr()?;
let quic_connection = s2n_quic_h3::Connection::new(new_conn);
// Timeout is based on underlying quic
if let Err(e) = self_clone
.h3_serve_connection(quic_connection, new_server_name.to_server_name(), client_addr)
.await
{
warn!("QUIC or HTTP/3 connection failed: {}", e);
};
Ok(()) as RpxyResult<()>
});
}
Ok(())
}
}

View file

@ -1,6 +0,0 @@
use super::proxy_main::Proxy;
use crate::{log::*, error::*};
impl<E> Proxy<E>{
}