rust-rpxy/src/proxy/proxy_tls.rs
2023-01-25 02:09:13 +09:00

248 lines
8.7 KiB
Rust
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

use super::proxy_main::{LocalExecutor, Proxy};
use crate::{
backend::{ServerCrypto, SniServerCryptoMap},
constants::*,
error::*,
log::*,
utils::BytesName,
};
#[cfg(feature = "http3")]
use futures::StreamExt;
use hyper::{client::connect::Connect, server::conn::Http};
#[cfg(feature = "http3")]
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
use rustls::ServerConfig;
use std::sync::Arc;
use tokio::{
net::TcpListener,
sync::watch,
time::{sleep, timeout, Duration},
};
impl<T> Proxy<T>
where
T: Connect + Clone + Sync + Send + 'static,
{
async fn cert_service(&self, server_crypto_tx: watch::Sender<Option<Arc<ServerCrypto>>>) {
info!("Start cert watch service");
loop {
if let Ok(server_crypto) = self.globals.backends.generate_server_crypto().await {
if let Err(_e) = server_crypto_tx.send(Some(Arc::new(server_crypto))) {
error!("Failed to populate server crypto");
break;
}
} else {
error!("Failed to update certs");
}
sleep(Duration::from_secs(CERTS_WATCH_DELAY_SECS.into())).await;
}
}
// TCP Listener Service, i.e., http/2 and http/1.1
async fn listener_service(
&self,
server: Http<LocalExecutor>,
mut server_crypto_rx: watch::Receiver<Option<Arc<ServerCrypto>>>,
) -> Result<()> {
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
info!("Start TCP proxy serving with HTTPS request for configured host names");
let mut server_crypto_map: Option<Arc<SniServerCryptoMap>> = None;
loop {
tokio::select! {
tcp_cnx = tcp_listener.accept() => {
if tcp_cnx.is_err() || server_crypto_map.is_none() {
continue;
}
let (raw_stream, client_addr) = tcp_cnx.unwrap();
let sc_map_inner = server_crypto_map.clone();
let server_clone = server.clone();
let self_inner = self.clone();
// spawns async handshake to avoid blocking thread by sequential handshake.
let handshake_fut = async move {
let acceptor = tokio_rustls::LazyConfigAcceptor::new(rustls::server::Acceptor::default(), raw_stream).await;
if let Err(e) = acceptor {
return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {}", e)));
}
let start = acceptor.unwrap();
let client_hello = start.client_hello();
let server_name = client_hello.server_name();
debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", server_name);
let server_name = server_name.map_or_else(|| None, |v| Some(v.to_server_name_vec()));
if server_name.is_none(){
return Err(RpxyError::Proxy("No SNI is given".to_string()));
}
let server_crypto = sc_map_inner.as_ref().unwrap().get(server_name.as_ref().unwrap());
if server_crypto.is_none() {
return Err(RpxyError::Proxy(format!("No TLS serving app for {:?}", "xx")));
}
let stream = match start.into_stream(server_crypto.unwrap().clone()).await {
Ok(s) => s,
Err(e) => {
return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {}", e)));
}
};
self_inner.client_serve(stream, server_clone, client_addr, server_name);
Ok(())
};
self.globals.runtime_handle.spawn( async move {
// timeout is introduced to avoid get stuck here.
match timeout(
Duration::from_secs(TLS_HANDSHAKE_TIMEOUT_SEC),
handshake_fut
).await {
Ok(a) => {
if let Err(e) = a {
error!("{}", e);
}
},
Err(e) => {
error!("Timeout to handshake TLS: {}", e);
}
};
});
}
_ = server_crypto_rx.changed() => {
if server_crypto_rx.borrow().is_none() {
break;
}
let server_crypto = server_crypto_rx.borrow().clone().unwrap();
server_crypto_map = Some(server_crypto.inner_local_map.clone());
}
else => break
}
}
Ok(()) as Result<()>
}
#[cfg(feature = "http3")]
async fn listener_service_h3(&self, mut server_crypto_rx: watch::Receiver<Option<Arc<ServerCrypto>>>) -> Result<()> {
info!("Start UDP proxy serving with HTTP/3 request for configured host names");
// first set as null config server
let rustls_server_config = ServerConfig::builder()
.with_safe_defaults()
.with_no_client_auth()
.with_cert_resolver(Arc::new(tokio_rustls::rustls::server::ResolvesServerCertUsingSni::new()));
let mut transport_config_quic = TransportConfig::default();
transport_config_quic
.max_concurrent_bidi_streams(self.globals.h3_max_concurrent_bidistream)
.max_concurrent_uni_streams(self.globals.h3_max_concurrent_unistream)
.max_idle_timeout(self.globals.h3_max_idle_timeout);
let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config));
server_config_h3.transport = Arc::new(transport_config_quic);
server_config_h3.concurrent_connections(self.globals.h3_max_concurrent_connections);
let (endpoint, mut incoming) = Endpoint::server(server_config_h3, self.listening_on)?;
let mut server_crypto: Option<Arc<ServerCrypto>> = None;
loop {
tokio::select! {
new_conn = incoming.next() => {
if server_crypto.is_none() || new_conn.is_none() {
continue;
}
let mut conn = new_conn.unwrap();
let hsd = match conn.handshake_data().await {
Ok(h) => h,
Err(_) => continue
};
let hsd_downcast = match hsd.downcast::<HandshakeData>() {
Ok(d) => d,
Err(_) => continue
};
let new_server_name = match hsd_downcast.server_name {
Some(sn) => sn.to_server_name_vec(),
None => {
warn!("HTTP/3 no SNI is given");
continue;
}
};
debug!(
"HTTP/3 connection incoming (SNI {:?})",
new_server_name.0
);
// TODO: server_nameをここで出してどんどん深く投げていくのは効率が悪い。connecting -> connectionsの後でいいのでは
// TODO: 通常のTLSと同じenumか何かにまとめたい
let fut = self.clone().connection_serve_h3(conn, new_server_name);
self.globals.runtime_handle.spawn(async move {
// Timeout is based on underlying quic
if let Err(e) = fut.await {
warn!("QUIC or HTTP/3 connection failed: {}", e)
}
});
}
_ = server_crypto_rx.changed() => {
if server_crypto_rx.borrow().is_none() {
break;
}
server_crypto = server_crypto_rx.borrow().clone();
if server_crypto.is_some(){
endpoint.set_server_config(Some(QuicServerConfig::with_crypto(server_crypto.clone().unwrap().inner_global_no_client_auth.clone())));
}
}
else => break
}
}
endpoint.wait_idle().await;
Ok(()) as Result<()>
}
pub async fn start_with_tls(self, server: Http<LocalExecutor>) -> Result<()> {
let (tx, rx) = watch::channel::<Option<Arc<ServerCrypto>>>(None);
#[cfg(not(feature = "http3"))]
{
select! {
_= self.cert_service(tx).fuse() => {
error!("Cert service for TLS exited");
},
_ = self.listener_service(server, rx).fuse() => {
error!("TCP proxy service for TLS exited");
},
complete => {
error!("Something went wrong");
return Ok(())
}
};
Ok(())
}
#[cfg(feature = "http3")]
{
if self.globals.http3 {
tokio::select! {
_= self.cert_service(tx) => {
error!("Cert service for TLS exited");
},
_ = self.listener_service(server, rx.clone()) => {
error!("TCP proxy service for TLS exited");
},
_= self.listener_service_h3(rx) => {
error!("UDP proxy service for QUIC exited");
},
else => {
error!("Something went wrong");
return Ok(())
}
};
Ok(())
} else {
tokio::select! {
_= self.cert_service(tx) => {
error!("Cert service for TLS exited");
},
_ = self.listener_service(server, rx) => {
error!("TCP proxy service for TLS exited");
},
else => {
error!("Something went wrong");
return Ok(())
}
};
Ok(())
}
}
}
}