wip: refactor: reconsider timeouts of connections

This commit is contained in:
Jun Kurihara 2023-12-11 18:23:08 +09:00
commit d526ce6cb4
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
8 changed files with 35 additions and 39 deletions

View file

@ -4,8 +4,8 @@ pub const RESPONSE_HEADER_SERVER: &str = "rpxy";
pub const TCP_LISTEN_BACKLOG: u32 = 1024; pub const TCP_LISTEN_BACKLOG: u32 = 1024;
// pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTP_LISTEN_PORT: u16 = 8080;
// pub const HTTPS_LISTEN_PORT: u16 = 8443; // pub const HTTPS_LISTEN_PORT: u16 = 8443;
pub const PROXY_TIMEOUT_SEC: u64 = 60; pub const PROXY_IDLE_TIMEOUT_SEC: u64 = 20;
pub const UPSTREAM_TIMEOUT_SEC: u64 = 60; pub const UPSTREAM_IDLE_TIMEOUT_SEC: u64 = 20;
pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser
pub const MAX_CLIENTS: usize = 512; pub const MAX_CLIENTS: usize = 512;
pub const MAX_CONCURRENT_STREAMS: u32 = 64; pub const MAX_CONCURRENT_STREAMS: u32 = 64;

View file

@ -8,6 +8,7 @@ use crate::{
log::*, log::*,
}; };
use async_trait::async_trait; use async_trait::async_trait;
use chrono::Duration;
use http::{Request, Response, Version}; use http::{Request, Response, Version};
use hyper::body::{Body, Incoming}; use hyper::body::{Body, Incoming};
use hyper_util::client::legacy::{ use hyper_util::client::legacy::{
@ -184,6 +185,7 @@ where
let mut http = HttpConnector::new(); let mut http = HttpConnector::new();
http.enforce_http(false); http.enforce_http(false);
http.set_reuse_address(true); http.set_reuse_address(true);
http.set_keepalive(Some(_globals.proxy_config.upstream_idle_timeout));
hyper_tls::HttpsConnector::from((http, tls.into())) hyper_tls::HttpsConnector::from((http, tls.into()))
}) })
}; };

View file

@ -33,8 +33,10 @@ pub struct ProxyConfig {
/// tcp listen backlog /// tcp listen backlog
pub tcp_listen_backlog: u32, pub tcp_listen_backlog: u32,
pub proxy_timeout: Duration, // when serving requests at Proxy /// Idle timeout as an HTTP server, used as the keep alive interval and timeout for reading request header
pub upstream_timeout: Duration, // when serving requests at Handler pub proxy_idle_timeout: Duration,
/// Idle timeout as an HTTP client, used as the keep alive interval for upstream connections
pub upstream_idle_timeout: Duration,
pub max_clients: usize, // when serving requests pub max_clients: usize, // when serving requests
pub max_concurrent_streams: u32, // when instantiate server pub max_concurrent_streams: u32, // when instantiate server
@ -80,8 +82,8 @@ impl Default for ProxyConfig {
tcp_listen_backlog: TCP_LISTEN_BACKLOG, tcp_listen_backlog: TCP_LISTEN_BACKLOG,
// TODO: Reconsider each timeout values // TODO: Reconsider each timeout values
proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), proxy_idle_timeout: Duration::from_secs(PROXY_IDLE_TIMEOUT_SEC),
upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), upstream_idle_timeout: Duration::from_secs(UPSTREAM_IDLE_TIMEOUT_SEC),
max_clients: MAX_CLIENTS, max_clients: MAX_CLIENTS,
max_concurrent_streams: MAX_CONCURRENT_STREAMS, max_concurrent_streams: MAX_CONCURRENT_STREAMS,

View file

@ -19,7 +19,7 @@ use derive_builder::Builder;
use http::{Request, Response, StatusCode}; use http::{Request, Response, StatusCode};
use hyper_util::{client::legacy::connect::Connect, rt::TokioIo}; use hyper_util::{client::legacy::connect::Connect, rt::TokioIo};
use std::{net::SocketAddr, sync::Arc}; use std::{net::SocketAddr, sync::Arc};
use tokio::{io::copy_bidirectional, time::timeout}; use tokio::io::copy_bidirectional;
#[allow(dead_code)] #[allow(dead_code)]
#[derive(Debug)] #[derive(Debug)]
@ -172,15 +172,10 @@ where
////////////// //////////////
// Forward request to a chosen backend // Forward request to a chosen backend
let mut res_backend = { let mut res_backend = match self.forwarder.request(req).await {
let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { Ok(v) => v,
return Err(HttpError::TimeoutUpstreamRequest); Err(e) => {
}; return Err(HttpError::FailedToGetResponseFromBackend(e.to_string()));
match result {
Ok(res) => res,
Err(e) => {
return Err(HttpError::FailedToGetResponseFromBackend(e.to_string()));
}
} }
}; };
////////////// //////////////

View file

@ -22,8 +22,6 @@ pub enum HttpError {
NoUpstreamCandidates, NoUpstreamCandidates,
#[error("Failed to generate upstream request for backend application: {0}")] #[error("Failed to generate upstream request for backend application: {0}")]
FailedToGenerateUpstreamRequest(String), FailedToGenerateUpstreamRequest(String),
#[error("Timeout in upstream request")]
TimeoutUpstreamRequest,
#[error("Failed to get response from backend: {0}")] #[error("Failed to get response from backend: {0}")]
FailedToGetResponseFromBackend(String), FailedToGetResponseFromBackend(String),
@ -53,7 +51,6 @@ impl From<HttpError> for StatusCode {
HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR,
HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND, HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND,
HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR,
HttpError::TimeoutUpstreamRequest => StatusCode::GATEWAY_TIMEOUT,
HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR,
HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR,
HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR,

View file

@ -19,9 +19,11 @@ pub(crate) fn connection_builder(globals: &Arc<Globals>) -> Arc<ConnectionBuilde
http_server http_server
.http1() .http1()
.keep_alive(globals.proxy_config.keepalive) .keep_alive(globals.proxy_config.keepalive)
.header_read_timeout(globals.proxy_config.proxy_idle_timeout)
.pipeline_flush(true); .pipeline_flush(true);
http_server http_server
.http2() .http2()
.keep_alive_interval(Some(globals.proxy_config.proxy_idle_timeout))
.max_concurrent_streams(globals.proxy_config.max_concurrent_streams); .max_concurrent_streams(globals.proxy_config.max_concurrent_streams);
Arc::new(http_server) Arc::new(http_server)
} }

View file

@ -10,8 +10,7 @@ use bytes::{Buf, Bytes};
use http::{Request, Response}; use http::{Request, Response};
use http_body_util::BodyExt; use http_body_util::BodyExt;
use hyper_util::client::legacy::connect::Connect; use hyper_util::client::legacy::connect::Connect;
use std::{net::SocketAddr, time::Duration}; use std::net::SocketAddr;
use tokio::time::timeout;
#[cfg(feature = "http3-quinn")] #[cfg(feature = "http3-quinn")]
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
@ -71,13 +70,11 @@ where
let self_inner = self.clone(); let self_inner = self.clone();
let tls_server_name_inner = tls_server_name.clone(); let tls_server_name_inner = tls_server_name.clone();
self.globals.runtime_handle.spawn(async move { self.globals.runtime_handle.spawn(async move {
if let Err(e) = timeout( if let Err(e) = self_inner
self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2 .h3_serve_stream(req, stream, client_addr, tls_server_name_inner)
self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner), .await
)
.await
{ {
error!("HTTP/3 failed to process stream: {}", e); warn!("HTTP/3 error on serve stream: {}", e);
} }
request_count.decrement(); request_count.decrement();
debug!("Request processed: current # {}", request_count.current()); debug!("Request processed: current # {}", request_count.current());

View file

@ -86,13 +86,11 @@ where
let server_clone = self.connection_builder.clone(); let server_clone = self.connection_builder.clone();
let message_handler_clone = self.message_handler.clone(); let message_handler_clone = self.message_handler.clone();
let timeout_sec = self.globals.proxy_config.proxy_timeout;
let tls_enabled = self.tls_enabled; let tls_enabled = self.tls_enabled;
let listening_on = self.listening_on; let listening_on = self.listening_on;
self.globals.runtime_handle.clone().spawn(async move { self.globals.runtime_handle.clone().spawn(async move {
timeout( server_clone
timeout_sec + Duration::from_secs(1), .serve_connection_with_upgrades(
server_clone.serve_connection_with_upgrades(
stream, stream,
service_fn(move |req: Request<Incoming>| { service_fn(move |req: Request<Incoming>| {
serve_request( serve_request(
@ -104,10 +102,9 @@ where
tls_server_name.clone(), tls_server_name.clone(),
) )
}), }),
), )
) .await
.await .ok();
.ok();
request_count.decrement(); request_count.decrement();
debug!("Request processed: current # {}", request_count.current()); debug!("Request processed: current # {}", request_count.current());
@ -201,8 +198,7 @@ where
return Err(RpxyError::FailedToTlsHandshake(e.to_string())); return Err(RpxyError::FailedToTlsHandshake(e.to_string()));
} }
}; };
self_inner.serve_connection(stream, client_addr, server_name); Ok((stream, client_addr, server_name))
Ok(()) as RpxyResult<()>
}; };
self.globals.runtime_handle.spawn( async move { self.globals.runtime_handle.spawn( async move {
@ -214,8 +210,13 @@ where
error!("Timeout to handshake TLS"); error!("Timeout to handshake TLS");
return; return;
}; };
if let Err(e) = v { match v {
error!("{}", e); Ok((stream, client_addr, server_name)) => {
self_inner.serve_connection(stream, client_addr, server_name);
}
Err(e) => {
error!("{}", e);
}
} }
}); });
} }