diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index f63a06c..3b1afc9 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -28,6 +28,8 @@ pub enum RpxyError { HyperIncomingLikeNewClosed, #[error("New body write aborted")] HyperNewBodyWriteAborted, + #[error("Hyper error in serving request or response body type: {0}")] + HyperBodyError(#[from] hyper::Error), // http/3 errors #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index 2bc4548..659ac41 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -118,7 +118,7 @@ impl RpxyCache { .map(|f| { if f.is_data() { let data_bytes = f.data_ref().unwrap().clone(); - println!("ddddde"); + debug!("cache data bytes of {} bytes", data_bytes.len()) // TODO: cache data bytes as file or on memory // fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。 // ファイルとObjectのbindをどうやってするか diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index d53cd73..26aa0c9 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -2,9 +2,9 @@ mod cache; mod client; -use crate::hyper_ext::body::{IncomingLike, IncomingOr}; +use crate::hyper_ext::body::RequestBody; -pub(crate) type Forwarder = client::Forwarder>; +pub(crate) type Forwarder = client::Forwarder; pub(crate) use client::ForwardRequest; #[cfg(feature = "cache")] diff --git a/rpxy-lib/src/hyper_ext/body_type.rs b/rpxy-lib/src/hyper_ext/body_type.rs index c1eb54b..a143eac 100644 --- a/rpxy-lib/src/hyper_ext/body_type.rs +++ b/rpxy-lib/src/hyper_ext/body_type.rs @@ -1,25 +1,11 @@ -// use http::Response; -use http_body_util::{combinators, BodyExt, Either, Empty, Full}; +use super::body::IncomingLike; +use crate::error::RpxyError; +use http_body_util::{combinators, BodyExt, Empty, Full}; use hyper::body::{Body, Bytes, Incoming}; use std::pin::Pin; /// Type for synthetic boxed body pub(crate) type BoxBody = combinators::BoxBody; -/// Type for either passthrough body or given body type, specifically synthetic boxed body -pub(crate) type IncomingOr = Either; - -// /// helper function to build http response with passthrough body -// pub(crate) fn wrap_incoming_body_response(response: Response) -> Response> -// where -// B: hyper::body::Body, -// { -// response.map(IncomingOr::Left) -// } - -// /// helper function to build http response with synthetic body -// pub(crate) fn wrap_synthetic_body_response(response: Response) -> Response> { -// response.map(IncomingOr::Right) -// } /// helper function to build a empty body pub(crate) fn empty() -> BoxBody { @@ -31,6 +17,30 @@ pub(crate) fn full(body: Bytes) -> BoxBody { Full::new(body).map_err(|never| match never {}).boxed() } +/* ------------------------------------ */ +/// Request body used in this project +/// - Incoming: just a type that only forwards the downstream request body to upstream. +/// - IncomingLike: a Incoming-like type in which channel is used +pub(crate) enum RequestBody { + Incoming(Incoming), + IncomingLike(IncomingLike), +} + +impl Body for RequestBody { + type Data = bytes::Bytes; + type Error = RpxyError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + match self.get_mut() { + RequestBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx).map_err(RpxyError::HyperBodyError), + RequestBody::IncomingLike(incoming_like) => Pin::new(incoming_like).poll_frame(cx), + } + } +} + /* ------------------------------------ */ #[cfg(feature = "cache")] use futures::channel::mpsc::UnboundedReceiver; @@ -44,8 +54,8 @@ pub(crate) type UnboundedStreamBody = StreamBody, @@ -68,5 +78,6 @@ impl Body for ResponseBody { #[cfg(feature = "cache")] ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx), } + .map_err(RpxyError::HyperBodyError) } } diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index 8b3776c..a4c5196 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -12,5 +12,5 @@ pub(crate) mod rt { #[allow(unused)] pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr, ResponseBody, UnboundedStreamBody}; + pub(crate) use super::body_type::{empty, full, BoxBody, RequestBody, ResponseBody, UnboundedStreamBody}; } diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index b5ae87d..ceb5db4 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -11,7 +11,7 @@ use crate::{ error::*, forwarder::{ForwardRequest, Forwarder}, globals::Globals, - hyper_ext::body::{IncomingLike, IncomingOr, ResponseBody}, + hyper_ext::body::{RequestBody, ResponseBody}, log::*, name_exp::ServerName, }; @@ -53,7 +53,7 @@ where /// Responsible to passthrough responses from backend applications or generate synthetic error responses. pub async fn handle_request( &self, - req: Request>, + req: Request, client_addr: SocketAddr, // For access control listen_addr: SocketAddr, tls_enabled: bool, @@ -94,7 +94,7 @@ where async fn handle_request_inner( &self, log_data: &mut HttpMessageLog, - mut req: Request>, + mut req: Request, client_addr: SocketAddr, // For access control listen_addr: SocketAddr, tls_enabled: bool, diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 61328b2..0295430 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -2,7 +2,7 @@ use super::proxy_main::Proxy; use crate::{ crypto::CryptoSource, error::*, - hyper_ext::body::{IncomingLike, IncomingOr}, + hyper_ext::body::{IncomingLike, RequestBody}, log::*, name_exp::ServerName, }; @@ -137,7 +137,7 @@ where Ok(()) as RpxyResult<()> }); - let new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); + let new_req: Request = Request::from_parts(req_parts, RequestBody::IncomingLike(req_body)); let res = self .message_handler .handle_request( @@ -155,6 +155,7 @@ where match send_stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); + // on-demand body streaming to downstream without expanding the object onto memory. loop { let frame = match new_body.frame().await { Some(frame) => frame, @@ -175,16 +176,6 @@ where send_stream.send_trailers(trailers).await?; } } - // // aggregate body without copying - // let body_data = new_body - // .collect() - // .await - // .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; - - // // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() - // send_stream.send_data(body_data.to_bytes()).await?; - - // TODO: needs handling trailer? should be included in body from handler. } Err(err) => { error!("Unable to send response to connection peer: {:?}", err); diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 2d7a649..4fea840 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -5,7 +5,7 @@ use crate::{ error::*, globals::Globals, hyper_ext::{ - body::{IncomingOr, ResponseBody}, + body::{RequestBody, ResponseBody}, rt::LocalExecutor, }, log::*, @@ -39,7 +39,7 @@ where { handler .handle_request( - req.map(IncomingOr::Left), + req.map(RequestBody::Incoming), client_addr, listen_addr, tls_enabled,