This commit is contained in:
Jun Kurihara 2025-05-16 16:31:58 +09:00
commit 1da0d77309
No known key found for this signature in database
GPG key ID: B184DE07B34AA676
4 changed files with 56 additions and 51 deletions

View file

@ -26,6 +26,7 @@ pub struct BackendApp {
pub https_redirection: Option<bool>, pub https_redirection: Option<bool>,
/// tls settings: mutual TLS is enabled /// tls settings: mutual TLS is enabled
#[builder(default)] #[builder(default)]
#[allow(unused)]
pub mutual_tls: Option<bool>, pub mutual_tls: Option<bool>,
} }
impl<'a> BackendAppBuilder { impl<'a> BackendAppBuilder {

View file

@ -68,7 +68,7 @@ where
h3_conn.shutdown(0).await?; h3_conn.shutdown(0).await?;
break; break;
} }
debug!("Request incoming: current # {}", request_count.current()); trace!("Request incoming: current # {}", request_count.current());
let self_inner = self.clone(); let self_inner = self.clone();
let tls_server_name_inner = tls_server_name.clone(); let tls_server_name_inner = tls_server_name.clone();
@ -82,7 +82,7 @@ where
warn!("HTTP/3 error on serve stream: {}", e); warn!("HTTP/3 error on serve stream: {}", e);
} }
request_count.decrement(); request_count.decrement();
debug!("Request processed: current # {}", request_count.current()); trace!("Request processed: current # {}", request_count.current());
}); });
} }
} }

View file

@ -80,7 +80,7 @@ where
request_count.decrement(); request_count.decrement();
return; return;
} }
debug!("Request incoming: current # {}", request_count.current()); trace!("Request incoming: current # {}", request_count.current());
let server_clone = self.connection_builder.clone(); let server_clone = self.connection_builder.clone();
let message_handler_clone = self.message_handler.clone(); let message_handler_clone = self.message_handler.clone();
@ -110,7 +110,7 @@ where
} }
request_count.decrement(); request_count.decrement();
debug!("Request processed: current # {}", request_count.current()); trace!("Request processed: current # {}", request_count.current());
}); });
} }
@ -131,16 +131,8 @@ where
/// Start with TLS (HTTPS) /// Start with TLS (HTTPS)
pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> { pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> {
#[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] // By default, TLS listener is spawned
{ let join_handle_tls = self.globals.runtime_handle.spawn({
self.tls_listener_service().await?;
error!("TCP proxy service for TLS exited");
Ok(())
}
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
{
if self.globals.proxy_config.http3 {
let jh_tls = self.globals.runtime_handle.spawn({
let self_clone = self.clone(); let self_clone = self.clone();
let cancel_token = cancel_token.clone(); let cancel_token = cancel_token.clone();
async move { async move {
@ -155,7 +147,23 @@ where
} }
} }
}); });
let jh_h3 = self.globals.runtime_handle.spawn({
#[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))]
{
let _ = join_handle_tls.await;
Ok(())
}
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
{
// If HTTP/3 is not enabled, wait for TLS listener to finish
if !self.globals.proxy_config.http3 {
let _ = join_handle_tls.await;
return Ok(());
}
// If HTTP/3 is enabled, spawn a task to handle HTTP/3 connections
let join_handle_h3 = self.globals.runtime_handle.spawn({
let self_clone = self.clone(); let self_clone = self.clone();
async move { async move {
select! { select! {
@ -169,14 +177,9 @@ where
} }
} }
}); });
let _ = futures::future::join(jh_tls, jh_h3).await; let _ = futures::future::join(join_handle_tls, join_handle_h3).await;
Ok(()) Ok(())
} else {
self.tls_listener_service().await?;
error!("TCP proxy service for TLS exited");
Ok(())
}
} }
} }

View file

@ -16,10 +16,12 @@ pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> RpxyResult<TcpSocket
}?; }?;
tcp_socket.set_reuseaddr(true)?; tcp_socket.set_reuseaddr(true)?;
tcp_socket.set_reuseport(true)?; tcp_socket.set_reuseport(true)?;
if let Err(e) = tcp_socket.bind(*listening_on) {
tcp_socket.bind(*listening_on).map_err(|e| {
error!("Failed to bind TCP socket: {}", e); error!("Failed to bind TCP socket: {}", e);
return Err(RpxyError::Io(e)); RpxyError::Io(e)
}; })?;
Ok(tcp_socket) Ok(tcp_socket)
} }
@ -36,11 +38,10 @@ pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> RpxyResult<UdpSocket
socket.set_reuse_port(true)?; socket.set_reuse_port(true)?;
socket.set_nonblocking(true)?; // This was made true inside quinn. so this line isn't necessary here. but just in case. socket.set_nonblocking(true)?; // This was made true inside quinn. so this line isn't necessary here. but just in case.
if let Err(e) = socket.bind(&(*listening_on).into()) { socket.bind(&(*listening_on).into()).map_err(|e| {
error!("Failed to bind UDP socket: {}", e); error!("Failed to bind UDP socket: {}", e);
return Err(RpxyError::Io(e)); RpxyError::Io(e)
}; })?;
let udp_socket: UdpSocket = socket.into();
Ok(udp_socket) Ok(socket.into())
} }