wip: refactoring all the structure and improve error messages

This commit is contained in:
Jun Kurihara 2023-11-22 22:48:14 +09:00
commit de91c7a68f
No known key found for this signature in database
GPG key ID: 6D3FEE70E498C15B
10 changed files with 268 additions and 56 deletions

31
rpxy-lib/src/count.rs Normal file
View file

@ -0,0 +1,31 @@
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
#[derive(Debug, Clone, Default)]
/// Counter for serving requests
pub struct RequestCount(Arc<AtomicUsize>);
impl RequestCount {
pub fn current(&self) -> usize {
self.0.load(Ordering::Relaxed)
}
pub fn increment(&self) -> usize {
self.0.fetch_add(1, Ordering::Relaxed)
}
pub fn decrement(&self) -> usize {
let mut count;
while {
count = self.0.load(Ordering::Relaxed);
count > 0
&& self
.0
.compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed)
!= Ok(count)
} {}
count
}
}

View file

@ -1,8 +1,11 @@
pub use anyhow::{anyhow, bail, ensure, Context};
use thiserror::Error;
pub type Result<T> = std::result::Result<T, RpxyError>;
pub type RpxyResult<T> = std::result::Result<T, RpxyError>;
/// Describes things that can go wrong in the Rpxy
#[derive(Debug, Error)]
pub enum RpxyError {}
pub enum RpxyError {
#[error("IO error: {0}")]
Io(#[from] std::io::Error),
}

View file

@ -1,13 +1,30 @@
use crate::{certs::CryptoSource, constants::*};
use std::{net::SocketAddr, time::Duration};
use crate::{certs::CryptoSource, constants::*, count::RequestCount};
use std::{net::SocketAddr, sync::Arc, time::Duration};
/// Global object containing proxy configurations and shared object like counters.
/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
pub struct Globals {
/// Configuration parameters for proxy transport and request handlers
pub proxy_config: ProxyConfig,
/// Shared context - Counter for serving requests
pub request_count: RequestCount,
/// Shared context - Async task runtime handler
pub runtime_handle: tokio::runtime::Handle,
/// Shared context - Notify object to stop async tasks
pub term_notify: Option<Arc<tokio::sync::Notify>>,
}
/// Configuration parameters for proxy transport and request handlers
#[derive(PartialEq, Eq, Clone)]
pub struct ProxyConfig {
pub listen_sockets: Vec<SocketAddr>, // when instantiate server
pub http_port: Option<u16>, // when instantiate server
pub https_port: Option<u16>, // when instantiate server
pub tcp_listen_backlog: u32, // when instantiate server
/// listen socket addresses
pub listen_sockets: Vec<SocketAddr>,
/// http port
pub http_port: Option<u16>,
/// https port
pub https_port: Option<u16>,
/// tcp listen backlog
pub tcp_listen_backlog: u32,
pub proxy_timeout: Duration, // when serving requests at Proxy
pub upstream_timeout: Duration, // when serving requests at Handler

View file

@ -0,0 +1,23 @@
use tokio::runtime::Handle;
#[derive(Clone)]
/// Executor for hyper
pub struct LocalExecutor {
runtime_handle: Handle,
}
impl LocalExecutor {
pub fn new(runtime_handle: Handle) -> Self {
LocalExecutor { runtime_handle }
}
}
impl<F> hyper::rt::Executor<F> for LocalExecutor
where
F: std::future::Future + Send + 'static,
F::Output: Send,
{
fn execute(&self, fut: F) {
self.runtime_handle.spawn(fut);
}
}

View file

@ -1,10 +1,14 @@
mod certs;
mod constants;
mod count;
mod error;
mod globals;
mod hyper_executor;
mod log;
mod proxy;
use crate::{error::*, log::*};
use crate::{error::*, globals::Globals, log::*, proxy::Proxy};
use futures::future::select_all;
use std::sync::Arc;
pub use crate::{
@ -25,7 +29,7 @@ pub async fn entrypoint<T>(
app_config_list: &AppConfigList<T>,
runtime_handle: &tokio::runtime::Handle,
term_notify: Option<Arc<tokio::sync::Notify>>,
) -> Result<()>
) -> RpxyResult<()>
where
T: CryptoSource + Clone + Send + Sync + 'static,
{
@ -58,15 +62,18 @@ where
info!("Cache is disabled")
}
// // build global
// let globals = Arc::new(Globals {
// proxy_config: proxy_config.clone(),
// backends: app_config_list.clone().try_into()?,
// request_count: Default::default(),
// runtime_handle: runtime_handle.clone(),
// term_notify: term_notify.clone(),
// });
// build global shared context
let globals = Arc::new(Globals {
proxy_config: proxy_config.clone(),
request_count: Default::default(),
runtime_handle: runtime_handle.clone(),
term_notify: term_notify.clone(),
});
// TODO: 1. build backends, and make it contained in Arc
// app_config_list: app_config_list.clone(),
// TODO: 2. build message handler with Arc-ed http_client and backends, and make it contained in Arc as well
// // build message handler including a request forwarder
// let msg_handler = Arc::new(
// HttpMessageHandlerBuilder::default()
@ -75,31 +82,31 @@ where
// .build()?,
// );
// let http_server = Arc::new(build_http_server(&globals));
// TODO: 3. spawn each proxy for a given socket with copied Arc-ed message_handler.
// build hyper connection builder shared with proxy instances
let connection_builder = proxy::connection_builder(&globals);
// let addresses = globals.proxy_config.listen_sockets.clone();
// let futures = select_all(addresses.into_iter().map(|addr| {
// let mut tls_enabled = false;
// if let Some(https_port) = globals.proxy_config.https_port {
// tls_enabled = https_port == addr.port()
// }
// spawn each proxy for a given socket with copied Arc-ed backend, message_handler and connection builder.
let addresses = globals.proxy_config.listen_sockets.clone();
let futures_iter = addresses.into_iter().map(|listening_on| {
let mut tls_enabled = false;
if let Some(https_port) = globals.proxy_config.https_port {
tls_enabled = https_port == listening_on.port()
}
let proxy = Proxy {
globals: globals.clone(),
listening_on,
tls_enabled,
connection_builder: connection_builder.clone(),
// TODO: message_handler
};
globals.runtime_handle.spawn(async move { proxy.start().await })
});
// let proxy = ProxyBuilder::default()
// .globals(globals.clone())
// .listening_on(addr)
// .tls_enabled(tls_enabled)
// .http_server(http_server.clone())
// .msg_handler(msg_handler.clone())
// .build()
// .unwrap();
// globals.runtime_handle.spawn(async move { proxy.start().await })
// }));
// // wait for all future
// if let (Ok(Err(e)), _, _) = futures.await {
// error!("Some proxy services are down: {}", e);
// };
// wait for all future
if let (Ok(Err(e)), _, _) = select_all(futures_iter).await {
error!("Some proxy services are down: {}", e);
};
Ok(())
}

22
rpxy-lib/src/proxy/mod.rs Normal file
View file

@ -0,0 +1,22 @@
mod proxy_main;
mod socket;
use crate::{globals::Globals, hyper_executor::LocalExecutor};
use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder};
use std::sync::Arc;
pub(crate) use proxy_main::Proxy;
/// build connection builder shared with proxy instances
pub(crate) fn connection_builder(globals: &Arc<Globals>) -> Arc<ConnectionBuilder<LocalExecutor>> {
let executor = LocalExecutor::new(globals.runtime_handle.clone());
let mut http_server = server::conn::auto::Builder::new(executor);
http_server
.http1()
.keep_alive(globals.proxy_config.keepalive)
.pipeline_flush(true);
http_server
.http2()
.max_concurrent_streams(globals.proxy_config.max_concurrent_streams);
Arc::new(http_server)
}

View file

@ -0,0 +1,63 @@
use super::socket::bind_tcp_socket;
use crate::{error::RpxyResult, globals::Globals, hyper_executor::LocalExecutor, log::*};
use hyper_util::server::conn::auto::Builder as ConnectionBuilder;
use std::{net::SocketAddr, sync::Arc};
/// Proxy main object responsible to serve requests received from clients at the given socket address.
pub(crate) struct Proxy<E = LocalExecutor> {
/// global context shared among async tasks
pub globals: Arc<Globals>,
/// listen socket address
pub listening_on: SocketAddr,
/// whether TLS is enabled or not
pub tls_enabled: bool,
/// hyper connection builder serving http request
pub connection_builder: Arc<ConnectionBuilder<E>>,
}
impl Proxy {
/// Start without TLS (HTTP cleartext)
async fn start_without_tls(&self) -> RpxyResult<()> {
let listener_service = async {
let tcp_socket = bind_tcp_socket(&self.listening_on)?;
let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?;
info!("Start TCP proxy serving with HTTP request for configured host names");
while let Ok((stream, client_addr)) = tcp_listener.accept().await {
// self.serve_connection(TokioIo::new(stream), client_addr, None);
}
Ok(()) as RpxyResult<()>
};
listener_service.await?;
Ok(())
}
/// Entrypoint for HTTP/1.1, 2 and 3 servers
pub async fn start(&self) -> RpxyResult<()> {
let proxy_service = async {
// if self.tls_enabled {
// self.start_with_tls().await
// } else {
self.start_without_tls().await
// }
};
match &self.globals.term_notify {
Some(term) => {
tokio::select! {
_ = proxy_service => {
warn!("Proxy service got down");
}
_ = term.notified() => {
info!("Proxy service listening on {} receives term signal", self.listening_on);
}
}
}
None => {
proxy_service.await?;
warn!("Proxy service got down");
}
}
Ok(())
}
}

View file

@ -0,0 +1,46 @@
use crate::{error::*, log::*};
#[cfg(feature = "http3-quinn")]
use socket2::{Domain, Protocol, Socket, Type};
use std::net::SocketAddr;
#[cfg(feature = "http3-quinn")]
use std::net::UdpSocket;
use tokio::net::TcpSocket;
/// Bind TCP socket to the given `SocketAddr`, and returns the TCP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options.
/// This option is required to re-bind the socket address when the proxy instance is reconstructed.
pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> RpxyResult<TcpSocket> {
let tcp_socket = if listening_on.is_ipv6() {
TcpSocket::new_v6()
} else {
TcpSocket::new_v4()
}?;
tcp_socket.set_reuseaddr(true)?;
tcp_socket.set_reuseport(true)?;
if let Err(e) = tcp_socket.bind(*listening_on) {
error!("Failed to bind TCP socket: {}", e);
return Err(RpxyError::Io(e));
};
Ok(tcp_socket)
}
#[cfg(feature = "http3-quinn")]
/// Bind UDP socket to the given `SocketAddr`, and returns the UDP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options.
/// This option is required to re-bind the socket address when the proxy instance is reconstructed.
pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> RpxyResult<UdpSocket> {
let socket = if listening_on.is_ipv6() {
Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP))
} else {
Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP))
}?;
socket.set_reuse_address(true)?; // This isn't necessary?
socket.set_reuse_port(true)?;
socket.set_nonblocking(true)?; // This was made true inside quinn. so this line isn't necessary here. but just in case.
if let Err(e) = socket.bind(&(*listening_on).into()) {
error!("Failed to bind UDP socket: {}", e);
return Err(RpxyError::Io(e));
};
let udp_socket: UdpSocket = socket.into();
Ok(udp_socket)
}