reconsider to load request/response body data. believe quinn to properly handle buffering body
This commit is contained in:
parent
f51cd5e012
commit
da01f8ee6a
3 changed files with 42 additions and 36 deletions
|
|
@ -54,6 +54,7 @@ h3-quinn = { git = "https://github.com/hyperium/h3.git" }
|
||||||
bytes = "1.1.0"
|
bytes = "1.1.0"
|
||||||
mimalloc-rust = "0.2.0"
|
mimalloc-rust = "0.2.0"
|
||||||
|
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,10 @@ pub const CERTS_WATCH_DELAY_SECS: u32 = 30;
|
||||||
#[cfg(feature = "h3")]
|
#[cfg(feature = "h3")]
|
||||||
pub const H3_ALT_SVC_MAX_AGE: u32 = 3600;
|
pub const H3_ALT_SVC_MAX_AGE: u32 = 3600;
|
||||||
|
|
||||||
#[cfg(feature = "h3")]
|
// #[cfg(feature = "h3")]
|
||||||
pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB
|
// pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB
|
||||||
|
// #[cfg(feature = "h3")]
|
||||||
|
// pub const H3_REQUEST_BUF_SIZE: usize = 65_536; // 64KB // handled by quinn
|
||||||
|
|
||||||
#[cfg(feature = "h3")]
|
#[cfg(feature = "h3")]
|
||||||
pub const H3_REQUEST_MAX_BODY_SIZE: usize = 268_435_456; // 256MB
|
pub const H3_REQUEST_MAX_BODY_SIZE: usize = 268_435_456; // 256MB
|
||||||
|
|
|
||||||
|
|
@ -1,9 +1,9 @@
|
||||||
use super::Proxy;
|
use super::Proxy;
|
||||||
use crate::{backend::ServerNameLC, constants::*, error::*, log::*};
|
use crate::{backend::ServerNameLC, constants::*, error::*, log::*};
|
||||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
use bytes::{Buf, Bytes};
|
||||||
use h3::{quic::BidiStream, server::RequestStream};
|
use h3::{quic::BidiStream, server::RequestStream};
|
||||||
use hyper::{client::connect::Connect, Body, Request, Response};
|
use hyper::{client::connect::Connect, Body, Request, Response};
|
||||||
use std::{io::Read, net::SocketAddr};
|
use std::net::SocketAddr;
|
||||||
use tokio::time::{timeout, Duration};
|
use tokio::time::{timeout, Duration};
|
||||||
|
|
||||||
impl<T> Proxy<T>
|
impl<T> Proxy<T>
|
||||||
|
|
@ -94,36 +94,43 @@ where
|
||||||
async fn handle_stream_h3<S>(
|
async fn handle_stream_h3<S>(
|
||||||
self,
|
self,
|
||||||
req: Request<()>,
|
req: Request<()>,
|
||||||
mut stream: RequestStream<S, Bytes>,
|
stream: RequestStream<S, Bytes>,
|
||||||
client_addr: SocketAddr,
|
client_addr: SocketAddr,
|
||||||
tls_server_name: ServerNameLC,
|
tls_server_name: ServerNameLC,
|
||||||
) -> Result<()>
|
) -> Result<()>
|
||||||
where
|
where
|
||||||
S: BidiStream<Bytes>,
|
S: BidiStream<Bytes> + Send + 'static,
|
||||||
|
<S as BidiStream<Bytes>>::RecvStream: Send,
|
||||||
{
|
{
|
||||||
let (req_parts, _) = req.into_parts();
|
let (req_parts, _) = req.into_parts();
|
||||||
// let (mut send_stream, mut recv_stream) = stream.split(); // TODO: split stream and async body handling
|
// split stream and async body handling
|
||||||
|
let (mut send_stream, mut recv_stream) = stream.split();
|
||||||
// TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦全部バッファリングしないと無理そう。H3->H3ならBytesを直に流し込めるのだが。
|
|
||||||
let mut body_buf = BytesMut::new();
|
|
||||||
while let Some(chunk) = stream.recv_data().await? {
|
|
||||||
debug!("HTTP/3 request body");
|
|
||||||
if body_buf.len() + chunk.remaining() > H3_REQUEST_MAX_BODY_SIZE {
|
|
||||||
error!("Exceeds max request body size for HTTP/3");
|
|
||||||
return Err(anyhow!("Exceeds max request body size for HTTP/3"));
|
|
||||||
}
|
|
||||||
body_buf.put(chunk);
|
|
||||||
}
|
|
||||||
// trailers
|
|
||||||
let trailers = stream.recv_trailers().await?;
|
|
||||||
|
|
||||||
// generate streamed body with trailers using channel
|
// generate streamed body with trailers using channel
|
||||||
let (body_sender, req_body) = Body::channel();
|
let (body_sender, req_body) = Body::channel();
|
||||||
|
|
||||||
|
// Buffering and sending body through channel for protocol conversion like h3 -> h2/http1.1
|
||||||
|
// The underling buffering, i.e., buffer given by the API recv_data.await?, is handled by quinn.
|
||||||
self.globals.runtime_handle.spawn(async move {
|
self.globals.runtime_handle.spawn(async move {
|
||||||
let mut sender = body_sender;
|
let mut sender = body_sender;
|
||||||
sender.send_data(body_buf.freeze()).await?;
|
let mut size = 0usize;
|
||||||
|
while let Some(mut body) = recv_stream.recv_data().await? {
|
||||||
|
debug!("HTTP/3 incoming request body");
|
||||||
|
size += body.remaining();
|
||||||
|
if size > H3_REQUEST_MAX_BODY_SIZE {
|
||||||
|
error!("Exceeds max request body size for HTTP/3");
|
||||||
|
return Err(anyhow!("Exceeds max request body size for HTTP/3"));
|
||||||
|
}
|
||||||
|
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
|
||||||
|
sender
|
||||||
|
.send_data(body.copy_to_bytes(body.remaining()))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
// trailers: use inner for work around. (directly get trailer)
|
||||||
|
let trailers = recv_stream.as_mut().recv_trailers().await?;
|
||||||
if trailers.is_some() {
|
if trailers.is_some() {
|
||||||
debug!("HTTP/3 request with trailers");
|
debug!("HTTP/3 incoming request trailers");
|
||||||
sender.send_trailers(trailers.unwrap()).await?;
|
sender.send_trailers(trailers.unwrap()).await?;
|
||||||
}
|
}
|
||||||
Ok(()) as Result<()>
|
Ok(()) as Result<()>
|
||||||
|
|
@ -145,27 +152,23 @@ where
|
||||||
let (new_res_parts, new_body) = res.into_parts();
|
let (new_res_parts, new_body) = res.into_parts();
|
||||||
let new_res = Response::from_parts(new_res_parts, ());
|
let new_res = Response::from_parts(new_res_parts, ());
|
||||||
|
|
||||||
match stream.send_response(new_res).await {
|
match send_stream.send_response(new_res).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
debug!("HTTP/3 response to connection successful");
|
debug!("HTTP/3 response to connection successful");
|
||||||
let body_data = hyper::body::aggregate(new_body).await?; // aggregate body without copying
|
// aggregate body without copying
|
||||||
let mut reader = body_data.reader();
|
let mut body_data = hyper::body::aggregate(new_body).await?;
|
||||||
let mut buf = [0u8; H3_RESPONSE_BUF_SIZE];
|
|
||||||
loop {
|
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes
|
||||||
let num = reader.read(&mut buf)?;
|
send_stream
|
||||||
if num == 0 {
|
.send_data(body_data.copy_to_bytes(body_data.remaining()))
|
||||||
break;
|
.await?;
|
||||||
}
|
|
||||||
stream
|
|
||||||
.send_data(Bytes::copy_from_slice(&buf[..num]))
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
// TODO: needs handling trailer? should be included in body from handler.
|
// TODO: needs handling trailer? should be included in body from handler.
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!("Unable to send response to connection peer: {:?}", err);
|
error!("Unable to send response to connection peer: {:?}", err);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(stream.finish().await?)
|
Ok(send_stream.finish().await?)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue