wip: fixing dynamic reloading

This commit is contained in:
Jun Kurihara 2024-07-26 23:25:40 +09:00
commit 8d9f07a848
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23
3 changed files with 44 additions and 32 deletions

View file

@ -44,19 +44,21 @@ fn main() {
.unwrap();
tokio::select! {
Err(e) = config_service.start() => {
config_res = config_service.start() => {
if let Err(e) = config_res {
error!("config reloader service exited: {e}");
std::process::exit(1);
}
Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
}
rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
if let Err(e) = rpxy_res {
error!("rpxy service existed: {e}");
std::process::exit(1);
}
else => {
}
}
std::process::exit(0);
}
}
}
});
}

View file

@ -19,7 +19,7 @@ use crate::{
message_handler::HttpMessageHandlerBuilder,
proxy::Proxy,
};
use futures::future::select_all;
use futures::future::join_all;
use hot_reload::ReloaderReceiver;
use rpxy_certs::ServerCryptoBase;
use std::sync::Arc;
@ -130,8 +130,9 @@ pub async fn entrypoint(
let connection_builder = proxy::connection_builder(&globals);
// spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder.
let parent_cancel_token = globals.cancel_token.clone().unwrap_or_default();
let addresses = globals.proxy_config.listen_sockets.clone();
let futures_iter = addresses.into_iter().map(|listening_on| {
let join_handles = addresses.into_iter().map(|listening_on| {
let mut tls_enabled = false;
if let Some(https_port) = globals.proxy_config.https_port {
tls_enabled = https_port == listening_on.port()
@ -143,11 +144,37 @@ pub async fn entrypoint(
connection_builder: connection_builder.clone(),
message_handler: message_handler.clone(),
};
globals.runtime_handle.spawn(async move { proxy.start().await })
let cancel_token = parent_cancel_token.child_token();
let parent_cancel_token_clone = parent_cancel_token.clone();
globals.runtime_handle.spawn(async move {
info!("rpxy proxy service for {listening_on} started");
tokio::select! {
_ = cancel_token.cancelled() => {
info!("rpxy proxy service for {listening_on} terminated");
Ok(())
},
proxy_res = proxy.start() => {
info!("rpxy proxy service for {listening_on} exited");
// cancel other proxy tasks
parent_cancel_token_clone.cancel();
proxy_res
}
}
})
});
if let (Ok(Err(e)), _, _) = select_all(futures_iter).await {
let join_res = join_all(join_handles).await;
let mut errs = join_res.into_iter().filter_map(|res| {
if let Ok(Err(e)) = res {
error!("Some proxy services are down: {}", e);
Some(e)
} else {
None
}
});
// returns the first error as the representative error
if let Some(e) = errs.next() {
return Err(e);
}

View file

@ -312,23 +312,6 @@ where
}
};
match &self.globals.cancel_token {
Some(cancel_token) => {
select! {
_ = proxy_service.fuse() => {
warn!("Proxy service got down");
}
_ = cancel_token.cancelled().fuse() => {
info!("Proxy service listening on {} receives term signal", self.listening_on);
}
}
}
None => {
proxy_service.await?;
warn!("Proxy service got down");
}
}
Ok(())
proxy_service.await
}
}