wip: started to integrate rpxy-certs to rpxy-lib
This commit is contained in:
parent
7632b1fdeb
commit
2f9f0a1122
8 changed files with 81 additions and 32 deletions
|
|
@ -1,5 +1,5 @@
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
version = "0.7.2"
|
version = "0.8.0"
|
||||||
authors = ["Jun Kurihara"]
|
authors = ["Jun Kurihara"]
|
||||||
homepage = "https://github.com/junkurihara/rust-rpxy"
|
homepage = "https://github.com/junkurihara/rust-rpxy"
|
||||||
repository = "https://github.com/junkurihara/rust-rpxy"
|
repository = "https://github.com/junkurihara/rust-rpxy"
|
||||||
|
|
|
||||||
|
|
@ -69,12 +69,10 @@ fn read_certs_and_keys(
|
||||||
|
|
||||||
let certs: Vec<_> = {
|
let certs: Vec<_> = {
|
||||||
let certs_path_str = cert_path.display().to_string();
|
let certs_path_str = cert_path.display().to_string();
|
||||||
let mut reader = BufReader::new(File::open(cert_path).map_err(|e| {
|
let mut reader = BufReader::new(
|
||||||
io::Error::new(
|
File::open(cert_path)
|
||||||
e.kind(),
|
.map_err(|e| io::Error::new(e.kind(), format!("Unable to load the certificates [{certs_path_str}]: {e}")))?,
|
||||||
format!("Unable to load the certificates [{certs_path_str}]: {e}"),
|
);
|
||||||
)
|
|
||||||
})?);
|
|
||||||
rustls_pemfile::certs(&mut reader)
|
rustls_pemfile::certs(&mut reader)
|
||||||
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Unable to parse the certificates"))?
|
.map_err(|_| io::Error::new(io::ErrorKind::InvalidInput, "Unable to parse the certificates"))?
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -97,13 +97,16 @@ pub fn build_settings(config: &ConfigToml) -> std::result::Result<(ProxyConfig,
|
||||||
pub async fn build_cert_manager(
|
pub async fn build_cert_manager(
|
||||||
config: &ConfigToml,
|
config: &ConfigToml,
|
||||||
) -> Result<
|
) -> Result<
|
||||||
(
|
Option<(
|
||||||
ReloaderService<CryptoReloader, ServerCryptoBase>,
|
ReloaderService<CryptoReloader, ServerCryptoBase>,
|
||||||
ReloaderReceiver<ServerCryptoBase>,
|
ReloaderReceiver<ServerCryptoBase>,
|
||||||
),
|
)>,
|
||||||
anyhow::Error,
|
anyhow::Error,
|
||||||
> {
|
> {
|
||||||
let apps = config.apps.as_ref().ok_or(anyhow!("No apps"))?;
|
let apps = config.apps.as_ref().ok_or(anyhow!("No apps"))?;
|
||||||
|
if config.listen_port_tls.is_none() {
|
||||||
|
return Ok(None);
|
||||||
|
}
|
||||||
let mut crypto_source_map = HashMap::default();
|
let mut crypto_source_map = HashMap::default();
|
||||||
for app in apps.0.values() {
|
for app in apps.0.values() {
|
||||||
if let Some(tls) = app.tls.as_ref() {
|
if let Some(tls) = app.tls.as_ref() {
|
||||||
|
|
@ -118,5 +121,5 @@ pub async fn build_cert_manager(
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let res = build_cert_reloader(&crypto_source_map, None).await?;
|
let res = build_cert_reloader(&crypto_source_map, None).await?;
|
||||||
Ok(res)
|
Ok(Some(res))
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -66,20 +66,14 @@ async fn rpxy_service_without_watcher(
|
||||||
info!("Start rpxy service");
|
info!("Start rpxy service");
|
||||||
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
|
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
|
||||||
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
let (proxy_conf, app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||||
let (cert_service, cert_rx) = build_cert_manager(&config_toml)
|
|
||||||
|
let cert_service_and_rx = build_cert_manager(&config_toml)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
||||||
|
|
||||||
tokio::select! {
|
rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, None)
|
||||||
rpxy_res = entrypoint(&proxy_conf, &app_conf, &runtime_handle, None) => {
|
.await
|
||||||
error!("rpxy entrypoint exited");
|
.map_err(|e| anyhow!(e))
|
||||||
rpxy_res.map_err(|e| anyhow!(e))
|
|
||||||
}
|
|
||||||
cert_res = cert_service.start() => {
|
|
||||||
error!("cert reloader service exited");
|
|
||||||
cert_res.map_err(|e| anyhow!(e))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn rpxy_service_with_watcher(
|
async fn rpxy_service_with_watcher(
|
||||||
|
|
@ -95,7 +89,7 @@ async fn rpxy_service_with_watcher(
|
||||||
.ok_or(anyhow!("Something wrong in config reloader receiver"))?;
|
.ok_or(anyhow!("Something wrong in config reloader receiver"))?;
|
||||||
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
let (mut proxy_conf, mut app_conf) = build_settings(&config_toml).map_err(|e| anyhow!("Invalid configuration: {e}"))?;
|
||||||
|
|
||||||
let (mut cert_service, mut cert_rx) = build_cert_manager(&config_toml)
|
let mut cert_service_and_rx = build_cert_manager(&config_toml)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
.map_err(|e| anyhow!("Invalid cert configuration: {e}"))?;
|
||||||
|
|
||||||
|
|
@ -105,8 +99,8 @@ async fn rpxy_service_with_watcher(
|
||||||
// Continuous monitoring
|
// Continuous monitoring
|
||||||
loop {
|
loop {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
rpxy_res = entrypoint(&proxy_conf, &app_conf, &runtime_handle, Some(term_notify.clone())) => {
|
rpxy_res = rpxy_entrypoint(&proxy_conf, &app_conf, cert_service_and_rx.as_ref(), &runtime_handle, Some(term_notify.clone())) => {
|
||||||
error!("rpxy entrypoint exited");
|
error!("rpxy entrypoint or cert service exited");
|
||||||
return rpxy_res.map_err(|e| anyhow!(e));
|
return rpxy_res.map_err(|e| anyhow!(e));
|
||||||
}
|
}
|
||||||
_ = config_rx.changed() => {
|
_ = config_rx.changed() => {
|
||||||
|
|
@ -124,8 +118,8 @@ async fn rpxy_service_with_watcher(
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match build_cert_manager(&config_toml).await {
|
match build_cert_manager(&config_toml).await {
|
||||||
Ok((c, r)) => {
|
Ok(c) => {
|
||||||
(cert_service, cert_rx) = (c, r)
|
cert_service_and_rx = c;
|
||||||
},
|
},
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
error!("Invalid cert configuration. Configuration does not updated: {e}");
|
error!("Invalid cert configuration. Configuration does not updated: {e}");
|
||||||
|
|
@ -137,13 +131,38 @@ async fn rpxy_service_with_watcher(
|
||||||
term_notify.notify_waiters();
|
term_notify.notify_waiters();
|
||||||
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
// tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
|
||||||
}
|
}
|
||||||
cert_res = cert_service.start() => {
|
|
||||||
error!("cert reloader service exited");
|
|
||||||
return cert_res.map_err(|e| anyhow!(e));
|
|
||||||
}
|
|
||||||
else => break
|
else => break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wrapper of entry point for rpxy service with certificate management service
|
||||||
|
async fn rpxy_entrypoint(
|
||||||
|
proxy_config: &rpxy_lib::ProxyConfig,
|
||||||
|
app_config_list: &rpxy_lib::AppConfigList<cert_file_reader::CryptoFileSource>,
|
||||||
|
cert_service_and_rx: Option<&(
|
||||||
|
ReloaderService<rpxy_certs::CryptoReloader, rpxy_certs::ServerCryptoBase>,
|
||||||
|
ReloaderReceiver<rpxy_certs::ServerCryptoBase>,
|
||||||
|
)>, // TODO:
|
||||||
|
runtime_handle: &tokio::runtime::Handle,
|
||||||
|
term_notify: Option<std::sync::Arc<tokio::sync::Notify>>,
|
||||||
|
) -> Result<(), anyhow::Error> {
|
||||||
|
if let Some((cert_service, cert_rx)) = cert_service_and_rx {
|
||||||
|
tokio::select! {
|
||||||
|
rpxy_res = entrypoint(proxy_config, app_config_list, Some(cert_rx), runtime_handle, term_notify) => {
|
||||||
|
error!("rpxy entrypoint exited");
|
||||||
|
rpxy_res.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
cert_res = cert_service.start() => {
|
||||||
|
error!("cert reloader service exited");
|
||||||
|
cert_res.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
entrypoint(proxy_config, app_config_list, None, runtime_handle, term_notify)
|
||||||
|
.await
|
||||||
|
.map_err(|e| anyhow!(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,13 +14,14 @@ publish.workspace = true
|
||||||
|
|
||||||
[features]
|
[features]
|
||||||
default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
|
default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
|
||||||
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn"]
|
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"]
|
||||||
http3-s2n = [
|
http3-s2n = [
|
||||||
"h3",
|
"h3",
|
||||||
"s2n-quic",
|
"s2n-quic",
|
||||||
"s2n-quic-core",
|
"s2n-quic-core",
|
||||||
"s2n-quic-rustls",
|
"s2n-quic-rustls",
|
||||||
"s2n-quic-h3",
|
"s2n-quic-h3",
|
||||||
|
"rpxy-certs/http3",
|
||||||
]
|
]
|
||||||
cache = ["http-cache-semantics", "lru", "sha2", "base64"]
|
cache = ["http-cache-semantics", "lru", "sha2", "base64"]
|
||||||
sticky-cookie = ["base64", "sha2", "chrono"]
|
sticky-cookie = ["base64", "sha2", "chrono"]
|
||||||
|
|
@ -70,6 +71,7 @@ hyper-rustls = { version = "0.27.1", default-features = false, features = [
|
||||||
], optional = true }
|
], optional = true }
|
||||||
|
|
||||||
# tls and cert management for server
|
# tls and cert management for server
|
||||||
|
rpxy-certs = { path = "../rpxy-certs/", default-features = false }
|
||||||
hot_reload = "0.1.5"
|
hot_reload = "0.1.5"
|
||||||
rustls = { version = "0.21.12", default-features = false }
|
rustls = { version = "0.21.12", default-features = false }
|
||||||
tokio-rustls = { version = "0.24.1", features = ["early-data"] }
|
tokio-rustls = { version = "0.24.1", features = ["early-data"] }
|
||||||
|
|
|
||||||
|
|
@ -19,6 +19,8 @@ pub struct Globals {
|
||||||
pub term_notify: Option<Arc<tokio::sync::Notify>>,
|
pub term_notify: Option<Arc<tokio::sync::Notify>>,
|
||||||
/// Shared context - Certificate reloader service receiver
|
/// Shared context - Certificate reloader service receiver
|
||||||
pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
|
pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>,
|
||||||
|
/// Shared context - Certificate reloader service receiver // TODO: newer one
|
||||||
|
pub cert_reloader_rx_new: Option<ReloaderReceiver<rpxy_certs::ServerCryptoBase>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Configuration parameters for proxy transport and request handlers
|
/// Configuration parameters for proxy transport and request handlers
|
||||||
|
|
|
||||||
|
|
@ -10,14 +10,17 @@ mod log;
|
||||||
mod message_handler;
|
mod message_handler;
|
||||||
mod name_exp;
|
mod name_exp;
|
||||||
mod proxy;
|
mod proxy;
|
||||||
|
/* ------------------------------------------------ */
|
||||||
use crate::{
|
use crate::{
|
||||||
crypto::build_cert_reloader, error::*, forwarder::Forwarder, globals::Globals, log::*,
|
crypto::build_cert_reloader, error::*, forwarder::Forwarder, globals::Globals, log::*,
|
||||||
message_handler::HttpMessageHandlerBuilder, proxy::Proxy,
|
message_handler::HttpMessageHandlerBuilder, proxy::Proxy,
|
||||||
};
|
};
|
||||||
use futures::future::select_all;
|
use futures::future::select_all;
|
||||||
|
use hot_reload::ReloaderReceiver;
|
||||||
|
use rpxy_certs::ServerCryptoBase;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
/* ------------------------------------------------ */
|
||||||
pub use crate::{
|
pub use crate::{
|
||||||
crypto::{CertsAndKeys, CryptoSource},
|
crypto::{CertsAndKeys, CryptoSource},
|
||||||
globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri},
|
globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri},
|
||||||
|
|
@ -31,6 +34,7 @@ pub mod reexports {
|
||||||
pub async fn entrypoint<T>(
|
pub async fn entrypoint<T>(
|
||||||
proxy_config: &ProxyConfig,
|
proxy_config: &ProxyConfig,
|
||||||
app_config_list: &AppConfigList<T>,
|
app_config_list: &AppConfigList<T>,
|
||||||
|
cert_rx: Option<&ReloaderReceiver<ServerCryptoBase>>, // TODO:
|
||||||
runtime_handle: &tokio::runtime::Handle,
|
runtime_handle: &tokio::runtime::Handle,
|
||||||
term_notify: Option<Arc<tokio::sync::Notify>>,
|
term_notify: Option<Arc<tokio::sync::Notify>>,
|
||||||
) -> RpxyResult<()>
|
) -> RpxyResult<()>
|
||||||
|
|
@ -94,6 +98,7 @@ where
|
||||||
runtime_handle: runtime_handle.clone(),
|
runtime_handle: runtime_handle.clone(),
|
||||||
term_notify: term_notify.clone(),
|
term_notify: term_notify.clone(),
|
||||||
cert_reloader_rx: cert_reloader_rx.clone(),
|
cert_reloader_rx: cert_reloader_rx.clone(),
|
||||||
|
cert_reloader_rx_new: cert_rx.cloned(), // TODO: newer one
|
||||||
});
|
});
|
||||||
|
|
||||||
// 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well
|
// 4. build message handler containing Arc-ed http_client and backends, and make it contained in Arc as well
|
||||||
|
|
|
||||||
|
|
@ -164,6 +164,10 @@ where
|
||||||
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {
|
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {
|
||||||
return Err(RpxyError::NoCertificateReloader);
|
return Err(RpxyError::NoCertificateReloader);
|
||||||
};
|
};
|
||||||
|
// TODO: newer one
|
||||||
|
let Some(mut server_crypto_rx_new) = self.globals.cert_reloader_rx_new.clone() else {
|
||||||
|
return Err(RpxyError::NoCertificateReloader);
|
||||||
|
};
|
||||||
let tcp_socket = bind_tcp_socket(&self.listening_on)?;
|
let tcp_socket = bind_tcp_socket(&self.listening_on)?;
|
||||||
let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?;
|
let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?;
|
||||||
info!("Start TCP proxy serving with HTTPS request for configured host names");
|
info!("Start TCP proxy serving with HTTPS request for configured host names");
|
||||||
|
|
@ -237,6 +241,22 @@ where
|
||||||
};
|
};
|
||||||
server_crypto_map = Some(server_crypto.inner_local_map.clone());
|
server_crypto_map = Some(server_crypto.inner_local_map.clone());
|
||||||
}
|
}
|
||||||
|
// TODO: newer one
|
||||||
|
_ = server_crypto_rx_new.changed().fuse() => {
|
||||||
|
if server_crypto_rx_new.borrow().is_none() {
|
||||||
|
error!("Reloader is broken");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let cert_keys_map = server_crypto_rx_new.borrow().clone().unwrap();
|
||||||
|
// let Some(server_crypto) = cert_keys_map.try_into().ok() else {
|
||||||
|
// break;
|
||||||
|
// };
|
||||||
|
// let Some(server_crypto): Option<Arc<ServerCrypto>> = (&cert_keys_map).try_into().ok() else {
|
||||||
|
// error!("Failed to update server crypto");
|
||||||
|
// break;
|
||||||
|
// };
|
||||||
|
// server_crypto_map = Some(server_crypto.inner_local_map.clone());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue