This commit is contained in:
Jun Kurihara 2022-07-07 20:28:30 +09:00
commit 5840808021
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
3 changed files with 51 additions and 81 deletions

View file

@ -8,9 +8,6 @@ pub const MAX_CONCURRENT_STREAMS: u32 = 16;
// #[cfg(feature = "tls")] // #[cfg(feature = "tls")]
pub const CERTS_WATCH_DELAY_SECS: u32 = 10; pub const CERTS_WATCH_DELAY_SECS: u32 = 10;
pub const GET_LISTENER_RETRY_MAX_CNT: u64 = 128;
pub const GET_LISTENER_RETRY_WAITING_MSEC: u64 = 10;
#[cfg(feature = "h3")] #[cfg(feature = "h3")]
pub const H3_ALT_SVC_MAX_AGE: u32 = 60; pub const H3_ALT_SVC_MAX_AGE: u32 = 60;
#[cfg(feature = "h3")] #[cfg(feature = "h3")]

View file

@ -1,6 +1,6 @@
// use super::proxy_handler::handle_request; // use super::proxy_handler::handle_request;
use super::Backends; use super::Backends;
use crate::{constants::*, error::*, globals::Globals, log::*}; use crate::{error::*, globals::Globals, log::*};
use hyper::{ use hyper::{
client::connect::Connect, server::conn::Http, service::service_fn, Body, Client, Request, client::connect::Connect, server::conn::Http, service::service_fn, Body, Client, Request,
}; };
@ -77,28 +77,9 @@ where
}); });
} }
// Work around to forcibly get tcp listener for "address already in use"
pub(super) async fn try_bind_tcp_listener(&self) -> Result<TcpListener> {
let mut cnt = 0;
while cnt < GET_LISTENER_RETRY_MAX_CNT {
if let Ok(listener) = TcpListener::bind(&self.listening_on).await {
return Ok(listener);
}
cnt += 1;
tokio::time::sleep(tokio::time::Duration::from_millis(
GET_LISTENER_RETRY_WAITING_MSEC,
))
.await;
}
error!("Failed to get tcp listener: {}", self.listening_on);
Err(anyhow!("Failed to get tcp listener: {}", self.listening_on))
}
async fn start_without_tls(self, server: Http<LocalExecutor>) -> Result<()> { async fn start_without_tls(self, server: Http<LocalExecutor>) -> Result<()> {
let listener_service = async { let listener_service = async {
// let tcp_listener = TcpListener::bind(&self.listening_on).await?; let tcp_listener = TcpListener::bind(&self.listening_on).await?;
let tcp_listener = self.try_bind_tcp_listener().await?;
info!("Start TCP proxy serving with HTTP request for configured host names"); info!("Start TCP proxy serving with HTTP request for configured host names");
while let Ok((stream, _client_addr)) = tcp_listener.accept().await { while let Ok((stream, _client_addr)) = tcp_listener.accept().await {
self self

View file

@ -6,6 +6,7 @@ use futures::{future::FutureExt, select};
use hyper::{client::connect::Connect, server::conn::Http}; use hyper::{client::connect::Connect, server::conn::Http};
use rustls::ServerConfig; use rustls::ServerConfig;
use std::{sync::Arc, time::Duration}; use std::{sync::Arc, time::Duration};
use tokio::net::TcpListener;
impl<T> Proxy<T> impl<T> Proxy<T>
where where
@ -27,8 +28,7 @@ where
// TCP Listener Service, i.e., http/2 and http/1.1 // TCP Listener Service, i.e., http/2 and http/1.1
pub async fn listener_service(&self, server: Http<LocalExecutor>) -> Result<()> { pub async fn listener_service(&self, server: Http<LocalExecutor>) -> Result<()> {
// let tcp_listener = TcpListener::bind(&self.listening_on).await?; let tcp_listener = TcpListener::bind(&self.listening_on).await?;
let tcp_listener = self.try_bind_tcp_listener().await?;
info!("Start TCP proxy serving with HTTPS request for configured host names"); info!("Start TCP proxy serving with HTTPS request for configured host names");
loop { loop {
@ -71,6 +71,32 @@ where
Ok(()) as Result<()> Ok(()) as Result<()>
} }
#[cfg(feature = "h3")]
async fn get_new_server_config_h3(
&self,
peeked_conn: &mut quinn::Connecting,
) -> Option<quinn::ServerConfig> {
let hsd = if let Ok(h) = peeked_conn.handshake_data().await {
h
} else {
return None;
};
let hsd_downcast = if let Ok(d) = hsd.downcast::<quinn::crypto::rustls::HandshakeData>() {
d
} else {
return None;
};
let server_name = hsd_downcast.server_name?;
info!(
"HTTP/3 connection incoming (SNI {:?}): Overwrite ServerConfig",
server_name
);
let new_server_crypto = self.fetch_server_crypto(&server_name)?;
Some(quinn::ServerConfig::with_crypto(Arc::new(
new_server_crypto,
)))
}
#[cfg(feature = "h3")] #[cfg(feature = "h3")]
pub async fn listener_service_h3(&self) -> Result<()> { pub async fn listener_service_h3(&self) -> Result<()> {
// TODO: Work around to initially serve incoming connection // TODO: Work around to initially serve incoming connection
@ -85,60 +111,48 @@ where
.map(|(name, _)| name.to_string()) .map(|(name, _)| name.to_string())
.collect(); .collect();
ensure!(!tls_app_names.is_empty(), "No TLS supported app"); ensure!(!tls_app_names.is_empty(), "No TLS supported app");
let initial_app_name = tls_app_names.get(0).unwrap().as_str(); let initial_app_name = tls_app_names.get(0).ok_or_else(|| anyhow!(""))?.as_str();
debug!( debug!(
"HTTP/3 SNI multiplexer initial app_name: {}", "HTTP/3 SNI multiplexer initial app_name: {}",
initial_app_name initial_app_name
); );
let backend_serve = self.backends.apps.get(initial_app_name).unwrap(); let backend_serve = self
.backends
.apps
.get(initial_app_name)
.ok_or_else(|| anyhow!(""))?;
while backend_serve.get_tls_server_config().is_none() { while backend_serve.get_tls_server_config().is_none() {
tokio::time::sleep(tokio::time::Duration::from_millis(10)).await; tokio::time::sleep(tokio::time::Duration::from_millis(10)).await;
} }
let server_crypto = backend_serve.get_tls_server_config().unwrap(); let server_crypto = backend_serve
.get_tls_server_config()
.ok_or_else(|| anyhow!(""))?;
let server_config_h3 = quinn::ServerConfig::with_crypto(Arc::new(server_crypto)); let server_config_h3 = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
let (endpoint, incoming) = self.try_bind_quic_listener(server_config_h3).await?; let (endpoint, incoming) = quinn::Endpoint::server(server_config_h3, self.listening_on)?;
// quinn::Endpoint::server(server_config_h3, self.listening_on).unwrap();
info!("Start UDP proxy serving with HTTP/3 request for configured host names"); info!("Start UDP proxy serving with HTTP/3 request for configured host names");
let mut p = incoming.peekable(); let mut p = incoming.peekable();
loop { loop {
// TODO: Not sure if this properly works to handle multiple "server_name"s to host multiple hosts. // TODO: Not sure if this properly works to handle multiple "server_name"s to host multiple hosts.
// peek() should work for that. // peek() should work for that.
let success = if let Some(peeked_conn) = std::pin::Pin::new(&mut p).peek_mut().await { let peeked_conn = std::pin::Pin::new(&mut p)
let hsd = peeked_conn.handshake_data().await; .peek_mut()
let hsd_downcast = hsd? .await
.downcast::<quinn::crypto::rustls::HandshakeData>() .ok_or_else(|| anyhow!("Failed to peek"))?;
.unwrap(); let is_acceptable =
if let Some(svn) = hsd_downcast.server_name { if let Some(new_server_config) = self.get_new_server_config_h3(peeked_conn).await {
if let Some(new_server_crypto) = self.fetch_server_crypto(&svn) { // Set ServerConfig::set_server_config for given SNI
// Set ServerConfig::set_server_config for given SNI endpoint.set_server_config(Some(new_server_config));
let mut new_server_config_h3 = true
quinn::ServerConfig::with_crypto(Arc::new(new_server_crypto));
if svn == "localhost" {
new_server_config_h3.concurrent_connections(512);
}
info!(
"HTTP/3 connection incoming (SNI {:?}): Overwrite ServerConfig",
svn
);
endpoint.set_server_config(Some(new_server_config_h3));
true
} else {
false
}
} else { } else {
debug!("HTTP/3 no SNI is given");
false false
} };
} else {
false
};
// Then acquire actual connection // Then acquire actual connection
let peekable_incoming = std::pin::Pin::new(&mut p); let peekable_incoming = std::pin::Pin::new(&mut p);
if let Some(conn) = peekable_incoming.get_mut().next().await { if let Some(conn) = peekable_incoming.get_mut().next().await {
if success { if is_acceptable {
self.clone().client_serve_h3(conn).await; self.clone().client_serve_h3(conn).await;
} }
} else { } else {
@ -193,28 +207,6 @@ where
} }
} }
// Work around to forcibly get quic listener for "address already in use"
#[cfg(feature = "h3")]
async fn try_bind_quic_listener(
&self,
server_config: quinn::ServerConfig,
) -> Result<(quinn::Endpoint, quinn::Incoming)> {
let mut cnt = 0;
while cnt < GET_LISTENER_RETRY_MAX_CNT {
if let Ok(listener) = quinn::Endpoint::server(server_config.clone(), self.listening_on) {
return Ok(listener);
}
cnt += 1;
tokio::time::sleep(tokio::time::Duration::from_millis(
GET_LISTENER_RETRY_WAITING_MSEC,
))
.await;
}
error!("Failed to get quic listener: {}", self.listening_on);
Err(anyhow!("Failed to get tcp listener: {}", self.listening_on))
}
fn fetch_server_crypto(&self, server_name: &str) -> Option<ServerConfig> { fn fetch_server_crypto(&self, server_name: &str) -> Option<ServerConfig> {
let backend_serve = if let Some(backend_serve) = self.backends.apps.get(server_name) { let backend_serve = if let Some(backend_serve) = self.backends.apps.get(server_name) {
backend_serve backend_serve