commit
9c3a3bac35
7 changed files with 242 additions and 226 deletions
|
|
@ -1,5 +1,5 @@
|
|||
[workspace.package]
|
||||
version = "0.9.0-alpha.1"
|
||||
version = "0.9.0-alpha.2"
|
||||
authors = ["Jun Kurihara"]
|
||||
homepage = "https://github.com/junkurihara/rust-rpxy"
|
||||
repository = "https://github.com/junkurihara/rust-rpxy"
|
||||
|
|
|
|||
|
|
@ -115,7 +115,7 @@ impl AcmeManager {
|
|||
if let Some(cancel_token) = cancel_token.as_ref() {
|
||||
tokio::select! {
|
||||
_ = task => {},
|
||||
_ = cancel_token.cancelled() => { info!("rpxy ACME manager task for {domain} terminated") }
|
||||
_ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") }
|
||||
}
|
||||
} else {
|
||||
task.await;
|
||||
|
|
|
|||
|
|
@ -1,6 +1,6 @@
|
|||
pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
|
||||
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
|
||||
pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;
|
||||
pub const CONFIG_WATCH_DELAY_SECS: u32 = 15;
|
||||
|
||||
#[cfg(feature = "cache")]
|
||||
// Cache directory
|
||||
|
|
|
|||
|
|
@ -16,6 +16,8 @@ use crate::{
|
|||
};
|
||||
use hot_reload::{ReloaderReceiver, ReloaderService};
|
||||
use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder};
|
||||
use std::sync::Arc;
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
fn main() {
|
||||
init_logger();
|
||||
|
|
@ -44,67 +46,203 @@ fn main() {
|
|||
.unwrap();
|
||||
|
||||
tokio::select! {
|
||||
Err(e) = config_service.start() => {
|
||||
error!("config reloader service exited: {e}");
|
||||
std::process::exit(1);
|
||||
config_res = config_service.start() => {
|
||||
if let Err(e) = config_res {
|
||||
error!("config reloader service exited: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
|
||||
error!("rpxy service existed: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
else => {
|
||||
std::process::exit(0);
|
||||
rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
|
||||
if let Err(e) = rpxy_res {
|
||||
error!("rpxy service existed: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
}
|
||||
std::process::exit(0);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// rpxy service definition
|
||||
struct RpxyService {
|
||||
runtime_handle: tokio::runtime::Handle,
|
||||
proxy_conf: rpxy_lib::ProxyConfig,
|
||||
app_conf: rpxy_lib::AppConfigList,
|
||||
cert_service: Option<Arc<ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>>,
|
||||
cert_rx: Option<ReloaderReceiver<rpxy_certs::ServerCryptoBase>>,
|
||||
#[cfg(feature = "acme")]
|
||||
acme_manager: Option<rpxy_acme::AcmeManager>,
|
||||
}
|
||||
|
||||
impl RpxyService {
|
||||
async fn new(config_toml: &ConfigToml, runtime_handle: tokio::runtime::Handle) -> Result<Self, anyhow::Error> {
|
||||
let (proxy_conf, app_conf) = build_settings(config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||
|
||||
let (cert_service, cert_rx) = build_cert_manager(config_toml)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
|
||||
.map(|(s, r)| (Some(Arc::new(s)), Some(r)))
|
||||
.unwrap_or((None, None));
|
||||
|
||||
Ok(RpxyService {
|
||||
runtime_handle: runtime_handle.clone(),
|
||||
proxy_conf,
|
||||
app_conf,
|
||||
cert_service,
|
||||
cert_rx,
|
||||
#[cfg(feature = "acme")]
|
||||
acme_manager: build_acme_manager(config_toml, runtime_handle.clone()).await?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn start(&self, cancel_token: Option<CancellationToken>) -> Result<(), anyhow::Error> {
|
||||
let RpxyService {
|
||||
runtime_handle,
|
||||
proxy_conf,
|
||||
app_conf,
|
||||
cert_service: _,
|
||||
cert_rx,
|
||||
#[cfg(feature = "acme")]
|
||||
acme_manager,
|
||||
} = self;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
let (acme_join_handles, server_config_acme_challenge) = acme_manager
|
||||
.as_ref()
|
||||
.map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token())))
|
||||
.unwrap_or((vec![], Default::default()));
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.clone())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
||||
.server_configs_acme_challenge(Arc::new(server_config_acme_challenge))
|
||||
.build()?;
|
||||
self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.clone())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.cancel_token(cancel_token.as_ref().map(|t| t.child_token()))
|
||||
.build()?;
|
||||
self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrapper of entry point for rpxy service with certificate management service
|
||||
async fn start_inner(
|
||||
&self,
|
||||
rpxy_opts: RpxyOptions,
|
||||
#[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
let cancel_token = rpxy_opts.cancel_token.clone();
|
||||
let runtime_handle = rpxy_opts.runtime_handle.clone();
|
||||
|
||||
// spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
let rpxy_handle = runtime_handle.spawn(async move {
|
||||
if let Err(e) = entrypoint(&rpxy_opts).await {
|
||||
error!("rpxy entrypoint exited on error: {e}");
|
||||
if let Some(cancel_token) = cancel_token_clone {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
return Err(anyhow!(e));
|
||||
}
|
||||
Ok(())
|
||||
});
|
||||
|
||||
if self.cert_service.is_none() {
|
||||
return rpxy_handle.await?;
|
||||
}
|
||||
|
||||
// spawn certificate reloader service, where cert service does not have cancellation token inside the service
|
||||
let cert_service = self.cert_service.as_ref().unwrap().clone();
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token());
|
||||
let cert_handle = runtime_handle.spawn(async move {
|
||||
if let Some(child_cancel_token) = child_cancel_token {
|
||||
tokio::select! {
|
||||
cert_res = cert_service.start() => {
|
||||
if let Err(ref e) = cert_res {
|
||||
error!("cert reloader service exited on error: {e}");
|
||||
}
|
||||
cancel_token_clone.unwrap().cancel();
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
_ = child_cancel_token.cancelled() => {
|
||||
debug!("cert reloader service terminated");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
} else {
|
||||
cert_service.start().await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
});
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
|
||||
let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
|
||||
match (rpxy_res, cert_res) {
|
||||
(Ok(()), Ok(())) => Ok(()),
|
||||
(Err(e), _) => Err(e),
|
||||
(_, Err(e)) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
if acme_task_handles.is_empty() {
|
||||
let (rpxy_res, cert_res) = tokio::join!(rpxy_handle, cert_handle);
|
||||
let (rpxy_res, cert_res) = (rpxy_res?, cert_res?);
|
||||
return match (rpxy_res, cert_res) {
|
||||
(Ok(()), Ok(())) => Ok(()),
|
||||
(Err(e), _) => Err(e),
|
||||
(_, Err(e)) => Err(e),
|
||||
};
|
||||
}
|
||||
|
||||
// spawn acme manager tasks, where cancellation token is possibly contained inside the service
|
||||
let select_all = futures_util::future::select_all(acme_task_handles);
|
||||
let cancel_token_clone = cancel_token.clone();
|
||||
let acme_handle = runtime_handle.spawn(async move {
|
||||
let (acme_res, _, _) = select_all.await;
|
||||
if let Err(ref e) = acme_res {
|
||||
error!("acme manager exited on error: {e}");
|
||||
}
|
||||
if let Some(cancel_token) = cancel_token_clone {
|
||||
cancel_token.cancel();
|
||||
}
|
||||
acme_res.map_err(|e| anyhow!(e))
|
||||
});
|
||||
let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle);
|
||||
let (rpxy_res, cert_res, acme_res) = (rpxy_res?, cert_res?, acme_res?);
|
||||
match (rpxy_res, cert_res, acme_res) {
|
||||
(Ok(()), Ok(()), Ok(())) => Ok(()),
|
||||
(Err(e), _, _) => Err(e),
|
||||
(_, Err(e), _) => Err(e),
|
||||
(_, _, Err(e)) => Err(e),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn rpxy_service_without_watcher(
|
||||
config_file_path: &str,
|
||||
runtime_handle: tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
info!("Start rpxy service");
|
||||
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
|
||||
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||
|
||||
let (cert_service, cert_rx) = build_cert_manager(&config_toml)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?
|
||||
.map(|(s, r)| (Some(s), Some(r)))
|
||||
.unwrap_or((None, None));
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
let acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
let (acme_join_handles, server_config_acme_challenge) = acme_manager
|
||||
.as_ref()
|
||||
.map(|m| m.spawn_manager_tasks(None))
|
||||
.unwrap_or((vec![], Default::default()));
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf)
|
||||
.app_config_list(app_conf)
|
||||
.cert_rx(cert_rx)
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
|
||||
.build()?;
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref(), acme_join_handles) //, &runtime_handle)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
let rpxy_opts = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.clone())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.build()?;
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service.as_ref()) //, &runtime_handle)
|
||||
.await
|
||||
.map_err(|e| anyhow!(e))
|
||||
}
|
||||
let service = RpxyService::new(&config_toml, runtime_handle).await?;
|
||||
service.start(None).await
|
||||
}
|
||||
|
||||
async fn rpxy_service_with_watcher(
|
||||
|
|
@ -118,176 +256,41 @@ async fn rpxy_service_with_watcher(
|
|||
.borrow()
|
||||
.clone()
|
||||
.ok_or(anyhow!("Something wrong in config reloader receiver"))?;
|
||||
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
let mut acme_manager = build_acme_manager(&config_toml, runtime_handle.clone()).await?;
|
||||
|
||||
let mut cert_service_and_rx = build_cert_manager(&config_toml)
|
||||
.await
|
||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
||||
let mut service = RpxyService::new(&config_toml, runtime_handle.clone()).await?;
|
||||
|
||||
// Continuous monitoring
|
||||
loop {
|
||||
// Notifier for proxy service termination
|
||||
let cancel_token = tokio_util::sync::CancellationToken::new();
|
||||
|
||||
let (cert_service, cert_rx) = cert_service_and_rx
|
||||
.as_ref()
|
||||
.map(|(s, r)| (Some(s), Some(r)))
|
||||
.unwrap_or((None, None));
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
let (acme_join_handles, server_config_acme_challenge) = acme_manager
|
||||
.as_ref()
|
||||
.map(|m| m.spawn_manager_tasks(Some(cancel_token.child_token())))
|
||||
.unwrap_or((vec![], Default::default()));
|
||||
|
||||
let rpxy_opts = {
|
||||
#[cfg(feature = "acme")]
|
||||
let res = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.cloned())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.cancel_token(Some(cancel_token.child_token()))
|
||||
.server_configs_acme_challenge(std::sync::Arc::new(server_config_acme_challenge))
|
||||
.build();
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
let res = RpxyOptionsBuilder::default()
|
||||
.proxy_config(proxy_conf.clone())
|
||||
.app_config_list(app_conf.clone())
|
||||
.cert_rx(cert_rx.cloned())
|
||||
.runtime_handle(runtime_handle.clone())
|
||||
.term_notify(Some(term_notify.clone()))
|
||||
.build();
|
||||
res
|
||||
}?;
|
||||
|
||||
tokio::select! {
|
||||
rpxy_res = {
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service, acme_join_handles)//, &runtime_handle)
|
||||
/* ---------- */
|
||||
rpxy_res = service.start(Some(cancel_token.clone())) => {
|
||||
if let Err(ref e) = rpxy_res {
|
||||
error!("rpxy service exited on error: {e}");
|
||||
} else {
|
||||
error!("rpxy service exited");
|
||||
}
|
||||
#[cfg(not(feature = "acme"))]
|
||||
{
|
||||
rpxy_entrypoint(&rpxy_opts, cert_service)//, &runtime_handle)
|
||||
}
|
||||
} => {
|
||||
error!("rpxy entrypoint or cert service exited");
|
||||
return rpxy_res.map_err(|e| anyhow!(e));
|
||||
}
|
||||
/* ---------- */
|
||||
_ = config_rx.changed() => {
|
||||
let Some(config_toml) = config_rx.borrow().clone() else {
|
||||
let Some(new_config_toml) = config_rx.borrow().clone() else {
|
||||
error!("Something wrong in config reloader receiver");
|
||||
return Err(anyhow!("Something wrong in config reloader receiver"));
|
||||
};
|
||||
match build_settings(&config_toml) {
|
||||
Ok((p, a)) => {
|
||||
(proxy_conf, app_conf) = (p, a)
|
||||
match RpxyService::new(&new_config_toml, runtime_handle.clone()).await {
|
||||
Ok(new_service) => {
|
||||
info!("Configuration updated.");
|
||||
service = new_service;
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Invalid configuration. Configuration does not updated: {e}");
|
||||
continue;
|
||||
error!("rpxy failed to be ready. Configuration does not updated: {e}");
|
||||
}
|
||||
};
|
||||
match build_cert_manager(&config_toml).await {
|
||||
Ok(c) => {
|
||||
cert_service_and_rx = c;
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Invalid cert configuration. Configuration does not updated: {e}");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
#[cfg(feature = "acme")]
|
||||
{
|
||||
match build_acme_manager(&config_toml, runtime_handle.clone()).await {
|
||||
Ok(m) => {
|
||||
acme_manager = m;
|
||||
},
|
||||
Err(e) => {
|
||||
error!("Invalid acme configuration. Configuration does not updated: {e}");
|
||||
continue;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
info!("Configuration updated. Terminate all spawned services and force to re-bind TCP/UDP sockets");
|
||||
info!("Terminate all spawned services and force to re-bind TCP/UDP sockets");
|
||||
cancel_token.cancel();
|
||||
}
|
||||
else => break
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(not(feature = "acme"))]
|
||||
/// Wrapper of entry point for rpxy service with certificate management service
|
||||
async fn rpxy_entrypoint(
|
||||
rpxy_opts: &RpxyOptions,
|
||||
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
|
||||
// runtime_handle: &tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// TODO: refactor: update routine
|
||||
if let Some(cert_service) = cert_service {
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
/// Wrapper of entry point for rpxy service with certificate management service
|
||||
async fn rpxy_entrypoint(
|
||||
rpxy_opts: &RpxyOptions,
|
||||
cert_service: Option<&ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>>,
|
||||
acme_task_handles: Vec<tokio::task::JoinHandle<()>>,
|
||||
// runtime_handle: &tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
// TODO: refactor: update routine
|
||||
if let Some(cert_service) = cert_service {
|
||||
if acme_task_handles.is_empty() {
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
} else {
|
||||
let select_all = futures_util::future::select_all(acme_task_handles);
|
||||
tokio::select! {
|
||||
rpxy_res = entrypoint(rpxy_opts) => {
|
||||
error!("rpxy entrypoint exited");
|
||||
rpxy_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
(acme_res, _, _) = select_all => {
|
||||
error!("acme manager exited");
|
||||
acme_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
cert_res = cert_service.start() => {
|
||||
error!("cert reloader service exited");
|
||||
cert_res.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
entrypoint(rpxy_opts).await.map_err(|e| anyhow!(e))
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
use crate::{constants::*, count::RequestCount};
|
||||
use hot_reload::ReloaderReceiver;
|
||||
use rpxy_certs::ServerCryptoBase;
|
||||
use std::{net::SocketAddr, sync::Arc, time::Duration};
|
||||
use std::{net::SocketAddr, time::Duration};
|
||||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/// Global object containing proxy configurations and shared object like counters.
|
||||
|
|
@ -20,7 +20,7 @@ pub struct Globals {
|
|||
|
||||
#[cfg(feature = "acme")]
|
||||
/// ServerConfig used for only ACME challenge for ACME domains
|
||||
pub server_configs_acme_challenge: Arc<rustc_hash::FxHashMap<String, Arc<rustls::ServerConfig>>>,
|
||||
pub server_configs_acme_challenge: std::sync::Arc<rustc_hash::FxHashMap<String, std::sync::Arc<rustls::ServerConfig>>>,
|
||||
}
|
||||
|
||||
/// Configuration parameters for proxy transport and request handlers
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ use crate::{
|
|||
message_handler::HttpMessageHandlerBuilder,
|
||||
proxy::Proxy,
|
||||
};
|
||||
use futures::future::select_all;
|
||||
use futures::future::join_all;
|
||||
use hot_reload::ReloaderReceiver;
|
||||
use rpxy_certs::ServerCryptoBase;
|
||||
use std::sync::Arc;
|
||||
|
|
@ -131,7 +131,7 @@ pub async fn entrypoint(
|
|||
|
||||
// spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder.
|
||||
let addresses = globals.proxy_config.listen_sockets.clone();
|
||||
let futures_iter = addresses.into_iter().map(|listening_on| {
|
||||
let join_handles = addresses.into_iter().map(|listening_on| {
|
||||
let mut tls_enabled = false;
|
||||
if let Some(https_port) = globals.proxy_config.https_port {
|
||||
tls_enabled = https_port == listening_on.port()
|
||||
|
|
@ -143,11 +143,41 @@ pub async fn entrypoint(
|
|||
connection_builder: connection_builder.clone(),
|
||||
message_handler: message_handler.clone(),
|
||||
};
|
||||
globals.runtime_handle.spawn(async move { proxy.start().await })
|
||||
|
||||
let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token());
|
||||
let parent_cancel_token_clone = globals.cancel_token.clone();
|
||||
globals.runtime_handle.spawn(async move {
|
||||
info!("rpxy proxy service for {listening_on} started");
|
||||
if let Some(cancel_token) = cancel_token {
|
||||
tokio::select! {
|
||||
_ = cancel_token.cancelled() => {
|
||||
debug!("rpxy proxy service for {listening_on} terminated");
|
||||
Ok(())
|
||||
},
|
||||
proxy_res = proxy.start() => {
|
||||
info!("rpxy proxy service for {listening_on} exited");
|
||||
// cancel other proxy tasks
|
||||
parent_cancel_token_clone.unwrap().cancel();
|
||||
proxy_res
|
||||
}
|
||||
}
|
||||
} else {
|
||||
proxy.start().await
|
||||
}
|
||||
})
|
||||
});
|
||||
|
||||
if let (Ok(Err(e)), _, _) = select_all(futures_iter).await {
|
||||
error!("Some proxy services are down: {}", e);
|
||||
let join_res = join_all(join_handles).await;
|
||||
let mut errs = join_res.into_iter().filter_map(|res| {
|
||||
if let Ok(Err(e)) = res {
|
||||
error!("Some proxy services are down: {}", e);
|
||||
Some(e)
|
||||
} else {
|
||||
None
|
||||
}
|
||||
});
|
||||
// returns the first error as the representative error
|
||||
if let Some(e) = errs.next() {
|
||||
return Err(e);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -312,23 +312,6 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
match &self.globals.cancel_token {
|
||||
Some(cancel_token) => {
|
||||
select! {
|
||||
_ = proxy_service.fuse() => {
|
||||
warn!("Proxy service got down");
|
||||
}
|
||||
_ = cancel_token.cancelled().fuse() => {
|
||||
info!("Proxy service listening on {} receives term signal", self.listening_on);
|
||||
}
|
||||
}
|
||||
}
|
||||
None => {
|
||||
proxy_service.await?;
|
||||
warn!("Proxy service got down");
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
proxy_service.await
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue