feat: add initial acme support (ugly!)
This commit is contained in:
parent
d6136f9ffa
commit
7b0ca08e1e
11 changed files with 277 additions and 89 deletions
|
|
@ -15,7 +15,7 @@ use crate::{
|
|||
log::*,
|
||||
};
|
||||
use hot_reload::{ReloaderReceiver, ReloaderService};
|
||||
use rpxy_lib::entrypoint;
|
||||
use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder};
|
||||
|
||||
fn main() {
|
||||
init_logger();
|
||||
|
|
@ -68,30 +68,40 @@ async fn rpxy_service_without_watcher(
|
|||
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
|
||||
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
|
||||
let cert_service_and_rx = build_cert_manager(&config_toml)
|
||||
let (cert_service, cert_rx) = build_cert_manager(&config_toml)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
|
||||
.map(|(s, r)| (Some(s), Some(r)))
|
||||
.unwrap_or((None, None));
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
rpxy_entrypoint(
|
||||
&proxy_conf,
|
||||
&app_conf,
|
||||
cert_service_and_rx.as_ref(),
|
||||
acme_manager.as_ref(),
|
||||
&runtime_handle,
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
let (acme_join_handles, server_config_acme_challenge) = acme_manager
|
||||
.as_ref()
|
||||
.map(|m| m.spawn_manager_tasks(None))
|
||||
.unwrap_or((vec![], Default::default()));
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf)
|
||||
.app_config_list(app_conf)
|
||||
.cert_rx(cert_rx)
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
|
||||
.build()?;
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None)
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.clone())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.build()?;
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
|
@ -111,7 +121,7 @@ async fn rpxy_service_with_watcher(
|
|||
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
|
||||
let mut cert_service_and_rx = build_cert_manager(&config_toml)
|
||||
.await
|
||||
|
|
@ -122,15 +132,48 @@ async fn rpxy_service_with_watcher(
|
|||
|
||||
// Continuous monitoring
|
||||
loop {
|
||||
let (cert_service, cert_rx) = cert_service_and_rx
|
||||
.as_ref()
|
||||
.map(|(s, r)| (Some(s), Some(r)))
|
||||
.unwrap_or((None, None));
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
let (acme_join_handles, server_config_acme_challenge) = acme_manager
|
||||
.as_ref()
|
||||
.map(|m| m.spawn_manager_tasks(Some(term_notify.clone())))
|
||||
.unwrap_or((vec![], Default::default()));
|
||||
|
||||
let rpxy_opts = {
|
||||
#[cfg(feature = "acme")]
|
||||
let res = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.cloned())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.term_notify(Some(term_notify.clone()))
|
||||
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
|
||||
.build();
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
let res = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.cloned())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.term_notify(Some(term_notify.clone()))
|
||||
.build();
|
||||
res
|
||||
}?;
|
||||
|
||||
tokio::select! {
|
||||
rpxy_res = {
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), acme_manager.as_ref(), &runtime_handle, Some(term_notify.clone()))
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle)
|
||||
}
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone()))
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle)
|
||||
}
|
||||
} => {
|
||||
error!("rpxy entrypoint or cert service exited");
|
||||
|
|
@ -159,8 +202,20 @@ async fn rpxy_service_with_watcher(
|
|||
continue;
|
||||
}
|
||||
};
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
match build_acme_manager(&config_toml, runtime_handle.clone()).await {
|
||||
Ok(m) => {
|
||||
acme_manager = m;
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Invalid acme configuration. Configuration does not updated: {e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Configuration updated. Terminate all spawned proxy services and force to re-bind TCP/UDP sockets");
|
||||
info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets");
|
||||
term_notify.notify_waiters();
|
||||
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||
}
|
||||
|
|
@ -174,18 +229,14 @@ async fn rpxy_service_with_watcher(
|
|||
#[cfg(not(feature = "acme"))]
|
||||
/// Wrapper of entry point for rpxy service with certificate management service
|
||||
async fn rpxy_entrypoint(
|
||||
proxy_config: &rpxy_lib::ProxyConfig,
|
||||
app_config_list: &rpxy_lib::AppConfigList,
|
||||
cert_service_and_rx: Option<&(
|
||||
ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>,
|
||||
ReloaderReceiver<rpxy_certs::ServerCryptoBase>,
|
||||
)>,
|
||||
runtime_handle: &tokio::runtime::Handle,
|
||||
term_notify: Option<std::sync::Arc<tokio::sync::Notify>>,
|
||||
rpxy_opts: &RpxyOptions,
|
||||
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
|
||||
// runtime_handle: &tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
if let Some((cert_service, cert_rx)) = cert_service_and_rx {
|
||||
// TODO: refactor: update routine
|
||||
if let Some(cert_service) = cert_service {
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(proxy_config, app_config_list, Some(cert_rx), runtime_handle, term_notify) => {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
|
@ -195,46 +246,49 @@ async fn rpxy_entrypoint(
|
|||
}
|
||||
}
|
||||
} else {
|
||||
entrypoint(proxy_config, app_config_list, None, runtime_handle, term_notify)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
/// Wrapper of entry point for rpxy service with certificate management service
|
||||
async fn rpxy_entrypoint(
|
||||
proxy_config: &rpxy_lib::ProxyConfig,
|
||||
app_config_list: &rpxy_lib::AppConfigList,
|
||||
cert_service_and_rx: Option<&(
|
||||
ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>,
|
||||
ReloaderReceiver<rpxy_certs::ServerCryptoBase>,
|
||||
)>,
|
||||
acme_manager: Option<&rpxy_acme::AcmeManager>,
|
||||
runtime_handle: &tokio::runtime::Handle,
|
||||
term_notify: Option<std::sync::Arc<tokio::sync::Notify>>,
|
||||
rpxy_opts: &RpxyOptions,
|
||||
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
|
||||
acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
// runtime_handle: &tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// TODO: remove later, reconsider routine
|
||||
println!("ACME manager:\n{:#?}", acme_manager);
|
||||
let x = acme_manager.unwrap().clone();
|
||||
let (handle, confs) = x.spawn_manager_tasks();
|
||||
tokio::spawn(async move { futures_util::future::select_all(handle).await });
|
||||
// TODO:
|
||||
|
||||
if let Some((cert_service, cert_rx)) = cert_service_and_rx {
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(proxy_config, app_config_list, Some(cert_rx), runtime_handle, term_notify) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
// TODO: refactor: update routine
|
||||
if let Some(cert_service) = cert_service {
|
||||
if acme_task_handles.is_empty() {
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
} else {
|
||||
let select_all = futures_util::future::select_all(acme_task_handles);
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
(acme_res, _, _) = select_all => {
|
||||
error!("acme manager exited");
|
||||
acme_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
entrypoint(proxy_config, app_config_list, None, runtime_handle, term_notify)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue