refactor: cleanup codes

This commit is contained in:
Jun Kurihara 2023-07-21 22:07:36 +09:00
commit f6c4032f83
No known key found for this signature in database
GPG key ID: 6D3FEE70E498C15B
10 changed files with 280 additions and 124 deletions

View file

@ -10,9 +10,15 @@ pub enum RpxyError {
#[error("Proxy build error")]
ProxyBuild(#[from] crate::proxy::ProxyBuilderError),
#[error("Backend build error")]
BackendBuild(#[from] crate::backend::BackendBuilderError),
#[error("MessageHandler build error")]
HandlerBuild(#[from] crate::handler::HttpMessageHandlerBuilderError),
#[error("Config builder error: {0}")]
ConfigBuild(&'static str),
#[error("Http Message Handler Error: {0}")]
Handler(&'static str),

View file

@ -1,4 +1,14 @@
use crate::{backend::Backends, certs::CryptoSource, constants::*};
use crate::{
backend::{
Backend, BackendBuilder, Backends, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption,
},
certs::CryptoSource,
constants::*,
error::RpxyError,
log::*,
utils::{BytesName, PathNameBytesExp},
};
use rustc_hash::FxHashMap as HashMap;
use std::net::SocketAddr;
use std::sync::{
atomic::{AtomicUsize, Ordering},
@ -26,6 +36,7 @@ where
}
/// Configuration parameters for proxy transport and request handlers
#[derive(PartialEq, Eq)]
pub struct ProxyConfig {
pub listen_sockets: Vec<SocketAddr>, // when instantiate server
pub http_port: Option<u16>, // when instantiate server
@ -54,7 +65,7 @@ pub struct ProxyConfig {
#[cfg(feature = "http3")]
pub h3_max_concurrent_connections: u32,
#[cfg(feature = "http3")]
pub h3_max_idle_timeout: Option<quinn::IdleTimeout>,
pub h3_max_idle_timeout: Option<Duration>,
}
impl Default for ProxyConfig {
@ -87,11 +98,172 @@ impl Default for ProxyConfig {
#[cfg(feature = "http3")]
h3_max_concurrent_unistream: H3::MAX_CONCURRENT_UNISTREAM.into(),
#[cfg(feature = "http3")]
h3_max_idle_timeout: Some(quinn::IdleTimeout::try_from(Duration::from_secs(H3::MAX_IDLE_TIMEOUT)).unwrap()),
h3_max_idle_timeout: Some(Duration::from_secs(H3::MAX_IDLE_TIMEOUT)),
}
}
}
/// Configuration parameters for backend applications
#[derive(PartialEq, Eq)]
pub struct AppConfigList<T>
where
T: CryptoSource,
{
pub inner: Vec<AppConfig<T>>,
pub default_app: Option<String>,
}
impl<T> TryInto<Backends<T>> for AppConfigList<T>
where
T: CryptoSource + Clone,
{
type Error = RpxyError;
fn try_into(self) -> Result<Backends<T>, Self::Error> {
let mut backends = Backends::new();
for app_config in self.inner.iter() {
let backend = app_config.try_into()?;
backends
.apps
.insert(app_config.server_name.clone().to_server_name_vec(), backend);
info!("Registering application: ({})", &app_config.server_name);
}
// default backend application for plaintext http requests
if let Some(d) = self.default_app {
let d_sn: Vec<&str> = backends
.apps
.iter()
.filter(|(_k, v)| v.app_name == d)
.map(|(_, v)| v.server_name.as_ref())
.collect();
if !d_sn.is_empty() {
info!(
"Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).",
d, d_sn[0]
);
backends.default_server_name_bytes = Some(d_sn[0].to_server_name_vec());
}
}
Ok(backends)
}
}
/// Configuration parameters for single backend application
#[derive(PartialEq, Eq)]
pub struct AppConfig<T>
where
T: CryptoSource,
{
pub server_name: String,
pub reverse_proxy: Vec<ReverseProxyConfig>,
pub tls: Option<TlsConfig<T>>,
}
impl<T> TryInto<Backend<T>> for &AppConfig<T>
where
T: CryptoSource + Clone,
{
type Error = RpxyError;
fn try_into(self) -> Result<Backend<T>, Self::Error> {
// backend builder
let mut backend_builder = BackendBuilder::default();
// reverse proxy settings
let reverse_proxy = self.try_into()?;
backend_builder
.app_name(self.server_name.clone())
.server_name(self.server_name.clone())
.reverse_proxy(reverse_proxy);
// TLS settings and build backend instance
let backend = if self.tls.is_none() {
backend_builder.build().map_err(RpxyError::BackendBuild)?
} else {
let tls = self.tls.as_ref().unwrap();
backend_builder
.https_redirection(Some(tls.https_redirection))
.crypto_source(Some(tls.inner.clone()))
.build()?
};
Ok(backend)
}
}
impl<T> TryInto<ReverseProxy> for &AppConfig<T>
where
T: CryptoSource + Clone,
{
type Error = RpxyError;
fn try_into(self) -> Result<ReverseProxy, Self::Error> {
let mut upstream: HashMap<PathNameBytesExp, UpstreamGroup> = HashMap::default();
self.reverse_proxy.iter().for_each(|rpo| {
let upstream_vec: Vec<Upstream> = rpo.upstream.iter().map(|x| x.try_into().unwrap()).collect();
// let upstream_iter = rpo.upstream.iter().map(|x| x.to_upstream().unwrap());
// let lb_upstream_num = vec_upstream.len();
let elem = UpstreamGroupBuilder::default()
.upstream(&upstream_vec)
.path(&rpo.path)
.replace_path(&rpo.replace_path)
.lb(&rpo.load_balance, &upstream_vec, &self.server_name, &rpo.path)
.opts(&rpo.upstream_options)
.build()
.unwrap();
upstream.insert(elem.path.clone(), elem);
});
if self.reverse_proxy.iter().filter(|rpo| rpo.path.is_none()).count() >= 2 {
error!("Multiple default reverse proxy setting");
return Err(RpxyError::ConfigBuild("Invalid reverse proxy setting"));
}
if !(upstream.iter().all(|(_, elem)| {
!(elem.opts.contains(&UpstreamOption::ConvertHttpsTo11) && elem.opts.contains(&UpstreamOption::ConvertHttpsTo2))
})) {
error!("Either one of force_http11 or force_http2 can be enabled");
return Err(RpxyError::ConfigBuild("Invalid upstream option setting"));
}
Ok(ReverseProxy { upstream })
}
}
/// Configuration parameters for single reverse proxy corresponding to the path
#[derive(PartialEq, Eq)]
pub struct ReverseProxyConfig {
pub path: Option<String>,
pub replace_path: Option<String>,
pub upstream: Vec<UpstreamUri>,
pub upstream_options: Option<Vec<String>>,
pub load_balance: Option<String>,
}
/// Configuration parameters for single upstream destination from a reverse proxy
#[derive(PartialEq, Eq)]
pub struct UpstreamUri {
pub inner: hyper::Uri,
}
impl TryInto<Upstream> for &UpstreamUri {
type Error = anyhow::Error;
fn try_into(self) -> std::result::Result<Upstream, Self::Error> {
Ok(Upstream {
uri: self.inner.clone(),
})
}
}
/// Configuration parameters on TLS for a single backend application
#[derive(PartialEq, Eq)]
pub struct TlsConfig<T>
where
T: CryptoSource,
{
pub inner: T,
pub https_redirection: bool,
}
#[derive(Debug, Clone, Default)]
/// Counter for serving requests
pub struct RequestCount(Arc<AtomicUsize>);

View file

@ -8,19 +8,15 @@ mod log;
mod proxy;
mod utils;
use crate::{error::*, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder};
use crate::{error::*, globals::Globals, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder};
use futures::future::select_all;
use hyper::Client;
// use hyper_trust_dns::TrustDnsResolver;
use std::sync::Arc;
pub use crate::{
backend::{
Backend, BackendBuilder, Backends, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption,
},
certs::{CertsAndKeys, CryptoSource},
globals::{Globals, ProxyConfig}, // TODO: BackendConfigに変える
utils::{BytesName, PathNameBytesExp},
globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri},
};
pub mod reexports {
pub use hyper::Uri;
@ -28,10 +24,21 @@ pub mod reexports {
}
/// Entrypoint that creates and spawns tasks of reverse proxy services
pub async fn entrypoint<T>(globals: Arc<Globals<T>>) -> Result<()>
pub async fn entrypoint<T>(
proxy_config: ProxyConfig,
app_config_list: AppConfigList<T>,
runtime_handle: tokio::runtime::Handle,
) -> Result<()>
where
T: CryptoSource + Clone + Send + Sync + 'static,
{
// build global
let globals = Arc::new(Globals {
proxy_config,
backends: app_config_list.try_into()?,
request_count: Default::default(),
runtime_handle,
});
// let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector();
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()

View file

@ -119,7 +119,13 @@ where
transport_config_quic
.max_concurrent_bidi_streams(self.globals.proxy_config.h3_max_concurrent_bidistream)
.max_concurrent_uni_streams(self.globals.proxy_config.h3_max_concurrent_unistream)
.max_idle_timeout(self.globals.proxy_config.h3_max_idle_timeout);
.max_idle_timeout(
self
.globals
.proxy_config
.h3_max_idle_timeout
.map(|v| quinn::IdleTimeout::try_from(v).unwrap()),
);
let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config));
server_config_h3.transport = Arc::new(transport_config_quic);