refactor: connection handling timeout
This commit is contained in:
parent
7645830c77
commit
8c0bbf17e2
6 changed files with 34 additions and 33 deletions
|
|
@ -176,9 +176,9 @@ impl TryInto<ProxyConfig> for &ConfigToml {
|
||||||
|
|
||||||
if let Some(timeout) = exp.connection_handling_timeout {
|
if let Some(timeout) = exp.connection_handling_timeout {
|
||||||
if timeout == 0u64 {
|
if timeout == 0u64 {
|
||||||
proxy_config.connection_handling_timeout = Duration::from_secs(u64::MAX);
|
proxy_config.connection_handling_timeout = None;
|
||||||
} else {
|
} else {
|
||||||
proxy_config.connection_handling_timeout = Duration::from_secs(timeout);
|
proxy_config.connection_handling_timeout = Some(Duration::from_secs(timeout));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -47,7 +47,7 @@ pub struct ProxyConfig {
|
||||||
pub sni_consistency: bool, // Handler
|
pub sni_consistency: bool, // Handler
|
||||||
/// Connection handling timeout
|
/// Connection handling timeout
|
||||||
/// timeout to handle a connection, total time of receive request, serve, and send response. this might limits the max length of response.
|
/// timeout to handle a connection, total time of receive request, serve, and send response. this might limits the max length of response.
|
||||||
pub connection_handling_timeout: Duration,
|
pub connection_handling_timeout: Option<Duration>,
|
||||||
|
|
||||||
#[cfg(feature = "cache")]
|
#[cfg(feature = "cache")]
|
||||||
pub cache_enabled: bool,
|
pub cache_enabled: bool,
|
||||||
|
|
@ -94,7 +94,7 @@ impl Default for ProxyConfig {
|
||||||
keepalive: true,
|
keepalive: true,
|
||||||
|
|
||||||
sni_consistency: true,
|
sni_consistency: true,
|
||||||
connection_handling_timeout: Duration::from_secs(u64::MAX),
|
connection_handling_timeout: None,
|
||||||
|
|
||||||
#[cfg(feature = "cache")]
|
#[cfg(feature = "cache")]
|
||||||
cache_enabled: false,
|
cache_enabled: false,
|
||||||
|
|
|
||||||
|
|
@ -55,10 +55,10 @@ where
|
||||||
if proxy_config.https_port.is_some() {
|
if proxy_config.https_port.is_some() {
|
||||||
info!("Listen port: {} (for TLS)", proxy_config.https_port.unwrap());
|
info!("Listen port: {} (for TLS)", proxy_config.https_port.unwrap());
|
||||||
}
|
}
|
||||||
if proxy_config.connection_handling_timeout.as_secs() < u64::MAX {
|
if proxy_config.connection_handling_timeout.is_some() {
|
||||||
info!(
|
info!(
|
||||||
"Force connection handling timeout: {} sec",
|
"Force connection handling timeout: {:?} sec",
|
||||||
proxy_config.connection_handling_timeout.as_secs()
|
proxy_config.connection_handling_timeout.unwrap_or_default().as_secs()
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
|
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
|
||||||
|
|
|
||||||
|
|
@ -70,12 +70,12 @@ where
|
||||||
let self_inner = self.clone();
|
let self_inner = self.clone();
|
||||||
let tls_server_name_inner = tls_server_name.clone();
|
let tls_server_name_inner = tls_server_name.clone();
|
||||||
self.globals.runtime_handle.spawn(async move {
|
self.globals.runtime_handle.spawn(async move {
|
||||||
if let Err(e) = tokio::time::timeout(
|
let fut = self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner);
|
||||||
self_inner.globals.proxy_config.connection_handling_timeout,
|
if let Some(connection_handling_timeout) = self_inner.globals.proxy_config.connection_handling_timeout {
|
||||||
self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner),
|
if let Err(e) = tokio::time::timeout(connection_handling_timeout, fut).await {
|
||||||
)
|
warn!("HTTP/3 error on serve stream: {}", e);
|
||||||
.await
|
};
|
||||||
{
|
} else if let Err(e) = fut.await {
|
||||||
warn!("HTTP/3 error on serve stream: {}", e);
|
warn!("HTTP/3 error on serve stream: {}", e);
|
||||||
}
|
}
|
||||||
request_count.decrement();
|
request_count.decrement();
|
||||||
|
|
|
||||||
|
|
@ -91,9 +91,7 @@ where
|
||||||
let handling_timeout = self.globals.proxy_config.connection_handling_timeout;
|
let handling_timeout = self.globals.proxy_config.connection_handling_timeout;
|
||||||
|
|
||||||
self.globals.runtime_handle.clone().spawn(async move {
|
self.globals.runtime_handle.clone().spawn(async move {
|
||||||
timeout(
|
let fut = server_clone.serve_connection_with_upgrades(
|
||||||
handling_timeout,
|
|
||||||
server_clone.serve_connection_with_upgrades(
|
|
||||||
stream,
|
stream,
|
||||||
service_fn(move |req: Request<Incoming>| {
|
service_fn(move |req: Request<Incoming>| {
|
||||||
serve_request(
|
serve_request(
|
||||||
|
|
@ -105,10 +103,13 @@ where
|
||||||
tls_server_name.clone(),
|
tls_server_name.clone(),
|
||||||
)
|
)
|
||||||
}),
|
}),
|
||||||
),
|
);
|
||||||
)
|
|
||||||
.await
|
if let Some(handling_timeout) = handling_timeout {
|
||||||
.ok();
|
timeout(handling_timeout, fut).await.ok();
|
||||||
|
} else {
|
||||||
|
fut.await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
request_count.decrement();
|
request_count.decrement();
|
||||||
debug!("Request processed: current # {}", request_count.current());
|
debug!("Request processed: current # {}", request_count.current());
|
||||||
|
|
|
||||||
|
|
@ -13,5 +13,5 @@ publish = false
|
||||||
bytes = { version = "1", default-features = false }
|
bytes = { version = "1", default-features = false }
|
||||||
futures = { version = "0.3", default-features = false }
|
futures = { version = "0.3", default-features = false }
|
||||||
h3 = { path = "../h3/h3/" }
|
h3 = { path = "../h3/h3/" }
|
||||||
s2n-quic = "1.32.0"
|
s2n-quic = "1.33.0"
|
||||||
s2n-quic-core = "0.32.0"
|
s2n-quic-core = "0.33.0"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue