reconsider handling tls for http 1.1 and 2

This commit is contained in:
Jun Kurihara 2022-07-23 14:34:48 +09:00
commit d2fbf8db4d
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03

View file

@ -1,6 +1,5 @@
use super::proxy_main::{LocalExecutor, Proxy}; use super::proxy_main::{LocalExecutor, Proxy};
use crate::{constants::*, error::*, log::*}; use crate::{constants::*, error::*, log::*};
use futures::{future::FutureExt, select};
use hyper::{client::connect::Connect, server::conn::Http}; use hyper::{client::connect::Connect, server::conn::Http};
use rustls::ServerConfig; use rustls::ServerConfig;
use std::sync::Arc; use std::sync::Arc;
@ -47,39 +46,50 @@ where
// let mut server_crypto: Option<Arc<ServerConfig>> = None; // let mut server_crypto: Option<Arc<ServerConfig>> = None;
let mut tls_acceptor: Option<TlsAcceptor> = None; let mut tls_acceptor: Option<TlsAcceptor> = None;
loop { loop {
select! { tokio::select! {
tcp_cnx = tcp_listener.accept().fuse() => { tcp_cnx = tcp_listener.accept() => {
if tls_acceptor.is_none() || tcp_cnx.is_err() { if tls_acceptor.is_none() || tcp_cnx.is_err() {
continue; continue;
} }
let (raw_stream, client_addr) = tcp_cnx.unwrap(); let (raw_stream, client_addr) = tcp_cnx.unwrap();
let acceptor = tls_acceptor.clone().unwrap();
let server_clone = server.clone();
let self_inner = self.clone();
match tls_acceptor.as_ref().unwrap().accept(raw_stream).await { let fut = async move {
Ok(stream) => { match acceptor.accept(raw_stream).await {
// Retrieve SNI Ok(stream) => {
let (_, conn) = stream.get_ref(); // Retrieve SNI
let server_name = conn.sni_hostname(); let (_, conn) = stream.get_ref();
debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", server_name); let server_name = conn.sni_hostname();
let server_name = server_name.map_or_else(|| None, |v| Some(v.as_bytes().to_ascii_lowercase())); debug!("HTTP/2 or 1.1: SNI in ClientHello: {:?}", server_name);
if server_name.is_none(){ let server_name = server_name.map_or_else(|| None, |v| Some(v.as_bytes().to_ascii_lowercase()));
continue; if server_name.is_none(){
Err(anyhow!("No SNI is given"))
} else {
self_inner.client_serve(stream, server_clone, client_addr, server_name); // TODO: don't want to pass copied value...
Ok(())
}
},
Err(e) => {
Err(anyhow!("Failed to accept TLS stream {}", e))
} }
self.clone().client_serve(stream, server.clone(), client_addr, server_name); // TODO: don't want to pass copied value...
},
Err(e) => {
error!("Failed to accept TLS stream {}", e);
continue;
} }
} };
self.globals.runtime_handle.spawn( async move {
if let Err(e) = fut.await {
error!("{}", e);
}
});
} }
_ = server_crypto_rx.changed().fuse() => { _ = server_crypto_rx.changed() => {
if server_crypto_rx.borrow().is_none() { if server_crypto_rx.borrow().is_none() {
break; break;
} }
let server_crypto = server_crypto_rx.borrow().clone().unwrap(); let server_crypto = server_crypto_rx.borrow().clone().unwrap();
tls_acceptor = Some(TlsAcceptor::from(server_crypto)); tls_acceptor = Some(TlsAcceptor::from(server_crypto));
} }
complete => break else => break
} }
} }
Ok(()) as Result<()> Ok(()) as Result<()>
@ -106,8 +116,8 @@ where
let mut server_crypto: Option<Arc<ServerConfig>> = None; let mut server_crypto: Option<Arc<ServerConfig>> = None;
loop { loop {
select! { tokio::select! {
new_conn = incoming.next().fuse() => { new_conn = incoming.next() => {
if server_crypto.is_none() || new_conn.is_none() { if server_crypto.is_none() || new_conn.is_none() {
continue; continue;
} }
@ -141,7 +151,7 @@ where
} }
}); });
} }
_ = server_crypto_rx.changed().fuse() => { _ = server_crypto_rx.changed() => {
if server_crypto_rx.borrow().is_none() { if server_crypto_rx.borrow().is_none() {
break; break;
} }
@ -151,7 +161,7 @@ where
endpoint.set_server_config(Some(QuicServerConfig::with_crypto(server_crypto.clone().unwrap()))); endpoint.set_server_config(Some(QuicServerConfig::with_crypto(server_crypto.clone().unwrap())));
} }
} }
complete => break // complete => break
} }
} }
endpoint.wait_idle().await; endpoint.wait_idle().await;
@ -179,34 +189,34 @@ where
#[cfg(feature = "http3")] #[cfg(feature = "http3")]
{ {
if self.globals.http3 { if self.globals.http3 {
select! { tokio::select! {
_= self.cert_service(tx).fuse() => { _= self.cert_service(tx) => {
error!("Cert service for TLS exited"); error!("Cert service for TLS exited");
}, },
_ = self.listener_service(server, rx.clone()).fuse() => { _ = self.listener_service(server, rx.clone()) => {
error!("TCP proxy service for TLS exited"); error!("TCP proxy service for TLS exited");
}, },
_= self.listener_service_h3(rx).fuse() => { _= self.listener_service_h3(rx) => {
error!("UDP proxy service for QUIC exited"); error!("UDP proxy service for QUIC exited");
}, },
complete => { // complete => {
error!("Something went wrong"); // error!("Something went wrong");
return Ok(()) // return Ok(())
} // }
}; };
Ok(()) Ok(())
} else { } else {
select! { tokio::select! {
_= self.cert_service(tx).fuse() => { _= self.cert_service(tx) => {
error!("Cert service for TLS exited"); error!("Cert service for TLS exited");
}, },
_ = self.listener_service(server, rx).fuse() => { _ = self.listener_service(server, rx) => {
error!("TCP proxy service for TLS exited"); error!("TCP proxy service for TLS exited");
}, },
complete => { // complete => {
error!("Something went wrong"); // error!("Something went wrong");
return Ok(()) // return Ok(())
} // }
}; };
Ok(()) Ok(())
} }