From d526ce6cb478fd89c5b76759053ecb4a3799b682 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 11 Dec 2023 18:23:08 +0900 Subject: [PATCH] wip: refactor: reconsider timeouts of connections --- rpxy-lib/src/constants.rs | 4 ++-- rpxy-lib/src/forwarder/client.rs | 2 ++ rpxy-lib/src/globals.rs | 10 ++++---- rpxy-lib/src/message_handler/handler_main.rs | 15 ++++-------- rpxy-lib/src/message_handler/http_result.rs | 3 --- rpxy-lib/src/proxy/mod.rs | 2 ++ rpxy-lib/src/proxy/proxy_h3.rs | 13 ++++------ rpxy-lib/src/proxy/proxy_main.rs | 25 ++++++++++---------- 8 files changed, 35 insertions(+), 39 deletions(-) diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index ebec1fc..acc9381 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -4,8 +4,8 @@ pub const RESPONSE_HEADER_SERVER: &str = "rpxy"; pub const TCP_LISTEN_BACKLOG: u32 = 1024; // pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTPS_LISTEN_PORT: u16 = 8443; -pub const PROXY_TIMEOUT_SEC: u64 = 60; -pub const UPSTREAM_TIMEOUT_SEC: u64 = 60; +pub const PROXY_IDLE_TIMEOUT_SEC: u64 = 20; +pub const UPSTREAM_IDLE_TIMEOUT_SEC: u64 = 20; pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser pub const MAX_CLIENTS: usize = 512; pub const MAX_CONCURRENT_STREAMS: u32 = 64; diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index c6c6218..8b86f9f 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -8,6 +8,7 @@ use crate::{ log::*, }; use async_trait::async_trait; +use chrono::Duration; use http::{Request, Response, Version}; use hyper::body::{Body, Incoming}; use hyper_util::client::legacy::{ @@ -184,6 +185,7 @@ where let mut http = HttpConnector::new(); http.enforce_http(false); http.set_reuse_address(true); + http.set_keepalive(Some(_globals.proxy_config.upstream_idle_timeout)); hyper_tls::HttpsConnector::from((http, tls.into())) }) }; diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 86fdc46..9cd62b3 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -33,8 +33,10 @@ pub struct ProxyConfig { /// tcp listen backlog pub tcp_listen_backlog: u32, - pub proxy_timeout: Duration, // when serving requests at Proxy - pub upstream_timeout: Duration, // when serving requests at Handler + /// Idle timeout as an HTTP server, used as the keep alive interval and timeout for reading request header + pub proxy_idle_timeout: Duration, + /// Idle timeout as an HTTP client, used as the keep alive interval for upstream connections + pub upstream_idle_timeout: Duration, pub max_clients: usize, // when serving requests pub max_concurrent_streams: u32, // when instantiate server @@ -80,8 +82,8 @@ impl Default for ProxyConfig { tcp_listen_backlog: TCP_LISTEN_BACKLOG, // TODO: Reconsider each timeout values - proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), - upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), + proxy_idle_timeout: Duration::from_secs(PROXY_IDLE_TIMEOUT_SEC), + upstream_idle_timeout: Duration::from_secs(UPSTREAM_IDLE_TIMEOUT_SEC), max_clients: MAX_CLIENTS, max_concurrent_streams: MAX_CONCURRENT_STREAMS, diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index a9fae01..251411b 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -19,7 +19,7 @@ use derive_builder::Builder; use http::{Request, Response, StatusCode}; use hyper_util::{client::legacy::connect::Connect, rt::TokioIo}; use std::{net::SocketAddr, sync::Arc}; -use tokio::{io::copy_bidirectional, time::timeout}; +use tokio::io::copy_bidirectional; #[allow(dead_code)] #[derive(Debug)] @@ -172,15 +172,10 @@ where ////////////// // Forward request to a chosen backend - let mut res_backend = { - let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { - return Err(HttpError::TimeoutUpstreamRequest); - }; - match result { - Ok(res) => res, - Err(e) => { - return Err(HttpError::FailedToGetResponseFromBackend(e.to_string())); - } + let mut res_backend = match self.forwarder.request(req).await { + Ok(v) => v, + Err(e) => { + return Err(HttpError::FailedToGetResponseFromBackend(e.to_string())); } }; ////////////// diff --git a/rpxy-lib/src/message_handler/http_result.rs b/rpxy-lib/src/message_handler/http_result.rs index 857ab55..ec48200 100644 --- a/rpxy-lib/src/message_handler/http_result.rs +++ b/rpxy-lib/src/message_handler/http_result.rs @@ -22,8 +22,6 @@ pub enum HttpError { NoUpstreamCandidates, #[error("Failed to generate upstream request for backend application: {0}")] FailedToGenerateUpstreamRequest(String), - #[error("Timeout in upstream request")] - TimeoutUpstreamRequest, #[error("Failed to get response from backend: {0}")] FailedToGetResponseFromBackend(String), @@ -53,7 +51,6 @@ impl From for StatusCode { HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND, HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR, - HttpError::TimeoutUpstreamRequest => StatusCode::GATEWAY_TIMEOUT, HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index 389df43..d1aa5c3 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -19,9 +19,11 @@ pub(crate) fn connection_builder(globals: &Arc) -> Arc| { serve_request( @@ -104,10 +102,9 @@ where tls_server_name.clone(), ) }), - ), - ) - .await - .ok(); + ) + .await + .ok(); request_count.decrement(); debug!("Request processed: current # {}", request_count.current()); @@ -201,8 +198,7 @@ where return Err(RpxyError::FailedToTlsHandshake(e.to_string())); } }; - self_inner.serve_connection(stream, client_addr, server_name); - Ok(()) as RpxyResult<()> + Ok((stream, client_addr, server_name)) }; self.globals.runtime_handle.spawn( async move { @@ -214,8 +210,13 @@ where error!("Timeout to handshake TLS"); return; }; - if let Err(e) = v { - error!("{}", e); + match v { + Ok((stream, client_addr, server_name)) => { + self_inner.serve_connection(stream, client_addr, server_name); + } + Err(e) => { + error!("{}", e); + } } }); }