update
This commit is contained in:
parent
fd4425afb4
commit
fc296a4d01
5 changed files with 101 additions and 80 deletions
|
|
@ -11,3 +11,9 @@ pub const CERTS_WATCH_DELAY_SECS: u32 = 30;
|
|||
|
||||
#[cfg(feature = "h3")]
|
||||
pub const H3_ALT_SVC_MAX_AGE: u32 = 3600;
|
||||
|
||||
#[cfg(feature = "h3")]
|
||||
pub const H3_RESPONSE_BUF_SIZE: usize = 65_536; // 64KB
|
||||
|
||||
#[cfg(feature = "h3")]
|
||||
pub const H3_REQUEST_MAX_BODY_SIZE: usize = 268_435_456; // 256MB
|
||||
|
|
|
|||
|
|
@ -1,9 +1,9 @@
|
|||
use super::Proxy;
|
||||
use crate::{backend::ServerNameLC, error::*, log::*};
|
||||
use bytes::{Buf, Bytes};
|
||||
use crate::{backend::ServerNameLC, constants::*, error::*, log::*};
|
||||
use bytes::{Buf, BufMut, Bytes, BytesMut};
|
||||
use h3::{quic::BidiStream, server::RequestStream};
|
||||
use hyper::{client::connect::Connect, Body, Request, Response};
|
||||
use std::net::SocketAddr;
|
||||
use std::{io::Read, net::SocketAddr};
|
||||
use tokio::time::{timeout, Duration};
|
||||
|
||||
impl<T> Proxy<T>
|
||||
|
|
@ -103,20 +103,24 @@ where
|
|||
{
|
||||
let (req_parts, _) = req.into_parts();
|
||||
|
||||
// TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦バッファリング。
|
||||
// 本来はbodyは直でstreamでcopy_bidirectionalしてして転送した方がいい。やむなし。
|
||||
let mut body_chunk: Vec<u8> = Vec::new();
|
||||
while let Some(request_body) = stream.recv_data().await? {
|
||||
// TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦全部バッファリングしないと無理そう。H3->H3ならBytesを直に流し込めるのだが。
|
||||
let mut body_buf = BytesMut::new();
|
||||
while let Some(chunk) = stream.recv_data().await? {
|
||||
debug!("HTTP/3 request body");
|
||||
body_chunk.extend_from_slice(request_body.chunk());
|
||||
if body_buf.len() + chunk.remaining() > H3_REQUEST_MAX_BODY_SIZE {
|
||||
error!("Exceeds max request body size for HTTP/3");
|
||||
return Err(anyhow!("Exceeds max request body size for HTTP/3"));
|
||||
}
|
||||
body_buf.put(chunk);
|
||||
}
|
||||
// trailers
|
||||
let trailers = stream.recv_trailers().await?;
|
||||
|
||||
// generate streamed body with trailers using channel
|
||||
let (body_sender, req_body) = Body::channel();
|
||||
self.globals.runtime_handle.spawn(async move {
|
||||
let mut sender = body_sender;
|
||||
sender.send_data(Bytes::from(body_chunk)).await?;
|
||||
sender.send_data(body_buf.freeze()).await?;
|
||||
if trailers.is_some() {
|
||||
debug!("HTTP/3 request with trailers");
|
||||
sender.send_trailers(trailers.unwrap()).await?;
|
||||
|
|
@ -143,15 +147,19 @@ where
|
|||
match stream.send_response(new_res).await {
|
||||
Ok(_) => {
|
||||
debug!("HTTP/3 response to connection successful");
|
||||
// loop {
|
||||
// let mut buf = BytesMut::with_capacity(4096 * 10);
|
||||
// if file.read_buf(&mut buf).await? == 0 {
|
||||
// break;
|
||||
// }
|
||||
// stream.send_data(buf.freeze()).await?;
|
||||
// }
|
||||
let data = hyper::body::to_bytes(new_body).await?;
|
||||
stream.send_data(data).await?;
|
||||
let body_data = hyper::body::aggregate(new_body).await?; // aggregate body without copying
|
||||
let mut reader = body_data.reader();
|
||||
let mut buf = [0u8; H3_RESPONSE_BUF_SIZE];
|
||||
loop {
|
||||
let num = reader.read(&mut buf)?;
|
||||
if num == 0 {
|
||||
break;
|
||||
}
|
||||
stream
|
||||
.send_data(Bytes::copy_from_slice(&buf[..num]))
|
||||
.await?;
|
||||
}
|
||||
// TODO: needs handling trailer? should be included in body from handler.
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unable to send response to connection peer: {:?}", err);
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue