From aa95ab9c16a5eaa9ae5a39444f5ce56505ece667 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 6 Jul 2022 13:16:44 +0900 Subject: [PATCH] fix bug and somewhat reusable streams --- src/proxy/proxy_h3.rs | 19 +++++++++---------- src/proxy/proxy_handler.rs | 2 +- src/proxy/proxy_tls.rs | 10 +++++++++- 3 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/proxy/proxy_h3.rs b/src/proxy/proxy_h3.rs index ee9ab47..8db08a3 100644 --- a/src/proxy/proxy_h3.rs +++ b/src/proxy/proxy_h3.rs @@ -10,7 +10,6 @@ where T: Connect + Clone + Sync + Send + 'static, { pub async fn client_serve_h3(self, conn: quinn::Connecting) -> Result<()> { - // TODO: client数の管理 let client_addr = conn.remote_address(); match conn.await { @@ -37,11 +36,11 @@ where info!("HTTP/3 connection established"); // TODO: Work around for timeout... - //while let Some((req, stream)) = h3_conn - // if let Some((req, stream)) = + // while let Some((req, stream)) = h3_conn + // .accept() // .await // .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))? - if let Some((req, stream)) = match tokio::time::timeout( + while let Some((req, stream)) = match tokio::time::timeout( tokio::time::Duration::from_millis(H3_CONN_TIMEOUT_MILLIS), h3_conn.accept(), ) @@ -49,7 +48,7 @@ where { Ok(r) => r.map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?, Err(_) => { - warn!("No incoming stream after connection establishment"); + warn!("No incoming stream after connection establishment / previous use"); h3_conn.shutdown(0).await?; return Ok(()); } @@ -66,11 +65,11 @@ where if let Err(e) = self_inner.handle_request_h3(req, stream, client_addr).await { error!("HTTP/3 request failed: {}", e); } - // TODO: Work around for timeout - if let Err(e) = h3_conn.shutdown(0).await { - error!("HTTP/3 connection shutdown failed: {}", e); - } - debug!("HTTP/3 connection shutdown (currently shutdown each time as work around for timeout)"); + // // TODO: Work around for timeout + // if let Err(e) = h3_conn.shutdown(0).await { + // error!("HTTP/3 connection shutdown failed: {}", e); + // } + // debug!("HTTP/3 connection shutdown (currently shutdown each time as work around for timeout)"); }); } } diff --git a/src/proxy/proxy_handler.rs b/src/proxy/proxy_handler.rs index a82d489..15cc21e 100644 --- a/src/proxy/proxy_handler.rs +++ b/src/proxy/proxy_handler.rs @@ -115,7 +115,7 @@ where res_backend.headers_mut().insert( hyper::header::ALT_SVC, format!( - "h3=\":{}\"; ma={}, h3-29\":{}\"; ma={}", + "h3=\":{}\"; ma={}, h3-29=\":{}\"; ma={}", port, H3_ALT_SVC_MAX_AGE, port, H3_ALT_SVC_MAX_AGE ) .parse() diff --git a/src/proxy/proxy_tls.rs b/src/proxy/proxy_tls.rs index a97d712..d70f230 100644 --- a/src/proxy/proxy_tls.rs +++ b/src/proxy/proxy_tls.rs @@ -144,11 +144,19 @@ where let peekable_incoming = std::pin::Pin::new(&mut p); if let Some(conn) = peekable_incoming.get_mut().next().await { if success { + // TODO: client数の管理 + let clients_count = self.globals.clients_count.clone(); + if clients_count.increment() > self.globals.max_clients { + clients_count.decrement(); + continue; + } let fut = self.clone().client_serve_h3(conn); - self.globals.runtime_handle.spawn(async { + self.globals.runtime_handle.spawn(async move { if let Err(e) = fut.await { warn!("QUIC or HTTP/3 connection failed: {}", e) } + clients_count.decrement(); + debug!("Client #: {}", clients_count.current()); }); } } else {