refactor: simplify the watcher option of inner handler
parent 4b3f719759
commit 8d0adde1b0
5 changed files with 79 additions and 72 deletions
@@ -2,7 +2,6 @@ use crate::{constants::*, count::RequestCount};
 use hot_reload::ReloaderReceiver;
 use rpxy_certs::ServerCryptoBase;
 use std::{net::SocketAddr, time::Duration};
-use tokio_util::sync::CancellationToken;

 /// Global object containing proxy configurations and shared object like counters.
 /// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
@@ -13,8 +12,6 @@ pub struct Globals {
   pub request_count: RequestCount,
   /// Shared context - Async task runtime handler
   pub runtime_handle: tokio::runtime::Handle,
-  /// Shared context - Notify object to stop async tasks
-  pub cancel_token: Option<CancellationToken>,
   /// Shared context - Certificate reloader service receiver // TODO: newer one
   pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
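For context: the two hunks above remove the optional CancellationToken from the shared Globals context; cancellation is instead threaded explicitly through the call chain in the hunks below. The refactor relies on tokio_util's token hierarchy, where cancelling a parent token reaches every child token but not the reverse. A minimal sketch of those semantics (standalone, not code from this repository):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let parent = CancellationToken::new();
    let child = parent.child_token();

    // Cancelling a child leaves the parent untouched...
    child.cancel();
    assert!(child.is_cancelled());
    assert!(!parent.is_cancelled());

    // ...while cancelling the parent propagates to all of its children.
    let sibling = parent.child_token();
    parent.cancel();
    sibling.cancelled().await; // resolves immediately after the cancel above
}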
@@ -43,8 +43,6 @@ pub struct RpxyOptions {
   pub cert_rx: Option<ReloaderReceiver<ServerCryptoBase>>, // TODO:
   /// Async task runtime handler
   pub runtime_handle: tokio::runtime::Handle,
-  /// Notify object to stop async tasks
-  pub cancel_token: Option<CancellationToken>,

   #[cfg(feature = "acme")]
   /// ServerConfig used for only ACME challenge for ACME domains
@@ -58,10 +56,10 @@ pub async fn entrypoint(
     app_config_list,
     cert_rx, // TODO:
     runtime_handle,
-    cancel_token,
     #[cfg(feature = "acme")]
     server_configs_acme_challenge,
   }: &RpxyOptions,
+  cancel_token: CancellationToken,
 ) -> RpxyResult<()> {
   #[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))]
   warn!("Both \"http3-quinn\" and \"http3-s2n\" features are enabled. \"http3-quinn\" will be used");
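Since cancel_token is now a plain, non-optional second argument to entrypoint, shutdown is driven by whoever constructs the token. A hypothetical caller might look like the following (a sketch only; the real binary's wiring is not shown in this commit):

use tokio_util::sync::CancellationToken;

async fn run(opts: &RpxyOptions) -> RpxyResult<()> {
    let cancel_token = CancellationToken::new();
    tokio::select! {
        res = entrypoint(opts, cancel_token.clone()) => res,
        _ = tokio::signal::ctrl_c() => {
            // Fan shutdown out to every child token held by the proxy tasks.
            cancel_token.cancel();
            Ok(())
        }
    }
}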
@@ -117,7 +115,6 @@ pub async fn entrypoint(
       proxy_config: proxy_config.clone(),
       request_count: Default::default(),
       runtime_handle: runtime_handle.clone(),
-      cancel_token: cancel_token.clone(),
       cert_reloader_rx: cert_rx.clone(),

       #[cfg(feature = "acme")]
@@ -153,25 +150,21 @@
       message_handler: message_handler.clone(),
     };

-    let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token());
-    let parent_cancel_token_clone = globals.cancel_token.clone();
+    let cancel_token = cancel_token.clone();
+
     globals.runtime_handle.spawn(async move {
       info!("rpxy proxy service for {listening_on} started");
-      if let Some(cancel_token) = cancel_token {
-        tokio::select! {
-          _ = cancel_token.cancelled() => {
-            debug!("rpxy proxy service for {listening_on} terminated");
-            Ok(())
-          },
-          proxy_res = proxy.start() => {
-            info!("rpxy proxy service for {listening_on} exited");
-            // cancel other proxy tasks
-            parent_cancel_token_clone.unwrap().cancel();
-            proxy_res
-          }
-        }
-      } else {
-        proxy.start().await
-      }
+      tokio::select! {
+        _ = cancel_token.cancelled() => {
+          debug!("rpxy proxy service for {listening_on} terminated");
+          Ok(())
+        },
+        proxy_res = proxy.start(cancel_token.child_token()) => {
+          info!("rpxy proxy service for {listening_on} exited");
+          // cancel other proxy tasks
+          cancel_token.cancel();
+          proxy_res
+        }
+      }
     })
   });
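Each per-address proxy task above now both watches the token (first select arm) and fires it when its own listener exits (second arm), so one failing listener brings the whole set down. The same pattern in isolation (run_service and serve are hypothetical stand-ins for the spawned task and proxy.start):

use tokio_util::sync::CancellationToken;

async fn run_service(cancel_token: CancellationToken) -> Result<(), String> {
    tokio::select! {
        // Shutdown was requested elsewhere: exit quietly.
        _ = cancel_token.cancelled() => Ok(()),
        // Our own service exited: take the sibling tasks down with us.
        res = serve() => {
            cancel_token.cancel();
            res
        }
    }
}

async fn serve() -> Result<(), String> {
    std::future::pending().await // stand-in for proxy.start(...)
}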
@@ -22,6 +22,7 @@ use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::au
 use rpxy_certs::ServerCrypto;
 use std::{net::SocketAddr, sync::Arc, time::Duration};
 use tokio::time::timeout;
+use tokio_util::sync::CancellationToken;

 /// Wrapper function to handle request for HTTP/1.1 and HTTP/2
 /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
@@ -129,7 +130,7 @@
   }

   /// Start with TLS (HTTPS)
-  pub(super) async fn start_with_tls(&self) -> RpxyResult<()> {
+  pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> {
     #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))]
     {
       self.tls_listener_service().await?;
@@ -139,14 +140,37 @@
     #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
     {
       if self.globals.proxy_config.http3 {
-        select! {
-          _ = self.tls_listener_service().fuse() => {
-            error!("TCP proxy service for TLS exited");
-          },
-          _ = self.h3_listener_service().fuse() => {
-            error!("UDP proxy service for QUIC exited");
-          }
-        }
+        let jh_tls = self.globals.runtime_handle.spawn({
+          let self_clone = self.clone();
+          let cancel_token = cancel_token.clone();
+          async move {
+            select! {
+              _ = self_clone.tls_listener_service().fuse() => {
+                error!("TCP proxy service for TLS exited");
+                cancel_token.cancel();
+              },
+              _ = cancel_token.cancelled().fuse() => {
+                debug!("Cancel token is called for TLS listener");
+              }
+            }
+          }
+        });
+        let jh_h3 = self.globals.runtime_handle.spawn({
+          let self_clone = self.clone();
+          async move {
+            select! {
+              _ = self_clone.h3_listener_service().fuse() => {
+                error!("UDP proxy service for QUIC exited");
+                cancel_token.cancel();
+              },
+              _ = cancel_token.cancelled().fuse() => {
+                debug!("Cancel token is called for QUIC listener");
+              }
+            }
+          }
+        });
+        let _ = futures::future::join(jh_tls, jh_h3).await;
+
         Ok(())
       } else {
         self.tls_listener_service().await?;
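start_with_tls now splits the TLS and QUIC listeners into two spawned tasks coupled by the shared token: whichever exits first cancels the other, and futures::future::join waits for both join handles. Reduced to its skeleton (listener is a placeholder accept loop):

use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let cancel_token = CancellationToken::new();

    let jh_tcp = tokio::spawn({
        let token = cancel_token.clone();
        async move {
            tokio::select! {
                _ = listener("tcp") => token.cancel(), // this one died: stop the sibling
                _ = token.cancelled() => {}            // the sibling died: stop too
            }
        }
    });
    let jh_udp = tokio::spawn({
        let token = cancel_token.clone();
        async move {
            tokio::select! {
                _ = listener("udp") => token.cancel(),
                _ = token.cancelled() => {}
            }
        }
    });

    // Returns once either branch has fired in both tasks.
    let _ = futures::future::join(jh_tcp, jh_udp).await;
}

async fn listener(_proto: &str) {
    std::future::pending::<()>().await // placeholder for a real accept loop
}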
@@ -303,10 +327,10 @@
   }

   /// Entrypoint for HTTP/1.1, 2 and 3 servers
-  pub async fn start(&self) -> RpxyResult<()> {
+  pub async fn start(&self, cancel_token: CancellationToken) -> RpxyResult<()> {
     let proxy_service = async {
       if self.tls_enabled {
-        self.start_with_tls().await
+        self.start_with_tls(cancel_token).await
       } else {
         self.start_without_tls().await
       }