refactor h3 handling
This commit is contained in:
		
					parent
					
						
							
								c3c6d4f4fd
							
						
					
				
			
			
				commit
				
					
						f2a341e198
					
				
			
		
					 4 changed files with 37 additions and 30 deletions
				
			
		|  | @ -2,12 +2,12 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; | ||||||
| pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; | pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; | ||||||
| // pub const HTTP_LISTEN_PORT: u16 = 8080;
 | // pub const HTTP_LISTEN_PORT: u16 = 8080;
 | ||||||
| // pub const HTTPS_LISTEN_PORT: u16 = 8443;
 | // pub const HTTPS_LISTEN_PORT: u16 = 8443;
 | ||||||
| pub const PROXY_TIMEOUT_SEC: u64 = 10; | pub const PROXY_TIMEOUT_SEC: u64 = 60; | ||||||
| pub const UPSTREAM_TIMEOUT_SEC: u64 = 8; | pub const UPSTREAM_TIMEOUT_SEC: u64 = 60; | ||||||
| pub const MAX_CLIENTS: usize = 512; | pub const MAX_CLIENTS: usize = 512; | ||||||
| pub const MAX_CONCURRENT_STREAMS: u32 = 16; | pub const MAX_CONCURRENT_STREAMS: u32 = 32; | ||||||
| // #[cfg(feature = "tls")]
 | // #[cfg(feature = "tls")]
 | ||||||
| pub const CERTS_WATCH_DELAY_SECS: u32 = 10; | pub const CERTS_WATCH_DELAY_SECS: u32 = 30; | ||||||
| 
 | 
 | ||||||
| #[cfg(feature = "h3")] | #[cfg(feature = "h3")] | ||||||
| pub const H3_ALT_SVC_MAX_AGE: u32 = 60; | pub const H3_ALT_SVC_MAX_AGE: u32 = 120; | ||||||
|  |  | ||||||
|  | @ -14,10 +14,7 @@ use hyper::{ | ||||||
|   Body, Client, Request, Response, StatusCode, Uri, Version, |   Body, Client, Request, Response, StatusCode, Uri, Version, | ||||||
| }; | }; | ||||||
| use std::{env, net::SocketAddr, sync::Arc}; | use std::{env, net::SocketAddr, sync::Arc}; | ||||||
| use tokio::{ | use tokio::{io::copy_bidirectional, time::timeout}; | ||||||
|   io::copy_bidirectional, |  | ||||||
|   time::{timeout, Duration}, |  | ||||||
| }; |  | ||||||
| 
 | 
 | ||||||
| #[derive(Clone)] | #[derive(Clone)] | ||||||
| pub struct HttpMessageHandler<T> | pub struct HttpMessageHandler<T> | ||||||
|  | @ -129,7 +126,7 @@ where | ||||||
|     // Forward request to
 |     // Forward request to
 | ||||||
|     let mut res_backend = { |     let mut res_backend = { | ||||||
|       match timeout( |       match timeout( | ||||||
|         self.globals.upstream_timeout + Duration::from_secs(1), |         self.globals.upstream_timeout, | ||||||
|         self.forwarder.request(req_forwarded), |         self.forwarder.request(req_forwarded), | ||||||
|       ) |       ) | ||||||
|       .await |       .await | ||||||
|  |  | ||||||
|  | @ -53,8 +53,11 @@ where | ||||||
|         //   .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?
 |         //   .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?
 | ||||||
|         while let Some((req, stream)) = match h3_conn.accept().await { |         while let Some((req, stream)) = match h3_conn.accept().await { | ||||||
|           Ok(opt_req) => opt_req, |           Ok(opt_req) => opt_req, | ||||||
|           Err(_) => { |           Err(e) => { | ||||||
|             warn!("HTTP/3 failed to accept incoming connection (likely timeout)"); |             warn!( | ||||||
|  |               "HTTP/3 failed to accept incoming connection (likely timeout): {}", | ||||||
|  |               e | ||||||
|  |             ); | ||||||
|             return Ok(h3_conn.shutdown(0).await?); |             return Ok(h3_conn.shutdown(0).await?); | ||||||
|           } |           } | ||||||
|         } { |         } { | ||||||
|  | @ -96,30 +99,32 @@ where | ||||||
|     tls_server_name: ServerNameLC, |     tls_server_name: ServerNameLC, | ||||||
|   ) -> Result<()> |   ) -> Result<()> | ||||||
|   where |   where | ||||||
|     S: BidiStream<Bytes>, |     S: BidiStream<Bytes> + Send + 'static, | ||||||
|   { |   { | ||||||
|     let (req_parts, _) = req.into_parts(); |     let (req_parts, _) = req.into_parts(); | ||||||
| 
 | 
 | ||||||
|     // TODO: h3 -> h2/http1.1などのプロトコル変換がなければ、bodyはBytes単位で直でsend_dataして転送した方がいい。やむなし。
 |     // TODO: h3 -> h2/http1.1等のプロトコル変換のため、一旦バッファリング。
 | ||||||
|  |     // 本来はbodyは直でstreamでcopy_bidirectionalしてして転送した方がいい。やむなし。
 | ||||||
|     let mut body_chunk: Vec<u8> = Vec::new(); |     let mut body_chunk: Vec<u8> = Vec::new(); | ||||||
|     while let Some(request_body) = stream.recv_data().await? { |     while let Some(request_body) = stream.recv_data().await? { | ||||||
|  |       debug!("HTTP/3 request body"); | ||||||
|       body_chunk.extend_from_slice(request_body.chunk()); |       body_chunk.extend_from_slice(request_body.chunk()); | ||||||
|     } |     } | ||||||
|     let body = if body_chunk.is_empty() { |  | ||||||
|       Body::default() |  | ||||||
|     } else { |  | ||||||
|       debug!("HTTP/3 request with non-empty body"); |  | ||||||
|       Body::from(body_chunk) |  | ||||||
|     }; |  | ||||||
|     // trailers
 |     // trailers
 | ||||||
|     let trailers = if let Some(trailers) = stream.recv_trailers().await? { |     let trailers = stream.recv_trailers().await?; | ||||||
|       debug!("HTTP/3 request with trailers"); |     // generate streamed body with trailers using channel
 | ||||||
|       trailers |     let (body_sender, req_body) = Body::channel(); | ||||||
|     } else { |     self.globals.runtime_handle.spawn(async move { | ||||||
|       HeaderMap::new() |       let mut sender = body_sender; | ||||||
|     }; |       sender.send_data(Bytes::from(body_chunk)).await?; | ||||||
|  |       if trailers.is_some() { | ||||||
|  |         debug!("HTTP/3 request with trailers"); | ||||||
|  |         sender.send_trailers(trailers.unwrap()).await?; | ||||||
|  |       } | ||||||
|  |       Ok(()) as Result<()> | ||||||
|  |     }); | ||||||
| 
 | 
 | ||||||
|     let new_req: Request<Body> = Request::from_parts(req_parts, body); |     let new_req: Request<Body> = Request::from_parts(req_parts, req_body); | ||||||
|     let res = self |     let res = self | ||||||
|       .msg_handler |       .msg_handler | ||||||
|       .clone() |       .clone() | ||||||
|  | @ -138,10 +143,15 @@ where | ||||||
|     match stream.send_response(new_res).await { |     match stream.send_response(new_res).await { | ||||||
|       Ok(_) => { |       Ok(_) => { | ||||||
|         debug!("HTTP/3 response to connection successful"); |         debug!("HTTP/3 response to connection successful"); | ||||||
|  |         // loop {
 | ||||||
|  |         //   let mut buf = BytesMut::with_capacity(4096 * 10);
 | ||||||
|  |         //   if file.read_buf(&mut buf).await? == 0 {
 | ||||||
|  |         //     break;
 | ||||||
|  |         //   }
 | ||||||
|  |         //   stream.send_data(buf.freeze()).await?;
 | ||||||
|  |         // }
 | ||||||
|         let data = hyper::body::to_bytes(new_body).await?; |         let data = hyper::body::to_bytes(new_body).await?; | ||||||
|         stream.send_data(data).await?; |         stream.send_data(data).await?; | ||||||
|         stream.send_trailers(trailers).await?; |  | ||||||
|         return Ok(stream.finish().await?); |  | ||||||
|       } |       } | ||||||
|       Err(err) => { |       Err(err) => { | ||||||
|         error!("Unable to send response to connection peer: {:?}", err); |         error!("Unable to send response to connection peer: {:?}", err); | ||||||
|  |  | ||||||
|  | @ -101,7 +101,7 @@ where | ||||||
|   } |   } | ||||||
| 
 | 
 | ||||||
|   #[cfg(feature = "h3")] |   #[cfg(feature = "h3")] | ||||||
|   async fn parse_sni_and_get_crypto_h3<'a>( |   async fn parse_sni_and_get_crypto_h3<'a, 'b>( | ||||||
|     &self, |     &self, | ||||||
|     peeked_conn: &mut quinn::Connecting, |     peeked_conn: &mut quinn::Connecting, | ||||||
|     server_crypto_map: &'a ServerCryptoMap, |     server_crypto_map: &'a ServerCryptoMap, | ||||||
|  |  | ||||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Kurihara
				Jun Kurihara