wip: feat: change request body from either to explicit enum

This commit is contained in:
Jun Kurihara 2023-12-12 20:17:13 +09:00
commit 1c18f3836a
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
8 changed files with 45 additions and 41 deletions

View file

@ -28,6 +28,8 @@ pub enum RpxyError {
HyperIncomingLikeNewClosed, HyperIncomingLikeNewClosed,
#[error("New body write aborted")] #[error("New body write aborted")]
HyperNewBodyWriteAborted, HyperNewBodyWriteAborted,
#[error("Hyper error in serving request or response body type: {0}")]
HyperBodyError(#[from] hyper::Error),
// http/3 errors // http/3 errors
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]

View file

@ -118,7 +118,7 @@ impl RpxyCache {
.map(|f| { .map(|f| {
if f.is_data() { if f.is_data() {
let data_bytes = f.data_ref().unwrap().clone(); let data_bytes = f.data_ref().unwrap().clone();
println!("ddddde"); debug!("cache data bytes of {} bytes", data_bytes.len())
// TODO: cache data bytes as file or on memory // TODO: cache data bytes as file or on memory
// fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。 // fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。
// ファイルとObjectのbindをどうやってするか // ファイルとObjectのbindをどうやってするか

View file

@ -2,9 +2,9 @@
mod cache; mod cache;
mod client; mod client;
use crate::hyper_ext::body::{IncomingLike, IncomingOr}; use crate::hyper_ext::body::RequestBody;
pub(crate) type Forwarder<C> = client::Forwarder<C, IncomingOr<IncomingLike>>; pub(crate) type Forwarder<C> = client::Forwarder<C, RequestBody>;
pub(crate) use client::ForwardRequest; pub(crate) use client::ForwardRequest;
#[cfg(feature = "cache")] #[cfg(feature = "cache")]

View file

@ -1,25 +1,11 @@
// use http::Response; use super::body::IncomingLike;
use http_body_util::{combinators, BodyExt, Either, Empty, Full}; use crate::error::RpxyError;
use http_body_util::{combinators, BodyExt, Empty, Full};
use hyper::body::{Body, Bytes, Incoming}; use hyper::body::{Body, Bytes, Incoming};
use std::pin::Pin; use std::pin::Pin;
/// Type for synthetic boxed body /// Type for synthetic boxed body
pub(crate) type BoxBody = combinators::BoxBody<Bytes, hyper::Error>; pub(crate) type BoxBody = combinators::BoxBody<Bytes, hyper::Error>;
/// Type for either passthrough body or given body type, specifically synthetic boxed body
pub(crate) type IncomingOr<B> = Either<Incoming, B>;
// /// helper function to build http response with passthrough body
// pub(crate) fn wrap_incoming_body_response<B>(response: Response<Incoming>) -> Response<IncomingOr<B>>
// where
// B: hyper::body::Body,
// {
// response.map(IncomingOr::Left)
// }
// /// helper function to build http response with synthetic body
// pub(crate) fn wrap_synthetic_body_response<B>(response: Response<B>) -> Response<IncomingOr<B>> {
// response.map(IncomingOr::Right)
// }
/// helper function to build a empty body /// helper function to build a empty body
pub(crate) fn empty() -> BoxBody { pub(crate) fn empty() -> BoxBody {
@ -31,6 +17,30 @@ pub(crate) fn full(body: Bytes) -> BoxBody {
Full::new(body).map_err(|never| match never {}).boxed() Full::new(body).map_err(|never| match never {}).boxed()
} }
/* ------------------------------------ */
/// Request body used in this project
/// - Incoming: just a type that only forwards the downstream request body to upstream.
/// - IncomingLike: a Incoming-like type in which channel is used
pub(crate) enum RequestBody {
Incoming(Incoming),
IncomingLike(IncomingLike),
}
impl Body for RequestBody {
type Data = bytes::Bytes;
type Error = RpxyError;
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
match self.get_mut() {
RequestBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx).map_err(RpxyError::HyperBodyError),
RequestBody::IncomingLike(incoming_like) => Pin::new(incoming_like).poll_frame(cx),
}
}
}
/* ------------------------------------ */ /* ------------------------------------ */
#[cfg(feature = "cache")] #[cfg(feature = "cache")]
use futures::channel::mpsc::UnboundedReceiver; use futures::channel::mpsc::UnboundedReceiver;
@ -44,8 +54,8 @@ pub(crate) type UnboundedStreamBody = StreamBody<UnboundedReceiver<Result<Frame<
/// Response body use in this project /// Response body use in this project
/// - Incoming: just a type that only forwards the upstream response body to downstream. /// - Incoming: just a type that only forwards the upstream response body to downstream.
/// - BoxedCache: a type that is generated from cache, e.g.,, small byte object. /// - Boxed: a type that is generated from cache or synthetic response body, e.g.,, small byte object.
/// - StreamedCache: another type that is generated from cache as stream, e.g., large byte object. /// - Streamed: another type that is generated from stream, e.g., large byte object.
pub(crate) enum ResponseBody { pub(crate) enum ResponseBody {
Incoming(Incoming), Incoming(Incoming),
Boxed(BoxBody), Boxed(BoxBody),
@ -55,7 +65,7 @@ pub(crate) enum ResponseBody {
impl Body for ResponseBody { impl Body for ResponseBody {
type Data = bytes::Bytes; type Data = bytes::Bytes;
type Error = hyper::Error; type Error = RpxyError;
fn poll_frame( fn poll_frame(
self: Pin<&mut Self>, self: Pin<&mut Self>,
@ -68,5 +78,6 @@ impl Body for ResponseBody {
#[cfg(feature = "cache")] #[cfg(feature = "cache")]
ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx), ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx),
} }
.map_err(RpxyError::HyperBodyError)
} }
} }

View file

@ -12,5 +12,5 @@ pub(crate) mod rt {
#[allow(unused)] #[allow(unused)]
pub(crate) mod body { pub(crate) mod body {
pub(crate) use super::body_incoming_like::IncomingLike; pub(crate) use super::body_incoming_like::IncomingLike;
pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr, ResponseBody, UnboundedStreamBody}; pub(crate) use super::body_type::{empty, full, BoxBody, RequestBody, ResponseBody, UnboundedStreamBody};
} }

View file

@ -11,7 +11,7 @@ use crate::{
error::*, error::*,
forwarder::{ForwardRequest, Forwarder}, forwarder::{ForwardRequest, Forwarder},
globals::Globals, globals::Globals,
hyper_ext::body::{IncomingLike, IncomingOr, ResponseBody}, hyper_ext::body::{RequestBody, ResponseBody},
log::*, log::*,
name_exp::ServerName, name_exp::ServerName,
}; };
@ -53,7 +53,7 @@ where
/// Responsible to passthrough responses from backend applications or generate synthetic error responses. /// Responsible to passthrough responses from backend applications or generate synthetic error responses.
pub async fn handle_request( pub async fn handle_request(
&self, &self,
req: Request<IncomingOr<IncomingLike>>, req: Request<RequestBody>,
client_addr: SocketAddr, // For access control client_addr: SocketAddr, // For access control
listen_addr: SocketAddr, listen_addr: SocketAddr,
tls_enabled: bool, tls_enabled: bool,
@ -94,7 +94,7 @@ where
async fn handle_request_inner( async fn handle_request_inner(
&self, &self,
log_data: &mut HttpMessageLog, log_data: &mut HttpMessageLog,
mut req: Request<IncomingOr<IncomingLike>>, mut req: Request<RequestBody>,
client_addr: SocketAddr, // For access control client_addr: SocketAddr, // For access control
listen_addr: SocketAddr, listen_addr: SocketAddr,
tls_enabled: bool, tls_enabled: bool,

View file

@ -2,7 +2,7 @@ use super::proxy_main::Proxy;
use crate::{ use crate::{
crypto::CryptoSource, crypto::CryptoSource,
error::*, error::*,
hyper_ext::body::{IncomingLike, IncomingOr}, hyper_ext::body::{IncomingLike, RequestBody},
log::*, log::*,
name_exp::ServerName, name_exp::ServerName,
}; };
@ -137,7 +137,7 @@ where
Ok(()) as RpxyResult<()> Ok(()) as RpxyResult<()>
}); });
let new_req: Request<IncomingOr<IncomingLike>> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); let new_req: Request<RequestBody> = Request::from_parts(req_parts, RequestBody::IncomingLike(req_body));
let res = self let res = self
.message_handler .message_handler
.handle_request( .handle_request(
@ -155,6 +155,7 @@ where
match send_stream.send_response(new_res).await { match send_stream.send_response(new_res).await {
Ok(_) => { Ok(_) => {
debug!("HTTP/3 response to connection successful"); debug!("HTTP/3 response to connection successful");
// on-demand body streaming to downstream without expanding the object onto memory.
loop { loop {
let frame = match new_body.frame().await { let frame = match new_body.frame().await {
Some(frame) => frame, Some(frame) => frame,
@ -175,16 +176,6 @@ where
send_stream.send_trailers(trailers).await?; send_stream.send_trailers(trailers).await?;
} }
} }
// // aggregate body without copying
// let body_data = new_body
// .collect()
// .await
// .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
// // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
// send_stream.send_data(body_data.to_bytes()).await?;
// TODO: needs handling trailer? should be included in body from handler.
} }
Err(err) => { Err(err) => {
error!("Unable to send response to connection peer: {:?}", err); error!("Unable to send response to connection peer: {:?}", err);

View file

@ -5,7 +5,7 @@ use crate::{
error::*, error::*,
globals::Globals, globals::Globals,
hyper_ext::{ hyper_ext::{
body::{IncomingOr, ResponseBody}, body::{RequestBody, ResponseBody},
rt::LocalExecutor, rt::LocalExecutor,
}, },
log::*, log::*,
@ -39,7 +39,7 @@ where
{ {
handler handler
.handle_request( .handle_request(
req.map(IncomingOr::Left), req.map(RequestBody::Incoming),
client_addr, client_addr,
listen_addr, listen_addr,
tls_enabled, tls_enabled,