refactor: initial implementation of separeted lib and bin
This commit is contained in:
parent
ec85b0bb28
commit
13e82035a8
37 changed files with 225 additions and 157 deletions
181
rpxy-lib/src/proxy/crypto_service.rs
Normal file
181
rpxy-lib/src/proxy/crypto_service.rs
Normal file
|
|
@ -0,0 +1,181 @@
|
|||
use crate::{
|
||||
certs::{CertsAndKeys, CryptoSource},
|
||||
globals::Globals,
|
||||
log::*,
|
||||
utils::ServerNameBytesExp,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use hot_reload::*;
|
||||
use rustc_hash::FxHashMap as HashMap;
|
||||
use rustls::{server::ResolvesServerCertUsingSni, sign::CertifiedKey, RootCertStore, ServerConfig};
|
||||
use std::sync::Arc;
|
||||
|
||||
#[derive(Clone)]
|
||||
/// Reloader service for certificates and keys for TLS
|
||||
pub struct CryptoReloader<T>
|
||||
where
|
||||
T: CryptoSource,
|
||||
{
|
||||
globals: Arc<Globals<T>>,
|
||||
}
|
||||
|
||||
pub type SniServerCryptoMap = HashMap<ServerNameBytesExp, Arc<ServerConfig>>;
|
||||
pub struct ServerCrypto {
|
||||
// For Quic/HTTP3, only servers with no client authentication
|
||||
pub inner_global_no_client_auth: Arc<ServerConfig>,
|
||||
// For TLS over TCP/HTTP2 and 1.1, map of SNI to server_crypto for all given servers
|
||||
pub inner_local_map: Arc<SniServerCryptoMap>,
|
||||
}
|
||||
|
||||
/// Reloader target for the certificate reloader service
|
||||
#[derive(Debug, PartialEq, Eq, Clone, Default)]
|
||||
pub struct ServerCryptoBase {
|
||||
inner: HashMap<ServerNameBytesExp, CertsAndKeys>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T> Reload<ServerCryptoBase> for CryptoReloader<T>
|
||||
where
|
||||
T: CryptoSource + Sync + Send,
|
||||
{
|
||||
type Source = Arc<Globals<T>>;
|
||||
async fn new(source: &Self::Source) -> Result<Self, ReloaderError<ServerCryptoBase>> {
|
||||
Ok(Self {
|
||||
globals: source.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
async fn reload(&self) -> Result<Option<ServerCryptoBase>, ReloaderError<ServerCryptoBase>> {
|
||||
let mut certs_and_keys_map = ServerCryptoBase::default();
|
||||
|
||||
for (server_name_bytes_exp, backend) in self.globals.backends.apps.iter() {
|
||||
if let Some(crypto_source) = &backend.crypto_source {
|
||||
let certs_and_keys = crypto_source
|
||||
.read()
|
||||
.await
|
||||
.map_err(|_e| ReloaderError::<ServerCryptoBase>::Reload("Failed to reload cert, key or ca cert"))?;
|
||||
certs_and_keys_map
|
||||
.inner
|
||||
.insert(server_name_bytes_exp.to_owned(), certs_and_keys);
|
||||
}
|
||||
}
|
||||
|
||||
Ok(Some(certs_and_keys_map))
|
||||
}
|
||||
}
|
||||
|
||||
impl TryInto<Arc<ServerCrypto>> for &ServerCryptoBase {
|
||||
type Error = anyhow::Error;
|
||||
|
||||
fn try_into(self) -> Result<Arc<ServerCrypto>, Self::Error> {
|
||||
let mut resolver_global = ResolvesServerCertUsingSni::new();
|
||||
let mut server_crypto_local_map: SniServerCryptoMap = HashMap::default();
|
||||
|
||||
for (server_name_bytes_exp, certs_and_keys) in self.inner.iter() {
|
||||
let server_name: String = server_name_bytes_exp.try_into()?;
|
||||
|
||||
// Parse server certificates and private keys
|
||||
let Ok(certified_key): Result<CertifiedKey, _> = certs_and_keys.parse_server_certs_and_keys() else {
|
||||
warn!("Failed to add certificate for {}", server_name);
|
||||
continue;
|
||||
};
|
||||
|
||||
let mut resolver_local = ResolvesServerCertUsingSni::new();
|
||||
let mut client_ca_roots_local = RootCertStore::empty();
|
||||
|
||||
// add server certificate and key
|
||||
if let Err(e) = resolver_local.add(server_name.as_str(), certified_key.to_owned()) {
|
||||
error!(
|
||||
"{}: Failed to read some certificates and keys {}",
|
||||
server_name.as_str(),
|
||||
e
|
||||
)
|
||||
}
|
||||
|
||||
// add client certificate if specified
|
||||
if certs_and_keys.client_ca_certs.is_none() {
|
||||
// aggregated server config for no client auth server for http3
|
||||
if let Err(e) = resolver_global.add(server_name.as_str(), certified_key) {
|
||||
error!(
|
||||
"{}: Failed to read some certificates and keys {}",
|
||||
server_name.as_str(),
|
||||
e
|
||||
)
|
||||
}
|
||||
} else {
|
||||
// add client certificate if specified
|
||||
match certs_and_keys.parse_client_ca_certs() {
|
||||
Ok((owned_trust_anchors, _subject_key_ids)) => {
|
||||
client_ca_roots_local.add_server_trust_anchors(owned_trust_anchors.into_iter());
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to add client CA certificate for {}: {}",
|
||||
server_name.as_str(),
|
||||
e
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
let mut server_config_local = if client_ca_roots_local.is_empty() {
|
||||
// with no client auth, enable http1.1 -- 3
|
||||
#[cfg(not(feature = "http3"))]
|
||||
{
|
||||
ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(resolver_local))
|
||||
}
|
||||
#[cfg(feature = "http3")]
|
||||
{
|
||||
let mut sc = ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(resolver_local));
|
||||
sc.alpn_protocols = vec![b"h3".to_vec(), b"hq-29".to_vec()]; // TODO: remove hq-29 later?
|
||||
sc
|
||||
}
|
||||
} else {
|
||||
// with client auth, enable only http1.1 and 2
|
||||
// let client_certs_verifier = rustls::server::AllowAnyAnonymousOrAuthenticatedClient::new(client_ca_roots);
|
||||
let client_certs_verifier = rustls::server::AllowAnyAuthenticatedClient::new(client_ca_roots_local);
|
||||
ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_client_cert_verifier(Arc::new(client_certs_verifier))
|
||||
.with_cert_resolver(Arc::new(resolver_local))
|
||||
};
|
||||
server_config_local.alpn_protocols.push(b"h2".to_vec());
|
||||
server_config_local.alpn_protocols.push(b"http/1.1".to_vec());
|
||||
|
||||
server_crypto_local_map.insert(server_name_bytes_exp.to_owned(), Arc::new(server_config_local));
|
||||
}
|
||||
|
||||
//////////////
|
||||
let mut server_crypto_global = ServerConfig::builder()
|
||||
.with_safe_defaults()
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(resolver_global));
|
||||
|
||||
//////////////////////////////
|
||||
|
||||
#[cfg(feature = "http3")]
|
||||
{
|
||||
server_crypto_global.alpn_protocols = vec![
|
||||
b"h3".to_vec(),
|
||||
b"hq-29".to_vec(), // TODO: remove later?
|
||||
b"h2".to_vec(),
|
||||
b"http/1.1".to_vec(),
|
||||
];
|
||||
}
|
||||
#[cfg(not(feature = "http3"))]
|
||||
{
|
||||
server_crypto_global.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
}
|
||||
|
||||
Ok(Arc::new(ServerCrypto {
|
||||
inner_global_no_client_auth: Arc::new(server_crypto_global),
|
||||
inner_local_map: Arc::new(server_crypto_local_map),
|
||||
}))
|
||||
}
|
||||
}
|
||||
8
rpxy-lib/src/proxy/mod.rs
Normal file
8
rpxy-lib/src/proxy/mod.rs
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
mod crypto_service;
|
||||
mod proxy_client_cert;
|
||||
#[cfg(feature = "http3")]
|
||||
mod proxy_h3;
|
||||
mod proxy_main;
|
||||
mod proxy_tls;
|
||||
|
||||
pub use proxy_main::{Proxy, ProxyBuilder, ProxyBuilderError};
|
||||
47
rpxy-lib/src/proxy/proxy_client_cert.rs
Normal file
47
rpxy-lib/src/proxy/proxy_client_cert.rs
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
use crate::{error::*, log::*};
|
||||
use rustc_hash::FxHashSet as HashSet;
|
||||
use rustls::Certificate;
|
||||
use x509_parser::extensions::ParsedExtension;
|
||||
use x509_parser::prelude::*;
|
||||
|
||||
#[allow(dead_code)]
|
||||
// TODO: consider move this function to the layer of handle_request (L7) to return 403
|
||||
pub(super) fn check_client_authentication(
|
||||
client_certs: Option<&[Certificate]>,
|
||||
client_ca_keyids_set_for_sni: Option<&HashSet<Vec<u8>>>,
|
||||
) -> std::result::Result<(), ClientCertsError> {
|
||||
let Some(client_ca_keyids_set) = client_ca_keyids_set_for_sni else {
|
||||
// No client cert settings for given server name
|
||||
return Ok(());
|
||||
};
|
||||
|
||||
let Some(client_certs) = client_certs else {
|
||||
error!("Client certificate is needed for given server name");
|
||||
return Err(ClientCertsError::ClientCertRequired(
|
||||
"Client certificate is needed for given server name".to_string(),
|
||||
));
|
||||
};
|
||||
debug!("Incoming TLS client is (temporarily) authenticated via client cert");
|
||||
|
||||
// Check client certificate key ids
|
||||
let mut client_certs_parsed_iter = client_certs.iter().filter_map(|d| parse_x509_certificate(&d.0).ok());
|
||||
let match_server_crypto_and_client_cert = client_certs_parsed_iter.any(|c| {
|
||||
let mut filtered = c.1.iter_extensions().filter_map(|e| {
|
||||
if let ParsedExtension::AuthorityKeyIdentifier(key_id) = e.parsed_extension() {
|
||||
key_id.key_identifier.as_ref()
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
filtered.any(|id| client_ca_keyids_set.contains(id.0))
|
||||
});
|
||||
|
||||
if !match_server_crypto_and_client_cert {
|
||||
error!("Inconsistent client certificate was provided for SNI");
|
||||
return Err(ClientCertsError::InconsistentClientCert(
|
||||
"Inconsistent client certificate was provided for SNI".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
163
rpxy-lib/src/proxy/proxy_h3.rs
Normal file
163
rpxy-lib/src/proxy/proxy_h3.rs
Normal file
|
|
@ -0,0 +1,163 @@
|
|||
use super::Proxy;
|
||||
use crate::{certs::CryptoSource, error::*, log::*, utils::ServerNameBytesExp};
|
||||
use bytes::{Buf, Bytes};
|
||||
use h3::{quic::BidiStream, server::RequestStream};
|
||||
use hyper::{client::connect::Connect, Body, Request, Response};
|
||||
use std::net::SocketAddr;
|
||||
use tokio::time::{timeout, Duration};
|
||||
|
||||
impl<T, U> Proxy<T, U>
|
||||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
U: CryptoSource + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub(super) async fn connection_serve_h3(
|
||||
self,
|
||||
conn: quinn::Connecting,
|
||||
tls_server_name: ServerNameBytesExp,
|
||||
) -> Result<()> {
|
||||
let client_addr = conn.remote_address();
|
||||
|
||||
match conn.await {
|
||||
Ok(new_conn) => {
|
||||
let mut h3_conn = h3::server::Connection::<_, bytes::Bytes>::new(h3_quinn::Connection::new(new_conn)).await?;
|
||||
info!(
|
||||
"QUIC/HTTP3 connection established from {:?} {:?}",
|
||||
client_addr, tls_server_name
|
||||
);
|
||||
// TODO: Is here enough to fetch server_name from NewConnection?
|
||||
// to avoid deep nested call from listener_service_h3
|
||||
loop {
|
||||
// this routine follows hyperium/h3 examples https://github.com/hyperium/h3/blob/master/examples/server.rs
|
||||
match h3_conn.accept().await {
|
||||
Ok(None) => {
|
||||
break;
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("HTTP/3 error on accept incoming connection: {}", e);
|
||||
match e.get_error_level() {
|
||||
h3::error::ErrorLevel::ConnectionError => break,
|
||||
h3::error::ErrorLevel::StreamError => continue,
|
||||
}
|
||||
}
|
||||
Ok(Some((req, stream))) => {
|
||||
// We consider the connection count separately from the stream count.
|
||||
// Max clients for h1/h2 = max 'stream' for h3.
|
||||
let request_count = self.globals.request_count.clone();
|
||||
if request_count.increment() > self.globals.proxy_config.max_clients {
|
||||
request_count.decrement();
|
||||
h3_conn.shutdown(0).await?;
|
||||
break;
|
||||
}
|
||||
debug!("Request incoming: current # {}", request_count.current());
|
||||
|
||||
let self_inner = self.clone();
|
||||
let tls_server_name_inner = tls_server_name.clone();
|
||||
self.globals.runtime_handle.spawn(async move {
|
||||
if let Err(e) = timeout(
|
||||
self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2
|
||||
self_inner.stream_serve_h3(req, stream, client_addr, tls_server_name_inner),
|
||||
)
|
||||
.await
|
||||
{
|
||||
error!("HTTP/3 failed to process stream: {}", e);
|
||||
}
|
||||
request_count.decrement();
|
||||
debug!("Request processed: current # {}", request_count.current());
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("QUIC accepting connection failed: {:?}", err);
|
||||
return Err(RpxyError::QuicConn(err));
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn stream_serve_h3<S>(
|
||||
self,
|
||||
req: Request<()>,
|
||||
stream: RequestStream<S, Bytes>,
|
||||
client_addr: SocketAddr,
|
||||
tls_server_name: ServerNameBytesExp,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: BidiStream<Bytes> + Send + 'static,
|
||||
<S as BidiStream<Bytes>>::RecvStream: Send,
|
||||
{
|
||||
let (req_parts, _) = req.into_parts();
|
||||
// split stream and async body handling
|
||||
let (mut send_stream, mut recv_stream) = stream.split();
|
||||
|
||||
// generate streamed body with trailers using channel
|
||||
let (body_sender, req_body) = Body::channel();
|
||||
|
||||
// Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1
|
||||
// The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn.
|
||||
let max_body_size = self.globals.proxy_config.h3_request_max_body_size;
|
||||
self.globals.runtime_handle.spawn(async move {
|
||||
let mut sender = body_sender;
|
||||
let mut size = 0usize;
|
||||
while let Some(mut body) = recv_stream.recv_data().await? {
|
||||
debug!("HTTP/3 incoming request body: remaining {}", body.remaining());
|
||||
size += body.remaining();
|
||||
if size > max_body_size {
|
||||
error!(
|
||||
"Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}",
|
||||
size, max_body_size
|
||||
);
|
||||
return Err(RpxyError::Proxy("Exceeds max request body size for HTTP/3".to_string()));
|
||||
}
|
||||
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
|
||||
sender.send_data(body.copy_to_bytes(body.remaining())).await?;
|
||||
}
|
||||
|
||||
// trailers: use inner for work around. (directly get trailer)
|
||||
let trailers = recv_stream.as_mut().recv_trailers().await?;
|
||||
if trailers.is_some() {
|
||||
debug!("HTTP/3 incoming request trailers");
|
||||
sender.send_trailers(trailers.unwrap()).await?;
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
let new_req: Request<Body> = Request::from_parts(req_parts, req_body);
|
||||
let res = self
|
||||
.msg_handler
|
||||
.clone()
|
||||
.handle_request(
|
||||
new_req,
|
||||
client_addr,
|
||||
self.listening_on,
|
||||
self.tls_enabled,
|
||||
Some(tls_server_name),
|
||||
)
|
||||
.await?;
|
||||
|
||||
let (new_res_parts, new_body) = res.into_parts();
|
||||
let new_res = Response::from_parts(new_res_parts, ());
|
||||
|
||||
match send_stream.send_response(new_res).await {
|
||||
Ok(_) => {
|
||||
debug!("HTTP/3 response to connection successful");
|
||||
// aggregate body without copying
|
||||
let mut body_data = hyper::body::aggregate(new_body).await?;
|
||||
|
||||
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
|
||||
send_stream
|
||||
.send_data(body_data.copy_to_bytes(body_data.remaining()))
|
||||
.await?;
|
||||
|
||||
// TODO: needs handling trailer? should be included in body from handler.
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unable to send response to connection peer: {:?}", err);
|
||||
}
|
||||
}
|
||||
Ok(send_stream.finish().await?)
|
||||
}
|
||||
}
|
||||
124
rpxy-lib/src/proxy/proxy_main.rs
Normal file
124
rpxy-lib/src/proxy/proxy_main.rs
Normal file
|
|
@ -0,0 +1,124 @@
|
|||
// use super::proxy_handler::handle_request;
|
||||
use crate::{
|
||||
certs::CryptoSource, error::*, globals::Globals, handler::HttpMessageHandler, log::*, utils::ServerNameBytesExp,
|
||||
};
|
||||
use derive_builder::{self, Builder};
|
||||
use hyper::{client::connect::Connect, server::conn::Http, service::service_fn, Body, Request};
|
||||
use std::{net::SocketAddr, sync::Arc};
|
||||
use tokio::{
|
||||
io::{AsyncRead, AsyncWrite},
|
||||
net::TcpListener,
|
||||
runtime::Handle,
|
||||
time::{timeout, Duration},
|
||||
};
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct LocalExecutor {
|
||||
runtime_handle: Handle,
|
||||
}
|
||||
|
||||
impl LocalExecutor {
|
||||
fn new(runtime_handle: Handle) -> Self {
|
||||
LocalExecutor { runtime_handle }
|
||||
}
|
||||
}
|
||||
|
||||
impl<F> hyper::rt::Executor<F> for LocalExecutor
|
||||
where
|
||||
F: std::future::Future + Send + 'static,
|
||||
F::Output: Send,
|
||||
{
|
||||
fn execute(&self, fut: F) {
|
||||
self.runtime_handle.spawn(fut);
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Builder)]
|
||||
pub struct Proxy<T, U>
|
||||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
U: CryptoSource + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub listening_on: SocketAddr,
|
||||
pub tls_enabled: bool, // TCP待受がTLSかどうか
|
||||
pub msg_handler: HttpMessageHandler<T, U>,
|
||||
pub globals: Arc<Globals<U>>,
|
||||
}
|
||||
|
||||
impl<T, U> Proxy<T, U>
|
||||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
U: CryptoSource + Clone + Sync + Send,
|
||||
{
|
||||
pub(super) fn client_serve<I>(
|
||||
self,
|
||||
stream: I,
|
||||
server: Http<LocalExecutor>,
|
||||
peer_addr: SocketAddr,
|
||||
tls_server_name: Option<ServerNameBytesExp>,
|
||||
) where
|
||||
I: AsyncRead + AsyncWrite + Send + Unpin + 'static,
|
||||
{
|
||||
let request_count = self.globals.request_count.clone();
|
||||
if request_count.increment() > self.globals.proxy_config.max_clients {
|
||||
request_count.decrement();
|
||||
return;
|
||||
}
|
||||
debug!("Request incoming: current # {}", request_count.current());
|
||||
|
||||
self.globals.runtime_handle.clone().spawn(async move {
|
||||
timeout(
|
||||
self.globals.proxy_config.proxy_timeout + Duration::from_secs(1),
|
||||
server
|
||||
.serve_connection(
|
||||
stream,
|
||||
service_fn(move |req: Request<Body>| {
|
||||
self.msg_handler.clone().handle_request(
|
||||
req,
|
||||
peer_addr,
|
||||
self.listening_on,
|
||||
self.tls_enabled,
|
||||
tls_server_name.clone(),
|
||||
)
|
||||
}),
|
||||
)
|
||||
.with_upgrades(),
|
||||
)
|
||||
.await
|
||||
.ok();
|
||||
|
||||
request_count.decrement();
|
||||
debug!("Request processed: current # {}", request_count.current());
|
||||
});
|
||||
}
|
||||
|
||||
async fn start_without_tls(self, server: Http<LocalExecutor>) -> Result<()> {
|
||||
let listener_service = async {
|
||||
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
|
||||
info!("Start TCP proxy serving with HTTP request for configured host names");
|
||||
while let Ok((stream, _client_addr)) = tcp_listener.accept().await {
|
||||
self.clone().client_serve(stream, server.clone(), _client_addr, None);
|
||||
}
|
||||
Ok(()) as Result<()>
|
||||
};
|
||||
listener_service.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn start(self) -> Result<()> {
|
||||
let mut server = Http::new();
|
||||
server.http1_keep_alive(self.globals.proxy_config.keepalive);
|
||||
server.http2_max_concurrent_streams(self.globals.proxy_config.max_concurrent_streams);
|
||||
server.pipeline_flush(true);
|
||||
let executor = LocalExecutor::new(self.globals.runtime_handle.clone());
|
||||
let server = server.with_executor(executor);
|
||||
|
||||
if self.tls_enabled {
|
||||
self.start_with_tls(server).await?;
|
||||
} else {
|
||||
self.start_without_tls(server).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
245
rpxy-lib/src/proxy/proxy_tls.rs
Normal file
245
rpxy-lib/src/proxy/proxy_tls.rs
Normal file
|
|
@ -0,0 +1,245 @@
|
|||
use super::{
|
||||
crypto_service::{CryptoReloader, ServerCrypto, ServerCryptoBase, SniServerCryptoMap},
|
||||
proxy_main::{LocalExecutor, Proxy},
|
||||
};
|
||||
use crate::{certs::CryptoSource, constants::*, error::*, log::*, utils::BytesName};
|
||||
use hot_reload::{ReloaderReceiver, ReloaderService};
|
||||
use hyper::{client::connect::Connect, server::conn::Http};
|
||||
#[cfg(feature = "http3")]
|
||||
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
|
||||
#[cfg(feature = "http3")]
|
||||
use rustls::ServerConfig;
|
||||
use std::sync::Arc;
|
||||
use tokio::{
|
||||
net::TcpListener,
|
||||
time::{timeout, Duration},
|
||||
};
|
||||
|
||||
impl<T, U> Proxy<T, U>
|
||||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
U: CryptoSource + Clone + Sync + Send + 'static,
|
||||
{
|
||||
// TCP Listener Service, i.e., http/2 and http/1.1
|
||||
async fn listener_service(
|
||||
&self,
|
||||
server: Http<LocalExecutor>,
|
||||
mut server_crypto_rx: ReloaderReceiver<ServerCryptoBase>,
|
||||
) -> Result<()> {
|
||||
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
|
||||
info!("Start TCP proxy serving with HTTPS request for configured host names");
|
||||
|
||||
let mut server_crypto_map: Option<Arc<SniServerCryptoMap>> = None;
|
||||
loop {
|
||||
tokio::select! {
|
||||
tcp_cnx = tcp_listener.accept() => {
|
||||
if tcp_cnx.is_err() || server_crypto_map.is_none() {
|
||||
continue;
|
||||
}
|
||||
let (raw_stream, client_addr) = tcp_cnx.unwrap();
|
||||
let sc_map_inner = server_crypto_map.clone();
|
||||
let server_clone = server.clone();
|
||||
let self_inner = self.clone();
|
||||
|
||||
// spawns async handshake to avoid blocking thread by sequential handshake.
|
||||
let handshake_fut = async move {
|
||||
let acceptor = tokio_rustls::LazyConfigAcceptor::new(tokio_rustls::rustls::server::Acceptor::default(), raw_stream).await;
|
||||
if let Err(e) = acceptor {
|
||||
return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {e}")));
|
||||
}
|
||||
let start = acceptor.unwrap();
|
||||
let client_hello = start.client_hello();
|
||||
let server_name = client_hello.server_name();
|
||||
debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", server_name);
|
||||
let server_name = server_name.map_or_else(|| None, |v| Some(v.to_server_name_vec()));
|
||||
if server_name.is_none(){
|
||||
return Err(RpxyError::Proxy("No SNI is given".to_string()));
|
||||
}
|
||||
let server_crypto = sc_map_inner.as_ref().unwrap().get(server_name.as_ref().unwrap());
|
||||
if server_crypto.is_none() {
|
||||
return Err(RpxyError::Proxy(format!("No TLS serving app for {:?}", "xx")));
|
||||
}
|
||||
let stream = match start.into_stream(server_crypto.unwrap().clone()).await {
|
||||
Ok(s) => s,
|
||||
Err(e) => {
|
||||
return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {e}")));
|
||||
}
|
||||
};
|
||||
self_inner.client_serve(stream, server_clone, client_addr, server_name);
|
||||
Ok(())
|
||||
};
|
||||
|
||||
self.globals.runtime_handle.spawn( async move {
|
||||
// timeout is introduced to avoid get stuck here.
|
||||
match timeout(
|
||||
Duration::from_secs(TLS_HANDSHAKE_TIMEOUT_SEC),
|
||||
handshake_fut
|
||||
).await {
|
||||
Ok(a) => {
|
||||
if let Err(e) = a {
|
||||
error!("{}", e);
|
||||
}
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Timeout to handshake TLS: {}", e);
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
_ = server_crypto_rx.changed() => {
|
||||
if server_crypto_rx.borrow().is_none() {
|
||||
error!("Reloader is broken");
|
||||
break;
|
||||
}
|
||||
let cert_keys_map = server_crypto_rx.borrow().clone().unwrap();
|
||||
let Some(server_crypto): Option<Arc<ServerCrypto>> = (&cert_keys_map).try_into().ok() else {
|
||||
error!("Failed to update server crypto");
|
||||
break;
|
||||
};
|
||||
server_crypto_map = Some(server_crypto.inner_local_map.clone());
|
||||
}
|
||||
else => break
|
||||
}
|
||||
}
|
||||
Ok(()) as Result<()>
|
||||
}
|
||||
|
||||
#[cfg(feature = "http3")]
|
||||
async fn listener_service_h3(&self, mut server_crypto_rx: ReloaderReceiver<ServerCryptoBase>) -> Result<()> {
|
||||
info!("Start UDP proxy serving with HTTP/3 request for configured host names");
|
||||
// first set as null config server
|
||||
let rustls_server_config = ServerConfig::builder()
|
||||
.with_safe_default_cipher_suites()
|
||||
.with_safe_default_kx_groups()
|
||||
.with_protocol_versions(&[&rustls::version::TLS13])?
|
||||
.with_no_client_auth()
|
||||
.with_cert_resolver(Arc::new(rustls::server::ResolvesServerCertUsingSni::new()));
|
||||
|
||||
let mut transport_config_quic = TransportConfig::default();
|
||||
transport_config_quic
|
||||
.max_concurrent_bidi_streams(self.globals.proxy_config.h3_max_concurrent_bidistream)
|
||||
.max_concurrent_uni_streams(self.globals.proxy_config.h3_max_concurrent_unistream)
|
||||
.max_idle_timeout(self.globals.proxy_config.h3_max_idle_timeout);
|
||||
|
||||
let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config));
|
||||
server_config_h3.transport = Arc::new(transport_config_quic);
|
||||
server_config_h3.concurrent_connections(self.globals.proxy_config.h3_max_concurrent_connections);
|
||||
let endpoint = Endpoint::server(server_config_h3, self.listening_on)?;
|
||||
|
||||
let mut server_crypto: Option<Arc<ServerCrypto>> = None;
|
||||
loop {
|
||||
tokio::select! {
|
||||
new_conn = endpoint.accept() => {
|
||||
if server_crypto.is_none() || new_conn.is_none() {
|
||||
continue;
|
||||
}
|
||||
let mut conn: quinn::Connecting = new_conn.unwrap();
|
||||
let Ok(hsd) = conn.handshake_data().await else {
|
||||
continue
|
||||
};
|
||||
|
||||
let Ok(hsd_downcast) = hsd.downcast::<HandshakeData>() else {
|
||||
continue
|
||||
};
|
||||
let Some(new_server_name) = hsd_downcast.server_name else {
|
||||
warn!("HTTP/3 no SNI is given");
|
||||
continue;
|
||||
};
|
||||
debug!(
|
||||
"HTTP/3 connection incoming (SNI {:?})",
|
||||
new_server_name
|
||||
);
|
||||
// TODO: server_nameをここで出してどんどん深く投げていくのは効率が悪い。connecting -> connectionsの後でいいのでは?
|
||||
// TODO: 通常のTLSと同じenumか何かにまとめたい
|
||||
let fut = self.clone().connection_serve_h3(conn, new_server_name.to_server_name_vec());
|
||||
self.globals.runtime_handle.spawn(async move {
|
||||
// Timeout is based on underlying quic
|
||||
if let Err(e) = fut.await {
|
||||
warn!("QUIC or HTTP/3 connection failed: {}", e)
|
||||
}
|
||||
});
|
||||
}
|
||||
_ = server_crypto_rx.changed() => {
|
||||
if server_crypto_rx.borrow().is_none() {
|
||||
error!("Reloader is broken");
|
||||
break;
|
||||
}
|
||||
let cert_keys_map = server_crypto_rx.borrow().clone().unwrap();
|
||||
|
||||
server_crypto = (&cert_keys_map).try_into().ok();
|
||||
let Some(inner) = server_crypto.clone() else {
|
||||
error!("Failed to update server crypto for h3");
|
||||
break;
|
||||
};
|
||||
endpoint.set_server_config(Some(QuicServerConfig::with_crypto(inner.clone().inner_global_no_client_auth.clone())));
|
||||
|
||||
}
|
||||
else => break
|
||||
}
|
||||
}
|
||||
endpoint.wait_idle().await;
|
||||
Ok(()) as Result<()>
|
||||
}
|
||||
|
||||
pub async fn start_with_tls(self, server: Http<LocalExecutor>) -> Result<()> {
|
||||
let (cert_reloader_service, cert_reloader_rx) = ReloaderService::<CryptoReloader<U>, ServerCryptoBase>::new(
|
||||
&self.globals.clone(),
|
||||
CERTS_WATCH_DELAY_SECS,
|
||||
!LOAD_CERTS_ONLY_WHEN_UPDATED,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!(e))?;
|
||||
|
||||
#[cfg(not(feature = "http3"))]
|
||||
{
|
||||
tokio::select! {
|
||||
_= self.cert_service(tx) => {
|
||||
error!("Cert service for TLS exited");
|
||||
},
|
||||
_ = self.listener_service(server, rx) => {
|
||||
error!("TCP proxy service for TLS exited");
|
||||
},
|
||||
else => {
|
||||
error!("Something went wrong");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
#[cfg(feature = "http3")]
|
||||
{
|
||||
if self.globals.proxy_config.http3 {
|
||||
tokio::select! {
|
||||
_= cert_reloader_service.start() => {
|
||||
error!("Cert service for TLS exited");
|
||||
},
|
||||
_ = self.listener_service(server, cert_reloader_rx.clone()) => {
|
||||
error!("TCP proxy service for TLS exited");
|
||||
},
|
||||
_= self.listener_service_h3(cert_reloader_rx) => {
|
||||
error!("UDP proxy service for QUIC exited");
|
||||
},
|
||||
else => {
|
||||
error!("Something went wrong");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
} else {
|
||||
tokio::select! {
|
||||
_= cert_reloader_service.start() => {
|
||||
error!("Cert service for TLS exited");
|
||||
},
|
||||
_ = self.listener_service(server, cert_reloader_rx) => {
|
||||
error!("TCP proxy service for TLS exited");
|
||||
},
|
||||
else => {
|
||||
error!("Something went wrong");
|
||||
return Ok(())
|
||||
}
|
||||
};
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue