fix: redesigned graceful shutdown for config update

This commit is contained in:
Jun Kurihara 2024-07-27 03:32:35 +09:00
commit 48b33409f9
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
4 changed files with 213 additions and 197 deletions

View file

@ -1,6 +1,6 @@
pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;
pub const CONFIG_WATCH_DELAY_SECS: u32 = 2;
#[cfg(feature = "cache")]
// Cache directory

View file

@ -16,6 +16,8 @@ use crate::{
};
use hot_reload::{ReloaderReceiver, ReloaderService};
use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder};
use std::sync::Arc;
use tokio_util::sync::CancellationToken;
fn main() {
init_logger();
@ -62,51 +64,201 @@ fn main() {
});
}
/// rpxy service definition
struct RpxyService {
runtime_handle: tokio::runtime::Handle,
proxy_conf: rpxy_lib::ProxyConfig,
app_conf: rpxy_lib::AppConfigList,
cert_service: Option<Arc<ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>>,
cert_rx: Option<ReloaderReceiver<rpxy_certs::ServerCryptoBase>>,
#[cfg(feature = "acme")]
acme_manager: Option<rpxy_acme::AcmeManager>,
}
impl RpxyService {
async fn new(config_toml: &ConfigToml, runtime_handle: tokio::runtime::Handle) -> Result<Self, anyhow::Error> {
let (proxy_conf, app_conf) = build_settings(config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
let (cert_service, cert_rx) = build_cert_manager(config_toml)
.await
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
.map(|(s, r)| (Some(Arc::new(s)), Some(r)))
.unwrap_or((None, None));
Ok(RpxyService {
runtime_handle: runtime_handle.clone(),
proxy_conf,
app_conf,
cert_service,
cert_rx,
#[cfg(feature = "acme")]
acme_manager: build_acme_manager(config_toml, runtime_handle.clone()).await?,
})
}
async fn start(&self, cancel_token: Option<CancellationToken>) -> Result<(), anyhow::Error> {
let RpxyService {
runtime_handle,
proxy_conf,
app_conf,
cert_service: _,
cert_rx,
#[cfg(feature = "acme")]
acme_manager,
} = self;
#[cfg(feature = "acme")]
{
let (acme_join_handles, server_config_acme_challenge) = acme_manager
.as_ref()
.map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token())))
.unwrap_or((vec![], Default::default()));
let rpxy_opts = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf.clone())
.app_config_list(app_conf.clone())
.cert_rx(cert_rx.clone())
.runtime_handle(runtime_handle.clone())
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
.server_configs_acme_challenge(Arc::new(server_config_acme_challenge))
.build()?;
self
.start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle)
.await
.map_err(|e| anyhow!(e))
}
#[cfg(not(feature = "acme"))]
{
let rpxy_opts = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf.clone())
.app_config_list(app_conf.clone())
.cert_rx(cert_rx.clone())
.runtime_handle(runtime_handle.clone())
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
.build()?;
self
.start_inner(rpxy_opts) //, &runtime_handle)
.await
.map_err(|e| anyhow!(e))
}
}
/// Wrapper of entry point for rpxy service with certificate management service
async fn start_inner(
&self,
rpxy_opts: RpxyOptions,
// cert_service: Option<&Arc<ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>>,
#[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
// runtime_handle: &tokio::runtime::Handle,
) -> Result<(), anyhow::Error> {
let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default();
let runtime_handle = rpxy_opts.runtime_handle.clone();
// spawn rpxy entry point
let cancel_token_clone = cancel_token.clone();
let child_cancel_token = cancel_token.child_token();
let rpxy_handle = runtime_handle.spawn(async move {
tokio::select! {
rpxy_res = entrypoint(&rpxy_opts) => {
if let Err(ref e) = rpxy_res {
error!("rpxy entrypoint exited on error: {e}");
}
cancel_token_clone.cancel();
rpxy_res.map_err(|e| anyhow!(e))
}
_ = child_cancel_token.cancelled() => {
debug!("rpxy entrypoint terminated by cancel token");
Ok(())
}
}
});
if self.cert_service.is_none() {
return rpxy_handle.await?;
}
// spawn certificate reloader service
let cert_service = self.cert_service.as_ref().unwrap().clone();
let cancel_token_clone = cancel_token.clone();
let child_cancel_token = cancel_token.child_token();
let cert_handle = runtime_handle.spawn(async move {
tokio::select! {
cert_res = cert_service.start() => {
if let Err(ref e) = cert_res {
error!("cert reloader service exited on error: {e}");
}
cancel_token_clone.cancel();
cert_res.map_err(|e| anyhow!(e))
}
_ = child_cancel_token.cancelled() => {
debug!("cert reloader service terminated by cancel token");
Ok(())
}
}
});
#[cfg(not(feature = "acme"))]
{
let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
match (rpxy_res, cert_res) {
(Ok(()), Ok(())) => Ok(()),
(Err(e), _) => Err(e),
(_, Err(e)) => Err(e),
}
}
#[cfg(feature = "acme")]
{
if acme_task_handles.is_empty() {
let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
return match (rpxy_res, cert_res) {
(Ok(()), Ok(())) => Ok(()),
(Err(e), _) => Err(e),
(_, Err(e)) => Err(e),
};
}
// spawn acme manager tasks
let select_all = futures_util::future::select_all(acme_task_handles);
let cancel_token_clone = cancel_token.clone();
let child_cancel_token = cancel_token.child_token();
let acme_handle = runtime_handle.spawn(async move {
tokio::select! {
(acme_res, _, _) = select_all => {
if let Err(ref e) = acme_res {
error!("acme manager exited on error: {e}");
}
cancel_token_clone.cancel();
acme_res.map_err(|e| anyhow!(e))
}
_ = child_cancel_token.cancelled() => {
debug!("acme manager terminated by cancel token");
Ok(())
}
}
});
let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle);
let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?);
match (rpxy_res, cert_res, acme_res) {
(Ok(()), Ok(()), Ok(())) => Ok(()),
(Err(e), _, _) => Err(e),
(_, Err(e), _) => Err(e),
(_, _, Err(e)) => Err(e),
}
}
}
}
async fn rpxy_service_without_watcher(
config_file_path: &str,
runtime_handle: tokio::runtime::Handle,
) -> Result<(), anyhow::Error> {
info!("Start rpxy service");
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
let (cert_service, cert_rx) = build_cert_manager(&config_toml)
.await
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
.map(|(s, r)| (Some(s), Some(r)))
.unwrap_or((None, None));
#[cfg(feature = "acme")]
{
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
let (acme_join_handles, server_config_acme_challenge) = acme_manager
.as_ref()
.map(|m| m.spawn_manager_tasks(None))
.unwrap_or((vec![], Default::default()));
let rpxy_opts = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf)
.app_config_list(app_conf)
.cert_rx(cert_rx)
.runtime_handle(runtime_handle.clone())
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
.build()?;
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle)
.await
.map_err(|e| anyhow!(e))
}
#[cfg(not(feature = "acme"))]
{
let rpxy_opts = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf.clone())
.app_config_list(app_conf.clone())
.cert_rx(cert_rx.clone())
.runtime_handle(runtime_handle.clone())
.build()?;
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle)
.await
.map_err(|e| anyhow!(e))
}
let service = RpxyService::new(&config_toml, runtime_handle).await?;
service.start(None).await
}
async fn rpxy_service_with_watcher(
@ -120,176 +272,41 @@ async fn rpxy_service_with_watcher(
.borrow()
.clone()
.ok_or(anyhow!("Something wrong in config reloader receiver"))?;
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
#[cfg(feature = "acme")]
let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
let mut cert_service_and_rx = build_cert_manager(&config_toml)
.await
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
let mut service = RpxyService::new(&config_toml, runtime_handle.clone()).await?;
// Continuous monitoring
loop {
// Notifier for proxy service termination
let cancel_token = tokio_util::sync::CancellationToken::new();
let (cert_service, cert_rx) = cert_service_and_rx
.as_ref()
.map(|(s, r)| (Some(s), Some(r)))
.unwrap_or((None, None));
#[cfg(feature = "acme")]
let (acme_join_handles, server_config_acme_challenge) = acme_manager
.as_ref()
.map(|m| m.spawn_manager_tasks(Some(cancel_token.child_token())))
.unwrap_or((vec![], Default::default()));
let rpxy_opts = {
#[cfg(feature = "acme")]
let res = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf.clone())
.app_config_list(app_conf.clone())
.cert_rx(cert_rx.cloned())
.runtime_handle(runtime_handle.clone())
.cancel_token(Some(cancel_token.child_token()))
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
.build();
#[cfg(not(feature = "acme"))]
let res = RpxyOptionsBuilder::default()
.proxy_config(proxy_conf.clone())
.app_config_list(app_conf.clone())
.cert_rx(cert_rx.cloned())
.runtime_handle(runtime_handle.clone())
.term_notify(Some(term_notify.clone()))
.build();
res
}?;
tokio::select! {
rpxy_res = {
#[cfg(feature = "acme")]
{
rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle)
/* ---------- */
rpxy_res = service.start(Some(cancel_token.clone())) => {
if let Err(ref e) = rpxy_res {
error!("rpxy service exited on error: {e}");
} else {
error!("rpxy service exited");
}
#[cfg(not(feature = "acme"))]
{
rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle)
}
} => {
error!("rpxy entrypoint or cert service exited");
return rpxy_res.map_err(|e| anyhow!(e));
}
/* ---------- */
_ = config_rx.changed() => {
let Some(config_toml) = config_rx.borrow().clone() else {
let Some(new_config_toml) = config_rx.borrow().clone() else {
error!("Something wrong in config reloader receiver");
return Err(anyhow!("Something wrong in config reloader receiver"));
};
match build_settings(&config_toml) {
Ok((p, a)) => {
(proxy_conf, app_conf) = (p, a)
match RpxyService::new(&new_config_toml, runtime_handle.clone()).await {
Ok(new_service) => {
info!("Configuration updated.");
service = new_service;
},
Err(e) => {
error!("Invalid configuration. Configuration does not updated: {e}");
continue;
error!("rpxy failed to be ready. Configuration does not updated: {e}");
}
};
match build_cert_manager(&config_toml).await {
Ok(c) => {
cert_service_and_rx = c;
},
Err(e) => {
error!("Invalid cert configuration. Configuration does not updated: {e}");
continue;
}
};
#[cfg(feature = "acme")]
{
match build_acme_manager(&config_toml, runtime_handle.clone()).await {
Ok(m) => {
acme_manager = m;
},
Err(e) => {
error!("Invalid acme configuration. Configuration does not updated: {e}");
continue;
}
}
}
info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets");
info!("Terminate all spawned services and force to re-bind TCP/UDP sockets");
cancel_token.cancel();
}
else => break
}
}
Ok(())
}
#[cfg(not(feature = "acme"))]
/// Wrapper of entry point for rpxy service with certificate management service
async fn rpxy_entrypoint(
rpxy_opts: &RpxyOptions,
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
// runtime_handle: &tokio::runtime::Handle,
) -> Result<(), anyhow::Error> {
// TODO: refactor: update routine
if let Some(cert_service) = cert_service {
tokio::select! {
rpxy_res = entrypoint(rpxy_opts) => {
error!("rpxy entrypoint exited");
rpxy_res.map_err(|e| anyhow!(e))
}
cert_res = cert_service.start() => {
error!("cert reloader service exited");
cert_res.map_err(|e| anyhow!(e))
}
}
} else {
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
}
}
#[cfg(feature = "acme")]
/// Wrapper of entry point for rpxy service with certificate management service
async fn rpxy_entrypoint(
rpxy_opts: &RpxyOptions,
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
// runtime_handle: &tokio::runtime::Handle,
) -> Result<(), anyhow::Error> {
// TODO: refactor: update routine
if let Some(cert_service) = cert_service {
if acme_task_handles.is_empty() {
tokio::select! {
rpxy_res = entrypoint(rpxy_opts) => {
error!("rpxy entrypoint exited");
rpxy_res.map_err(|e| anyhow!(e))
}
cert_res = cert_service.start() => {
error!("cert reloader service exited");
cert_res.map_err(|e| anyhow!(e))
}
}
} else {
let select_all = futures_util::future::select_all(acme_task_handles);
tokio::select! {
rpxy_res = entrypoint(rpxy_opts) => {
error!("rpxy entrypoint exited");
rpxy_res.map_err(|e| anyhow!(e))
}
(acme_res, _, _) = select_all => {
error!("acme manager exited");
acme_res.map_err(|e| anyhow!(e))
}
cert_res = cert_service.start() => {
error!("cert reloader service exited");
cert_res.map_err(|e| anyhow!(e))
}
}
}
} else {
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
}
}