chore: refactor dynamic reloading
This commit is contained in:
parent
e48efa0109
commit
3b918af40b
6 changed files with 54 additions and 66 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.9.0-alpha.1"
|
version = "0.9.0-alpha.2"
|
||||||
authors = ["Jun Kurihara"]
|
authors = ["Jun Kurihara"]
|
||||||
homepage = "https://github.com/junkurihara/rust-rpxy"
|
homepage = "https://github.com/junkurihara/rust-rpxy"
|
||||||
repository = "https://github.com/junkurihara/rust-rpxy"
|
repository = "https://github.com/junkurihara/rust-rpxy"
|
||||||
|
|
|
||||||
|
|
@ -115,7 +115,7 @@ impl AcmeManager {
|
||||||
if let Some(cancel_token) = cancel_token.as_ref() {
|
if let Some(cancel_token) = cancel_token.as_ref() {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = task => {},
|
_ = task => {},
|
||||||
_ = cancel_token.cancelled() => { info!("rpxy ACME manager task for {domain} terminated") }
|
_ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") }
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
task.await;
|
task.await;
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
|
pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
|
||||||
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
|
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
|
||||||
pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;
|
pub const CONFIG_WATCH_DELAY_SECS: u32 = 15;
|
||||||
|
|
||||||
#[cfg(feature = "cache")]
|
#[cfg(feature = "cache")]
|
||||||
// Cache directory
|
// Cache directory
|
||||||
|
|
|
||||||
|
|
@ -121,10 +121,7 @@ impl RpxyService {
|
||||||
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
||||||
.server_configs_acme_challenge(Arc::new(server_config_acme_challenge))
|
.server_configs_acme_challenge(Arc::new(server_config_acme_challenge))
|
||||||
.build()?;
|
.build()?;
|
||||||
self
|
self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e))
|
||||||
.start_inner(rpxy_opts, acme_join_handles) //, &runtime_handle)
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow!(e))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(not(feature = "acme"))]
|
#[cfg(not(feature = "acme"))]
|
||||||
|
|
@ -136,10 +133,7 @@ impl RpxyService {
|
||||||
.runtime_handle(runtime_handle.clone())
|
.runtime_handle(runtime_handle.clone())
|
||||||
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
||||||
.build()?;
|
.build()?;
|
||||||
self
|
self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||||
.start_inner(rpxy_opts) //, &runtime_handle)
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow!(e))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -147,54 +141,50 @@ impl RpxyService {
|
||||||
async fn start_inner(
|
async fn start_inner(
|
||||||
&self,
|
&self,
|
||||||
rpxy_opts: RpxyOptions,
|
rpxy_opts: RpxyOptions,
|
||||||
// cert_service: Option<&Arc<ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>>,
|
|
||||||
#[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
|
#[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||||
// runtime_handle: &tokio::runtime::Handle,
|
|
||||||
) -> Result<(), anyhow::Error> {
|
) -> Result<(), anyhow::Error> {
|
||||||
let cancel_token = rpxy_opts.cancel_token.clone().unwrap_or_default();
|
let cancel_token = rpxy_opts.cancel_token.clone();
|
||||||
let runtime_handle = rpxy_opts.runtime_handle.clone();
|
let runtime_handle = rpxy_opts.runtime_handle.clone();
|
||||||
|
|
||||||
// spawn rpxy entry point
|
// spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
|
||||||
let cancel_token_clone = cancel_token.clone();
|
let cancel_token_clone = cancel_token.clone();
|
||||||
let child_cancel_token = cancel_token.child_token();
|
|
||||||
let rpxy_handle = runtime_handle.spawn(async move {
|
let rpxy_handle = runtime_handle.spawn(async move {
|
||||||
tokio::select! {
|
if let Err(e) = entrypoint(&rpxy_opts).await {
|
||||||
rpxy_res = entrypoint(&rpxy_opts) => {
|
|
||||||
if let Err(ref e) = rpxy_res {
|
|
||||||
error!("rpxy entrypoint exited on error: {e}");
|
error!("rpxy entrypoint exited on error: {e}");
|
||||||
|
if let Some(cancel_token) = cancel_token_clone {
|
||||||
|
cancel_token.cancel();
|
||||||
}
|
}
|
||||||
cancel_token_clone.cancel();
|
return Err(anyhow!(e));
|
||||||
rpxy_res.map_err(|e| anyhow!(e))
|
|
||||||
}
|
}
|
||||||
_ = child_cancel_token.cancelled() => {
|
|
||||||
debug!("rpxy entrypoint terminated by cancel token");
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
|
|
||||||
if self.cert_service.is_none() {
|
if self.cert_service.is_none() {
|
||||||
return rpxy_handle.await?;
|
return rpxy_handle.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
// spawn certificate reloader service
|
// spawn certificate reloader service, where cert service does not have cancellation token inside the service
|
||||||
let cert_service = self.cert_service.as_ref().unwrap().clone();
|
let cert_service = self.cert_service.as_ref().unwrap().clone();
|
||||||
let cancel_token_clone = cancel_token.clone();
|
let cancel_token_clone = cancel_token.clone();
|
||||||
let child_cancel_token = cancel_token.child_token();
|
let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token());
|
||||||
let cert_handle = runtime_handle.spawn(async move {
|
let cert_handle = runtime_handle.spawn(async move {
|
||||||
|
if let Some(child_cancel_token) = child_cancel_token {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
cert_res = cert_service.start() => {
|
cert_res = cert_service.start() => {
|
||||||
if let Err(ref e) = cert_res {
|
if let Err(ref e) = cert_res {
|
||||||
error!("cert reloader service exited on error: {e}");
|
error!("cert reloader service exited on error: {e}");
|
||||||
}
|
}
|
||||||
cancel_token_clone.cancel();
|
cancel_token_clone.unwrap().cancel();
|
||||||
cert_res.map_err(|e| anyhow!(e))
|
cert_res.map_err(|e| anyhow!(e))
|
||||||
}
|
}
|
||||||
_ = child_cancel_token.cancelled() => {
|
_ = child_cancel_token.cancelled() => {
|
||||||
debug!("cert reloader service terminated by cancel token");
|
debug!("cert reloader service terminated");
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
cert_service.start().await.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
#[cfg(not(feature = "acme"))]
|
#[cfg(not(feature = "acme"))]
|
||||||
|
|
@ -220,24 +210,18 @@ impl RpxyService {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
// spawn acme manager tasks
|
// spawn acme manager tasks, where cancellation token is possibly contained inside the service
|
||||||
let select_all = futures_util::future::select_all(acme_task_handles);
|
let select_all = futures_util::future::select_all(acme_task_handles);
|
||||||
let cancel_token_clone = cancel_token.clone();
|
let cancel_token_clone = cancel_token.clone();
|
||||||
let child_cancel_token = cancel_token.child_token();
|
|
||||||
let acme_handle = runtime_handle.spawn(async move {
|
let acme_handle = runtime_handle.spawn(async move {
|
||||||
tokio::select! {
|
let (acme_res, _, _) = select_all.await;
|
||||||
(acme_res, _, _) = select_all => {
|
|
||||||
if let Err(ref e) = acme_res {
|
if let Err(ref e) = acme_res {
|
||||||
error!("acme manager exited on error: {e}");
|
error!("acme manager exited on error: {e}");
|
||||||
}
|
}
|
||||||
cancel_token_clone.cancel();
|
if let Some(cancel_token) = cancel_token_clone {
|
||||||
|
cancel_token.cancel();
|
||||||
|
}
|
||||||
acme_res.map_err(|e| anyhow!(e))
|
acme_res.map_err(|e| anyhow!(e))
|
||||||
}
|
|
||||||
_ = child_cancel_token.cancelled() => {
|
|
||||||
debug!("acme manager terminated by cancel token");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle);
|
let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle);
|
||||||
let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?);
|
let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?);
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@ pub struct Globals {
|
||||||
/// Shared context - Async task runtime handler
|
/// Shared context - Async task runtime handler
|
||||||
pub runtime_handle: tokio::runtime::Handle,
|
pub runtime_handle: tokio::runtime::Handle,
|
||||||
/// Shared context - Notify object to stop async tasks
|
/// Shared context - Notify object to stop async tasks
|
||||||
pub cancel_token: CancellationToken,
|
pub cancel_token: Option<CancellationToken>,
|
||||||
/// Shared context - Certificate reloader service receiver // TODO: newer one
|
/// Shared context - Certificate reloader service receiver // TODO: newer one
|
||||||
pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
|
pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -108,7 +108,7 @@ pub async fn entrypoint(
|
||||||
proxy_config: proxy_config.clone(),
|
proxy_config: proxy_config.clone(),
|
||||||
request_count: Default::default(),
|
request_count: Default::default(),
|
||||||
runtime_handle: runtime_handle.clone(),
|
runtime_handle: runtime_handle.clone(),
|
||||||
cancel_token: cancel_token.clone().unwrap_or_default(),
|
cancel_token: cancel_token.clone(),
|
||||||
cert_reloader_rx: cert_rx.clone(),
|
cert_reloader_rx: cert_rx.clone(),
|
||||||
|
|
||||||
#[cfg(feature = "acme")]
|
#[cfg(feature = "acme")]
|
||||||
|
|
@ -144,22 +144,26 @@ pub async fn entrypoint(
|
||||||
message_handler: message_handler.clone(),
|
message_handler: message_handler.clone(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let cancel_token = globals.cancel_token.child_token();
|
let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token());
|
||||||
let parent_cancel_token_clone = globals.cancel_token.clone();
|
let parent_cancel_token_clone = globals.cancel_token.clone();
|
||||||
globals.runtime_handle.spawn(async move {
|
globals.runtime_handle.spawn(async move {
|
||||||
info!("rpxy proxy service for {listening_on} started");
|
info!("rpxy proxy service for {listening_on} started");
|
||||||
|
if let Some(cancel_token) = cancel_token {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
_ = cancel_token.cancelled() => {
|
_ = cancel_token.cancelled() => {
|
||||||
info!("rpxy proxy service for {listening_on} terminated");
|
debug!("rpxy proxy service for {listening_on} terminated");
|
||||||
Ok(())
|
Ok(())
|
||||||
},
|
},
|
||||||
proxy_res = proxy.start() => {
|
proxy_res = proxy.start() => {
|
||||||
info!("rpxy proxy service for {listening_on} exited");
|
info!("rpxy proxy service for {listening_on} exited");
|
||||||
// cancel other proxy tasks
|
// cancel other proxy tasks
|
||||||
parent_cancel_token_clone.cancel();
|
parent_cancel_token_clone.unwrap().cancel();
|
||||||
proxy_res
|
proxy_res
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
proxy.start().await
|
||||||
|
}
|
||||||
})
|
})
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue