reconsider timeout for h3 connections
This commit is contained in:
parent
9c93b1cc31
commit
4f5a1cbf91
6 changed files with 35 additions and 35 deletions
|
|
@ -2,7 +2,8 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
|
||||||
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
|
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
|
||||||
// pub const HTTP_LISTEN_PORT: u16 = 8080;
|
// pub const HTTP_LISTEN_PORT: u16 = 8080;
|
||||||
// pub const HTTPS_LISTEN_PORT: u16 = 8443;
|
// pub const HTTPS_LISTEN_PORT: u16 = 8443;
|
||||||
pub const TIMEOUT_SEC: u64 = 10;
|
pub const PROXY_TIMEOUT_SEC: u64 = 10;
|
||||||
|
pub const UPSTREAM_TIMEOUT_SEC: u64 = 8;
|
||||||
pub const MAX_CLIENTS: usize = 512;
|
pub const MAX_CLIENTS: usize = 512;
|
||||||
pub const MAX_CONCURRENT_STREAMS: u32 = 16;
|
pub const MAX_CONCURRENT_STREAMS: u32 = 16;
|
||||||
// #[cfg(feature = "tls")]
|
// #[cfg(feature = "tls")]
|
||||||
|
|
@ -10,5 +11,3 @@ pub const CERTS_WATCH_DELAY_SECS: u32 = 10;
|
||||||
|
|
||||||
#[cfg(feature = "h3")]
|
#[cfg(feature = "h3")]
|
||||||
pub const H3_ALT_SVC_MAX_AGE: u32 = 60;
|
pub const H3_ALT_SVC_MAX_AGE: u32 = 60;
|
||||||
#[cfg(feature = "h3")]
|
|
||||||
pub const H3_CONN_TIMEOUT_MILLIS: u64 = 2000;
|
|
||||||
|
|
|
||||||
|
|
@ -11,7 +11,9 @@ pub struct Globals {
|
||||||
pub http_port: Option<u16>,
|
pub http_port: Option<u16>,
|
||||||
pub https_port: Option<u16>,
|
pub https_port: Option<u16>,
|
||||||
|
|
||||||
pub timeout: Duration,
|
pub proxy_timeout: Duration,
|
||||||
|
pub upstream_timeout: Duration,
|
||||||
|
|
||||||
pub max_clients: usize,
|
pub max_clients: usize,
|
||||||
pub clients_count: ClientsCount,
|
pub clients_count: ClientsCount,
|
||||||
pub max_concurrent_streams: u32,
|
pub max_concurrent_streams: u32,
|
||||||
|
|
|
||||||
|
|
@ -60,7 +60,11 @@ fn main() {
|
||||||
http_port: None,
|
http_port: None,
|
||||||
https_port: None,
|
https_port: None,
|
||||||
http3: false,
|
http3: false,
|
||||||
timeout: Duration::from_secs(TIMEOUT_SEC),
|
|
||||||
|
// TODO: Reconsider each timeout values
|
||||||
|
proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC),
|
||||||
|
upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC),
|
||||||
|
|
||||||
max_clients: MAX_CLIENTS,
|
max_clients: MAX_CLIENTS,
|
||||||
clients_count: Default::default(),
|
clients_count: Default::default(),
|
||||||
max_concurrent_streams: MAX_CONCURRENT_STREAMS,
|
max_concurrent_streams: MAX_CONCURRENT_STREAMS,
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,10 @@ use hyper::{
|
||||||
Body, Client, Request, Response, StatusCode, Uri, Version,
|
Body, Client, Request, Response, StatusCode, Uri, Version,
|
||||||
};
|
};
|
||||||
use std::{net::SocketAddr, sync::Arc};
|
use std::{net::SocketAddr, sync::Arc};
|
||||||
use tokio::io::copy_bidirectional;
|
use tokio::{
|
||||||
|
io::copy_bidirectional,
|
||||||
|
time::{timeout, Duration},
|
||||||
|
};
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct HttpMessageHandler<T>
|
pub struct HttpMessageHandler<T>
|
||||||
|
|
@ -92,7 +95,11 @@ where
|
||||||
|
|
||||||
// Forward request to
|
// Forward request to
|
||||||
let mut res_backend = {
|
let mut res_backend = {
|
||||||
match tokio::time::timeout(self.globals.timeout, self.forwarder.request(req_forwarded)).await
|
match timeout(
|
||||||
|
self.globals.upstream_timeout + Duration::from_secs(1),
|
||||||
|
self.forwarder.request(req_forwarded),
|
||||||
|
)
|
||||||
|
.await
|
||||||
{
|
{
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
return http_error(StatusCode::GATEWAY_TIMEOUT);
|
return http_error(StatusCode::GATEWAY_TIMEOUT);
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,10 @@
|
||||||
use super::Proxy;
|
use super::Proxy;
|
||||||
use crate::{constants::*, error::*, log::*};
|
use crate::{error::*, log::*};
|
||||||
use bytes::{Buf, Bytes};
|
use bytes::{Buf, Bytes};
|
||||||
use h3::{quic::BidiStream, server::RequestStream};
|
use h3::{quic::BidiStream, server::RequestStream};
|
||||||
use hyper::{client::connect::Connect, Body, HeaderMap, Request, Response};
|
use hyper::{client::connect::Connect, Body, HeaderMap, Request, Response};
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use tokio::time::Duration;
|
use tokio::time::{timeout, Duration};
|
||||||
|
|
||||||
impl<T> Proxy<T>
|
impl<T> Proxy<T>
|
||||||
where
|
where
|
||||||
|
|
@ -17,10 +17,9 @@ where
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
let fut = self.clone().handle_connection_h3(conn);
|
let fut = self.clone().handle_connection_h3(conn);
|
||||||
let timeout_sec = self.globals.timeout;
|
|
||||||
self.globals.runtime_handle.spawn(async move {
|
self.globals.runtime_handle.spawn(async move {
|
||||||
if let Err(e) = tokio::time::timeout(timeout_sec + Duration::from_secs(1), fut).await {
|
// Timeout is based on underlying quic
|
||||||
// TODO: ここのtimeoutはどの値を使うべき?
|
if let Err(e) = fut.await {
|
||||||
warn!("QUIC or HTTP/3 connection failed: {}", e)
|
warn!("QUIC or HTTP/3 connection failed: {}", e)
|
||||||
}
|
}
|
||||||
clients_count.decrement();
|
clients_count.decrement();
|
||||||
|
|
@ -54,22 +53,16 @@ where
|
||||||
.await?;
|
.await?;
|
||||||
info!("HTTP/3 connection established");
|
info!("HTTP/3 connection established");
|
||||||
|
|
||||||
// TODO: Work around for timeout...
|
// Does this work enough?
|
||||||
// while let Some((req, stream)) = h3_conn
|
// while let Some((req, stream)) = h3_conn
|
||||||
// .accept()
|
// .accept()
|
||||||
// .await
|
// .await
|
||||||
// .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?
|
// .map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?
|
||||||
while let Some((req, stream)) = match tokio::time::timeout(
|
while let Some((req, stream)) = match h3_conn.accept().await {
|
||||||
Duration::from_millis(H3_CONN_TIMEOUT_MILLIS),
|
Ok(opt_req) => opt_req,
|
||||||
h3_conn.accept(),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(r) => r.map_err(|e| anyhow!("HTTP/3 accept failed: {}", e))?,
|
|
||||||
Err(_) => {
|
Err(_) => {
|
||||||
warn!("No incoming stream after connection establishment / previous use");
|
warn!("HTTP/3 failed to accept incoming connection (likely timeout)");
|
||||||
h3_conn.shutdown(0).await?;
|
return Ok(h3_conn.shutdown(0).await?);
|
||||||
return Ok(());
|
|
||||||
}
|
}
|
||||||
} {
|
} {
|
||||||
debug!(
|
debug!(
|
||||||
|
|
@ -81,24 +74,20 @@ where
|
||||||
|
|
||||||
let self_inner = self.clone();
|
let self_inner = self.clone();
|
||||||
self.globals.runtime_handle.spawn(async move {
|
self.globals.runtime_handle.spawn(async move {
|
||||||
if let Err(e) = tokio::time::timeout(
|
if let Err(e) = timeout(
|
||||||
self_inner.globals.timeout + Duration::from_secs(1), // timeout per stream
|
self_inner.globals.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2
|
||||||
self_inner.handle_stream_h3(req, stream, client_addr),
|
self_inner.handle_stream_h3(req, stream, client_addr),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
error!("HTTP/3 request failed: {}", e);
|
error!("HTTP/3 failed to process stream: {}", e);
|
||||||
}
|
}
|
||||||
// // TODO: Work around for timeout
|
|
||||||
// if let Err(e) = h3_conn.shutdown(0).await {
|
|
||||||
// error!("HTTP/3 connection shutdown failed: {}", e);
|
|
||||||
// }
|
|
||||||
// debug!("HTTP/3 connection shutdown (currently shutdown each time as work around for timeout)");
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
warn!("QUIC accepting connection failed: {:?}", err);
|
warn!("QUIC accepting connection failed: {:?}", err);
|
||||||
|
return Err(anyhow!("{}", err));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ use tokio::{
|
||||||
io::{AsyncRead, AsyncWrite},
|
io::{AsyncRead, AsyncWrite},
|
||||||
net::TcpListener,
|
net::TcpListener,
|
||||||
runtime::Handle,
|
runtime::Handle,
|
||||||
time::Duration,
|
time::{timeout, Duration},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
@ -57,8 +57,8 @@ where
|
||||||
|
|
||||||
// let handler_inner = self.msg_handler.clone();
|
// let handler_inner = self.msg_handler.clone();
|
||||||
self.globals.runtime_handle.clone().spawn(async move {
|
self.globals.runtime_handle.clone().spawn(async move {
|
||||||
tokio::time::timeout(
|
timeout(
|
||||||
self.globals.timeout + Duration::from_secs(1),
|
self.globals.proxy_timeout + Duration::from_secs(1),
|
||||||
server
|
server
|
||||||
.serve_connection(
|
.serve_connection(
|
||||||
stream,
|
stream,
|
||||||
|
|
@ -106,7 +106,6 @@ where
|
||||||
let server = server.with_executor(executor);
|
let server = server.with_executor(executor);
|
||||||
|
|
||||||
if self.tls_enabled {
|
if self.tls_enabled {
|
||||||
// #[cfg(feature = "tls")]
|
|
||||||
self.start_with_tls(server).await?;
|
self.start_with_tls(server).await?;
|
||||||
} else {
|
} else {
|
||||||
self.start_without_tls(server).await?;
|
self.start_without_tls(server).await?;
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue