From da01f8ee6a8dc9e137b9b51a0cc2874cccbc68f8 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 20 Jul 2022 17:32:35 +0900 Subject: [PATCH] reconsider to load request/response body data. believe quinn to properly handle buffering body --- Cargo.toml | 1 + src/constants.rs | 6 ++-- src/proxy/proxy_h3.rs | 71 ++++++++++++++++++++++--------------------- 3 files changed, 42 insertions(+), 36 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 3ff008e..18485a7 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ h3-quinn = { git = "https://github.com/hyperium/h3.git" } bytes = "1.1.0" mimalloc-rust = "0.2.0" + [dev-dependencies] diff --git a/src/constants.rs b/src/constants.rs index 64a9b0e..76bcb7b 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -12,8 +12,10 @@ pub const CERTS_WATCH_DELAY_SECS: u32 = 30; #[cfg(feature = "h3")] pub const H3_ALT_SVC_MAX_AGE: u32 = 3600; -#[cfg(feature = "h3")] -pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB +// #[cfg(feature = "h3")] +// pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB +// #[cfg(feature = "h3")] +// pub const H3_REQUEST_BUF_SIZE: usize = 65_536; // 64KB // handled by quinn #[cfg(feature = "h3")] pub const H3_REQUEST_MAX_BODY_SIZE: usize = 268_435_456; // 256MB diff --git a/src/proxy/proxy_h3.rs b/src/proxy/proxy_h3.rs index be44917..7749ba2 100644 --- a/src/proxy/proxy_h3.rs +++ b/src/proxy/proxy_h3.rs @@ -1,9 +1,9 @@ use super::Proxy; use crate::{backend::ServerNameLC, constants::*, error::*, log::*}; -use bytes::{Buf, BufMut, Bytes, BytesMut}; +use bytes::{Buf, Bytes}; use h3::{quic::BidiStream, server::RequestStream}; use hyper::{client::connect::Connect, Body, Request, Response}; -use std::{io::Read, net::SocketAddr}; +use std::net::SocketAddr; use tokio::time::{timeout, Duration}; impl Proxy @@ -94,36 +94,43 @@ where async fn handle_stream_h3( self, req: Request<()>, - mut stream: RequestStream, + stream: RequestStream, client_addr: SocketAddr, tls_server_name: ServerNameLC, ) -> Result<()> where - S: BidiStream, + S: BidiStream + Send + 'static, + >::RecvStream: Send, { let (req_parts, _) = req.into_parts(); - // let (mut send_stream, mut recv_stream) = stream.split(); // TODO: split stream and async body handling - - // TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦全部バッファリングしないと無理そう。H3->H3ならBytesを直に流し込めるのだが。 - let mut body_buf = BytesMut::new(); - while let Some(chunk) = stream.recv_data().await? { - debug!("HTTP/3 request body"); - if body_buf.len() + chunk.remaining() > H3_REQUEST_MAX_BODY_SIZE { - error!("Exceeds max request body size for HTTP/3"); - return Err(anyhow!("Exceeds max request body size for HTTP/3")); - } - body_buf.put(chunk); - } - // trailers - let trailers = stream.recv_trailers().await?; + // split stream and async body handling + let (mut send_stream, mut recv_stream) = stream.split(); // generate streamed body with trailers using channel let (body_sender, req_body) = Body::channel(); + + // Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1 + // The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn. self.globals.runtime_handle.spawn(async move { let mut sender = body_sender; - sender.send_data(body_buf.freeze()).await?; + let mut size = 0usize; + while let Some(mut body) = recv_stream.recv_data().await? { + debug!("HTTP/3 incoming request body"); + size += body.remaining(); + if size > H3_REQUEST_MAX_BODY_SIZE { + error!("Exceeds max request body size for HTTP/3"); + return Err(anyhow!("Exceeds max request body size for HTTP/3")); + } + // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes + sender + .send_data(body.copy_to_bytes(body.remaining())) + .await?; + } + + // trailers: use inner for work around. (directly get trailer) + let trailers = recv_stream.as_mut().recv_trailers().await?; if trailers.is_some() { - debug!("HTTP/3 request with trailers"); + debug!("HTTP/3 incoming request trailers"); sender.send_trailers(trailers.unwrap()).await?; } Ok(()) as Result<()> @@ -145,27 +152,23 @@ where let (new_res_parts, new_body) = res.into_parts(); let new_res = Response::from_parts(new_res_parts, ()); - match stream.send_response(new_res).await { + match send_stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); - let body_data = hyper::body::aggregate(new_body).await?; // aggregate body without copying - let mut reader = body_data.reader(); - let mut buf = [0u8; H3_RESPONSE_BUF_SIZE]; - loop { - let num = reader.read(&mut buf)?; - if num == 0 { - break; - } - stream - .send_data(Bytes::copy_from_slice(&buf[..num])) - .await?; - } + // aggregate body without copying + let mut body_data = hyper::body::aggregate(new_body).await?; + + // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes + send_stream + .send_data(body_data.copy_to_bytes(body_data.remaining())) + .await?; + // TODO: needs handling trailer? should be included in body from handler. } Err(err) => { error!("Unable to send response to connection peer: {:?}", err); } } - Ok(stream.finish().await?) + Ok(send_stream.finish().await?) } }