From f2a341e1982fbd46dc2f1ac9df3899d50b2eddb0 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 18 Jul 2022 15:27:27 +0900 Subject: [PATCH] refactor h3 handling --- src/constants.rs | 10 ++++---- src/msg_handler/handler.rs | 7 ++---- src/proxy/proxy_h3.rs | 48 +++++++++++++++++++++++--------------- src/proxy/proxy_tls.rs | 2 +- 4 files changed, 37 insertions(+), 30 deletions(-) diff --git a/src/constants.rs b/src/constants.rs index 94e97ce..a8ff9ed 100644 --- a/src/constants.rs +++ b/src/constants.rs @@ -2,12 +2,12 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; // pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTPS_LISTEN_PORT: u16 = 8443; -pub const PROXY_TIMEOUT_SEC: u64 = 10; -pub const UPSTREAM_TIMEOUT_SEC: u64 = 8; +pub const PROXY_TIMEOUT_SEC: u64 = 60; +pub const UPSTREAM_TIMEOUT_SEC: u64 = 60; pub const MAX_CLIENTS: usize = 512; -pub const MAX_CONCURRENT_STREAMS: u32 = 16; +pub const MAX_CONCURRENT_STREAMS: u32 = 32; // #[cfg(feature = "tls")] -pub const CERTS_WATCH_DELAY_SECS: u32 = 10; +pub const CERTS_WATCH_DELAY_SECS: u32 = 30; #[cfg(feature = "h3")] -pub const H3_ALT_SVC_MAX_AGE: u32 = 60; +pub const H3_ALT_SVC_MAX_AGE: u32 = 120; diff --git a/src/msg_handler/handler.rs b/src/msg_handler/handler.rs index a7e5c64..76dd8fa 100644 --- a/src/msg_handler/handler.rs +++ b/src/msg_handler/handler.rs @@ -14,10 +14,7 @@ use hyper::{ Body, Client, Request, Response, StatusCode, Uri, Version, }; use std::{env, net::SocketAddr, sync::Arc}; -use tokio::{ - io::copy_bidirectional, - time::{timeout, Duration}, -}; +use tokio::{io::copy_bidirectional, time::timeout}; #[derive(Clone)] pub struct HttpMessageHandler @@ -129,7 +126,7 @@ where // Forward request to let mut res_backend = { match timeout( - self.globals.upstream_timeout + Duration::from_secs(1), + self.globals.upstream_timeout, self.forwarder.request(req_forwarded), ) .await diff --git a/src/proxy/proxy_h3.rs b/src/proxy/proxy_h3.rs index 069e533..db541f2 100644 --- a/src/proxy/proxy_h3.rs +++ b/src/proxy/proxy_h3.rs @@ -53,8 +53,11 @@ where // .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))? while let Some((req, stream)) = match h3_conn.accept().await { Ok(opt_req) => opt_req, - Err(_) => { - warn!("HTTP/3 failed to accept incoming connection (likely timeout)"); + Err(e) => { + warn!( + "HTTP/3 failed to accept incoming connection (likely timeout): {}", + e + ); return Ok(h3_conn.shutdown(0).await?); } } { @@ -96,30 +99,32 @@ where tls_server_name: ServerNameLC, ) -> Result<()> where - S: BidiStream, + S: BidiStream + Send + 'static, { let (req_parts, _) = req.into_parts(); - // TODO: h3 -> h2/http1.1などのプロトコル変換がなければ、bodyはBytes単位で直でsend_dataして転送した方がいい。やむなし。 + // TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦バッファリング。 + // 本来はbodyは直でstreamでcopy_bidirectionalしてして転送した方がいい。やむなし。 let mut body_chunk: Vec = Vec::new(); while let Some(request_body) = stream.recv_data().await? { + debug!("HTTP/3 request body"); body_chunk.extend_from_slice(request_body.chunk()); } - let body = if body_chunk.is_empty() { - Body::default() - } else { - debug!("HTTP/3 request with non-empty body"); - Body::from(body_chunk) - }; // trailers - let trailers = if let Some(trailers) = stream.recv_trailers().await? { - debug!("HTTP/3 request with trailers"); - trailers - } else { - HeaderMap::new() - }; + let trailers = stream.recv_trailers().await?; + // generate streamed body with trailers using channel + let (body_sender, req_body) = Body::channel(); + self.globals.runtime_handle.spawn(async move { + let mut sender = body_sender; + sender.send_data(Bytes::from(body_chunk)).await?; + if trailers.is_some() { + debug!("HTTP/3 request with trailers"); + sender.send_trailers(trailers.unwrap()).await?; + } + Ok(()) as Result<()> + }); - let new_req: Request = Request::from_parts(req_parts, body); + let new_req: Request = Request::from_parts(req_parts, req_body); let res = self .msg_handler .clone() @@ -138,10 +143,15 @@ where match stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); + // loop { + // let mut buf = BytesMut::with_capacity(4096 * 10); + // if file.read_buf(&mut buf).await? == 0 { + // break; + // } + // stream.send_data(buf.freeze()).await?; + // } let data = hyper::body::to_bytes(new_body).await?; stream.send_data(data).await?; - stream.send_trailers(trailers).await?; - return Ok(stream.finish().await?); } Err(err) => { error!("Unable to send response to connection peer: {:?}", err); diff --git a/src/proxy/proxy_tls.rs b/src/proxy/proxy_tls.rs index bc38b3c..4d2c571 100644 --- a/src/proxy/proxy_tls.rs +++ b/src/proxy/proxy_tls.rs @@ -101,7 +101,7 @@ where } #[cfg(feature = "h3")] - async fn parse_sni_and_get_crypto_h3<'a>( + async fn parse_sni_and_get_crypto_h3<'a, 'b>( &self, peeked_conn: &mut quinn::Connecting, server_crypto_map: &'a ServerCryptoMap,