wip: implementing message handler

This commit is contained in:
Jun Kurihara 2023-11-27 15:39:19 +09:00
commit a9ce26ae76
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
16 changed files with 520 additions and 98 deletions

View file

@ -1,10 +1,8 @@
use super::proxy_main::Proxy;
use crate::{
crypto::CryptoSource,
error::*,
hyper_ext::{
body::{IncomingLike, IncomingOr},
full, synthetic_response,
},
hyper_ext::body::{IncomingLike, IncomingOr},
log::*,
name_exp::ServerName,
};
@ -22,13 +20,11 @@ use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic
// use futures::Stream;
// use hyper_util::client::legacy::connect::Connect;
// impl<U> Proxy<U>
// where
// // T: Connect + Clone + Sync + Send + 'static,
// U: CryptoSource + Clone + Sync + Send + 'static,
// {
impl Proxy {
impl<U> Proxy<U>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
pub(super) async fn h3_serve_connection<C>(
&self,
quic_connection: C,
@ -151,24 +147,17 @@ impl Proxy {
});
let mut new_req: Request<IncomingOr<IncomingLike>> = Request::from_parts(req_parts, IncomingOr::Right(req_body));
// let res = selfw
// .msg_handler
// .clone()
// .handle_request(
// new_req,
// client_addr,
// self.listening_on,
// self.tls_enabled,
// Some(tls_server_name),
// )
// .await?;
// TODO: TODO: TODO: remove later
let body = full(Bytes::from("hello h3 echo"));
// here response is IncomingOr<BoxBody> from message handler
let res = synthetic_response(Response::builder().body(body).unwrap())?;
/////////////////
// Response<IncomingOr<BoxBody>> wrapped by RpxyResult
let res = self
.message_handler
.handle_request(
new_req,
client_addr,
self.listening_on,
self.tls_enabled,
Some(tls_server_name),
)
.await?;
let (new_res_parts, new_body) = res.into_parts();
let new_res = Response::from_parts(new_res_parts, ());

View file

@ -1,16 +1,15 @@
use super::socket::bind_tcp_socket;
use crate::{
constants::TLS_HANDSHAKE_TIMEOUT_SEC,
crypto::{ServerCrypto, SniServerCryptoMap},
crypto::{CryptoSource, ServerCrypto, SniServerCryptoMap},
error::*,
globals::Globals,
hyper_ext::{
body::{BoxBody, IncomingOr},
full,
rt::LocalExecutor,
synthetic_response,
},
log::*,
message_handle::HttpMessageHandler,
name_exp::ServerName,
};
use futures::{select, FutureExt};
@ -26,34 +25,37 @@ use tokio::time::timeout;
/// Wrapper function to handle request for HTTP/1.1 and HTTP/2
/// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
async fn serve_request(
async fn serve_request<U>(
mut req: Request<Incoming>,
// handler: Arc<HttpMessageHandler<T, U>>,
// handler: Arc<HttpMessageHandler<U>>,
handler: Arc<HttpMessageHandler<U>>,
client_addr: SocketAddr,
listen_addr: SocketAddr,
tls_enabled: bool,
tls_server_name: Option<ServerName>,
) -> RpxyResult<Response<IncomingOr<BoxBody>>> {
// match handler
// .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name)
// .await?
// {
// Ok(res) => passthrough_response(res),
// Err(e) => synthetic_error_response(StatusCode::from(e)),
// }
//////////////
// TODO: remove later
let body = full(hyper::body::Bytes::from("hello"));
let res = Response::builder().body(body).unwrap();
synthetic_response(res)
//////////////
) -> RpxyResult<Response<IncomingOr<BoxBody>>>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone,
{
handler
.handle_request(
req.map(IncomingOr::Left),
client_addr,
listen_addr,
tls_enabled,
tls_server_name,
)
.await
}
#[derive(Clone)]
/// Proxy main object responsible to serve requests received from clients at the given socket address.
pub(crate) struct Proxy<E = LocalExecutor> {
pub(crate) struct Proxy<U, E = LocalExecutor>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
/// global context shared among async tasks
pub globals: Arc<Globals>,
/// listen socket address
@ -62,9 +64,15 @@ pub(crate) struct Proxy<E = LocalExecutor> {
pub tls_enabled: bool,
/// hyper connection builder serving http request
pub connection_builder: Arc<ConnectionBuilder<E>>,
/// message handler serving incoming http request
pub message_handler: Arc<HttpMessageHandler<U>>,
}
impl Proxy {
impl<U> Proxy<U>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
/// Serves requests from clients
fn serve_connection<I>(&self, stream: I, peer_addr: SocketAddr, tls_server_name: Option<ServerName>)
where
@ -78,7 +86,7 @@ impl Proxy {
debug!("Request incoming: current # {}", request_count.current());
let server_clone = self.connection_builder.clone();
// let msg_handler_clone = self.msg_handler.clone();
let message_handler_clone = self.message_handler.clone();
let timeout_sec = self.globals.proxy_config.proxy_timeout;
let tls_enabled = self.tls_enabled;
let listening_on = self.listening_on;
@ -90,7 +98,7 @@ impl Proxy {
service_fn(move |req: Request<Incoming>| {
serve_request(
req,
// msg_handler_clone.clone(),
message_handler_clone.clone(),
peer_addr,
listening_on,
tls_enabled,

View file

@ -1,15 +1,20 @@
use super::proxy_main::Proxy;
use super::socket::bind_udp_socket;
use crate::{crypto::ServerCrypto, error::*, log::*, name_exp::ByteName};
use crate::{
crypto::{CryptoSource, ServerCrypto},
error::*,
log::*,
name_exp::ByteName,
};
// use hyper_util::client::legacy::connect::Connect;
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
use rustls::ServerConfig;
use std::sync::Arc;
impl Proxy
// where
// // T: Connect + Clone + Sync + Send + 'static,
// U: CryptoSource + Clone + Sync + Send + 'static,
impl<U> Proxy<U>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {

View file

@ -10,7 +10,11 @@ use std::sync::Arc;
// use hyper_util::client::legacy::connect::Connect;
use s2n_quic::provider;
impl Proxy {
impl<U> Proxy<U>
where
// T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone + Sync + Send + 'static,
{
/// Start UDP proxy serving with HTTP/3 request for configured host names
pub(super) async fn h3_listener_service(&self) -> RpxyResult<()> {
let Some(mut server_crypto_rx) = self.globals.cert_reloader_rx.clone() else {