From f5197d08692e59795f5ecf2f00bc90c109ce60c7 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 9 Dec 2023 03:34:00 +0900 Subject: [PATCH 01/12] wip: refactoring the cache logic --- rpxy-lib/Cargo.toml | 8 +- rpxy-lib/src/error.rs | 9 ++ rpxy-lib/src/forwarder/cache.rs | 161 ++++++++++++++++++++++++++ rpxy-lib/src/forwarder/client.rs | 102 +++++++++++++--- rpxy-lib/src/forwarder/mod.rs | 1 + submodules/rusty-http-cache-semantics | 2 +- 6 files changed, 261 insertions(+), 22 deletions(-) create mode 100644 rpxy-lib/src/forwarder/cache.rs diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 92449ab..fe715e0 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -24,7 +24,7 @@ http3-s2n = [ sticky-cookie = ["base64", "sha2", "chrono"] native-tls-backend = ["hyper-tls"] rustls-backend = [] -cache = [] #"http-cache-semantics", "lru"] +cache = ["http-cache-semantics", "lru"] native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] @@ -66,7 +66,7 @@ hyper-tls = { version = "0.6.0", features = ["alpn"], optional = true } # tls and cert management for server hot_reload = "0.1.4" -rustls = { version = "0.21.9", default-features = false } +rustls = { version = "0.21.10", default-features = false } tokio-rustls = { version = "0.24.1", features = ["early-data"] } webpki = "0.22.4" x509-parser = "0.15.1" @@ -88,8 +88,8 @@ s2n-quic-rustls = { version = "0.32.0", optional = true } socket2 = { version = "0.5.5", features = ["all"], optional = true } # # cache -# http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } -# lru = { version = "0.12.1", optional = true } +http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } +lru = { version = "0.12.1", optional = true } # cookie handling for sticky cookie chrono = { version = "0.4.31", default-features = false, features = [ diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 843845d..4cbc463 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -84,6 +84,15 @@ pub enum RpxyError { #[error("Failed to fetch from upstream: {0}")] FailedToFetchFromUpstream(String), + // Cache errors, + #[cfg(feature = "cache")] + #[error("Invalid null request and/or response")] + NullRequestOrResponse, + + #[cfg(feature = "cache")] + #[error("Failed to write byte buffer")] + FailedToWriteByteBufferForCache, + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs new file mode 100644 index 0000000..73bed7b --- /dev/null +++ b/rpxy-lib/src/forwarder/cache.rs @@ -0,0 +1,161 @@ +use crate::{error::*, globals::Globals, log::*}; +use http::{Request, Response}; +use http_cache_semantics::CachePolicy; +use lru::LruCache; +use std::{ + path::{Path, PathBuf}, + sync::{atomic::AtomicUsize, Arc, Mutex}, +}; +use tokio::{fs, sync::RwLock}; + +/* ---------------------------------------------- */ +#[derive(Clone, Debug)] +pub struct RpxyCache { + /// Lru cache storing http message caching policy + inner: LruCacheManager, + /// Managing cache file objects through RwLock's lock mechanism for file lock + file_store: FileStore, + /// Async runtime + runtime_handle: tokio::runtime::Handle, + /// Maximum size of each cache file object + max_each_size: usize, + /// Maximum size of cache object on memory + max_each_size_on_memory: usize, +} + +impl RpxyCache { + /// Generate cache storage + pub async fn new(globals: &Globals) -> Option { + if 
!globals.proxy_config.cache_enabled {
+      return None;
+    }
+    let path = globals.proxy_config.cache_dir.as_ref().unwrap();
+    let file_store = FileStore::new(path, &globals.runtime_handle).await;
+    let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
+
+    let max_each_size = globals.proxy_config.cache_max_each_size;
+    let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
+    if max_each_size < max_each_size_on_memory {
+      warn!(
+        "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"
+      );
+      max_each_size_on_memory = max_each_size;
+    }
+
+    Some(Self {
+      file_store,
+      inner,
+      runtime_handle: globals.runtime_handle.clone(),
+      max_each_size,
+      max_each_size_on_memory,
+    })
+  }
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// Cache file manager outer that is responsible to handle `RwLock`
+struct FileStore {
+  inner: Arc<RwLock<FileStoreInner>>,
+}
+impl FileStore {
+  /// Build manager
+  async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
+    Self {
+      inner: Arc::new(RwLock::new(FileStoreInner::new(path, runtime_handle).await)),
+    }
+  }
+}
+
+#[derive(Debug)]
+/// Manager inner for cache on file system
+struct FileStoreInner {
+  /// Directory of temporary files
+  cache_dir: PathBuf,
+  /// Counter of current cached files
+  cnt: usize,
+  /// Async runtime
+  runtime_handle: tokio::runtime::Handle,
+}
+
+impl FileStoreInner {
+  /// Build new cache file manager.
+  /// This first creates the cache file dir if it does not exist, and cleans up the files inside the directory.
+  /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed.
+  async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
+    let path_buf = path.as_ref().to_path_buf();
+    if let Err(e) = fs::remove_dir_all(path).await {
+      warn!("Failed to clean up the cache dir: {e}");
+    };
+    fs::create_dir_all(&path_buf).await.unwrap();
+    Self {
+      cache_dir: path_buf.clone(),
+      cnt: 0,
+      runtime_handle: runtime_handle.clone(),
+    }
+  }
+}
+
+/* ---------------------------------------------- */
+
+#[derive(Clone, Debug)]
+/// Cache target in hybrid manner of on-memory and file system
+pub enum CacheFileOrOnMemory {
+  /// Pointer to the temporary cache file
+  File(PathBuf),
+  /// Cached body itself
+  OnMemory(Vec<u8>),
+}
+
+#[derive(Clone, Debug)]
+/// Cache object definition
+struct CacheObject {
+  /// Cache policy to determine if the stored cache can be used as a response to a new incoming request
+  pub policy: CachePolicy,
+  /// Cache target: on-memory object or temporary file
+  pub target: CacheFileOrOnMemory,
+  /// SHA256 hash of target to strongly bind the cache metadata (this object) and file target
+  pub hash: Vec<u8>,
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache`
+struct LruCacheManager {
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure whether the string URL is the right key; it looks like we would end up checking it for every request
+  cnt: Arc<AtomicUsize>,
+}
+
+impl LruCacheManager {
+  /// Build LruCache
+  fn new(cache_max_entry: usize) -> Self {
+    Self {
+      inner: Arc::new(Mutex::new(LruCache::new(
+        std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
+      ))),
+      cnt: Arc::new(AtomicUsize::default()),
+    }
+  }
+}
+
+/* ---------------------------------------------- */
+pub fn get_policy_if_cacheable(
+  req: Option<&Request>,
+  res: Option<&Response>,
+) -> RpxyResult<Option<CachePolicy>>
+// where
+//   B1: core::fmt::Debug,
+{
+  // deduce cache 
policy from req and res + let (Some(req), Some(res)) = (req, res) else { + return Err(RpxyError::NullRequestOrResponse); + }; + + let new_policy = CachePolicy::new(req, res); + if new_policy.is_storable() { + // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + Ok(Some(new_policy)) + } else { + Ok(None) + } +} diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 22c2320..820523e 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -9,13 +9,20 @@ use crate::{ }; use async_trait::async_trait; use http::{Request, Response, Version}; -use hyper::body::Body; +use hyper::body::{Body, Incoming}; use hyper_util::client::legacy::{ connect::{Connect, HttpConnector}, Client, }; use std::sync::Arc; +#[cfg(feature = "cache")] +use super::cache::{get_policy_if_cacheable, RpxyCache}; +#[cfg(feature = "cache")] +use crate::hyper_ext::body::{full, BoxBody}; +#[cfg(feature = "cache")] +use http_body_util::BodyExt; + #[async_trait] /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. pub trait ForwardRequest { @@ -25,27 +32,71 @@ pub trait ForwardRequest { /// Forwarder http client struct responsible to cache handling pub struct Forwarder { - // #[cfg(feature = "cache")] - // cache: Option, + #[cfg(feature = "cache")] + cache: Option, inner: Client, inner_h2: Client, // `h2c` or http/2-only client is defined separately } #[async_trait] -impl ForwardRequest> for Forwarder +impl ForwardRequest> for Forwarder where C: Send + Sync + Connect + Clone + 'static, B1: Body + Send + Sync + Unpin + 'static, ::Data: Send, ::Error: Into>, - B2: Body, { type Error = RpxyError; - async fn request(&self, req: Request) -> Result>, Self::Error> { + async fn request(&self, req: Request) -> Result>, Self::Error> { // TODO: cache handling + #[cfg(feature = "cache")] + { + let mut synth_req = None; + if self.cache.is_some() { + // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // // if found, return it as response. + // info!("Cache hit - Return from cache"); + // return Ok(cached_response); + // }; - self.request_directly(req).await + // Synthetic request copy used just for caching (cannot clone request object...) 
+ synth_req = Some(build_synth_req_for_cache(&req)); + } + let res = self.request_directly(req).await; + + if self.cache.is_none() { + return res.map(wrap_incoming_body_response::); + } + + // check cacheability and store it if cacheable + let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { + return res.map(wrap_incoming_body_response::); + }; + let (parts, body) = res.unwrap().into_parts(); + let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else { + return Err(RpxyError::FailedToWriteByteBufferForCache); + }; + + // if let Err(cache_err) = self + // .cache + // .as_ref() + // .unwrap() + // .put(synth_req.unwrap().uri(), &bytes, &cache_policy) + // .await + // { + // error!("{:?}", cache_err); + // }; + + // response with cached body + Ok(Response::from_parts(parts, IncomingOr::Right(full(bytes)))) + } + + // No cache handling + #[cfg(not(feature = "cache"))] + { + self.request_directly(req).await.map(wrap_incoming_body_response::) + } } } @@ -56,13 +107,15 @@ where ::Data: Send, ::Error: Into>, { - async fn request_directly(&self, req: Request) -> RpxyResult>> { + async fn request_directly(&self, req: Request) -> RpxyResult> { + // TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient. + // Needs to be reconsidered. Currently, this is a kind of work around. + // This possibly relates to https://github.com/hyperium/hyper/issues/2417. match req.version() { Version::HTTP_2 => self.inner_h2.request(req).await, // handles `h2c` requests _ => self.inner.request(req).await, } .map_err(|e| RpxyError::FailedToFetchFromUpstream(e.to_string())) - .map(wrap_incoming_body_response::) } } @@ -90,7 +143,9 @@ Please enable native-tls-backend or rustls-backend feature to enable TLS support Ok(Self { inner, - inner_h2: inner.clone(), + inner_h2, + #[cfg(feature = "cache")] + cache: RpxyCache::new(_globals).await, }) } } @@ -130,13 +185,12 @@ where .http2_only(true) .build::<_, B1>(connector_h2); - // #[cfg(feature = "cache")] - // { - // let cache = RpxyCache::new(_globals).await; - // Self { inner, inner_h2, cache } - // } - // #[cfg(not(feature = "cache"))] - Ok(Self { inner, inner_h2 }) + Ok(Self { + inner, + inner_h2, + #[cfg(feature = "cache")] + cache: RpxyCache::new(_globals).await, + }) } } @@ -172,3 +226,17 @@ where // let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2); } } + +#[cfg(feature = "cache")] +/// Build synthetic request to cache +fn build_synth_req_for_cache(req: &Request) -> Request<()> { + let mut builder = Request::builder() + .method(req.method()) + .uri(req.uri()) + .version(req.version()); + // TODO: omit extensions. is this approach correct? 
+ for (header_key, header_value) in req.headers() { + builder = builder.header(header_key, header_value); + } + builder.body(()).unwrap() +} diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index 13d37eb..e901c7d 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -1,3 +1,4 @@ +mod cache; mod client; use crate::hyper_ext::body::{IncomingLike, IncomingOr}; diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics index 3cd0917..88d23c2 160000 --- a/submodules/rusty-http-cache-semantics +++ b/submodules/rusty-http-cache-semantics @@ -1 +1 @@ -Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd +Subproject commit 88d23c2f5a3ac36295dff4a804968c43932ba46b From cdcb1b13dacc88fb004d1166070f05f087434fed Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 9 Dec 2023 03:41:32 +0900 Subject: [PATCH 02/12] wip: chore: fix bug for unused --- rpxy-lib/src/forwarder/client.rs | 11 +++++++---- rpxy-lib/src/forwarder/mod.rs | 1 + rpxy-lib/src/hyper_ext/mod.rs | 1 + 3 files changed, 9 insertions(+), 4 deletions(-) diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 820523e..9aab75a 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -2,7 +2,7 @@ use crate::{ error::{RpxyError, RpxyResult}, globals::Globals, hyper_ext::{ - body::{wrap_incoming_body_response, IncomingOr}, + body::{wrap_incoming_body_response, BoxBody, IncomingOr}, rt::LocalExecutor, }, log::*, @@ -19,7 +19,7 @@ use std::sync::Arc; #[cfg(feature = "cache")] use super::cache::{get_policy_if_cacheable, RpxyCache}; #[cfg(feature = "cache")] -use crate::hyper_ext::body::{full, BoxBody}; +use crate::hyper_ext::body::{full, wrap_synthetic_body_response}; #[cfg(feature = "cache")] use http_body_util::BodyExt; @@ -89,13 +89,16 @@ where // }; // response with cached body - Ok(Response::from_parts(parts, IncomingOr::Right(full(bytes)))) + Ok(wrap_synthetic_body_response(Response::from_parts(parts, full(bytes)))) } // No cache handling #[cfg(not(feature = "cache"))] { - self.request_directly(req).await.map(wrap_incoming_body_response::) + self + .request_directly(req) + .await + .map(wrap_incoming_body_response::) } } } diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index e901c7d..286cb40 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -1,3 +1,4 @@ +#[cfg(feature = "cache")] mod cache; mod client; diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index e1b5ae8..e6c81e7 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -8,6 +8,7 @@ pub(crate) mod rt { } pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; + #[allow(unused)] pub(crate) use super::body_type::{ empty, full, wrap_incoming_body_response, wrap_synthetic_body_response, BoxBody, IncomingOr, }; From d473b44556ed051ae812aa81de79a3cad2872a92 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 9 Dec 2023 10:17:31 +0900 Subject: [PATCH 03/12] add comment --- rpxy-lib/src/forwarder/client.rs | 5 +++++ rpxy-lib/src/proxy/proxy_h3.rs | 2 +- 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 9aab75a..5718f2e 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -74,10 +74,15 @@ where return res.map(wrap_incoming_body_response::); }; let (parts, body) = res.unwrap().into_parts(); 
+ let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else { return Err(RpxyError::FailedToWriteByteBufferForCache); }; + // TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache. + // We may need to use the same logic as h3. + // Is bytes.clone() enough? + // if let Err(cache_err) = self // .cache // .as_ref() diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 8abb710..d194b1f 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -122,7 +122,7 @@ where size += body.remaining(); if size > max_body_size { error!( - "Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}", + "Exceeds max request body size for HTTP/3: received {}, maximum_allowed {}", size, max_body_size ); return Err(RpxyError::H3TooLargeBody); From ed33c5d4f119b26fb75b2ea64ccff22ed3bf0913 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 9 Dec 2023 12:14:59 +0900 Subject: [PATCH 04/12] wip: implement on-memory cache as is --- rpxy-lib/Cargo.toml | 6 ++--- rpxy-lib/src/error.rs | 4 ++++ rpxy-lib/src/forwarder/cache.rs | 42 ++++++++++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index fe715e0..65c7c58 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -24,7 +24,7 @@ http3-s2n = [ sticky-cookie = ["base64", "sha2", "chrono"] native-tls-backend = ["hyper-tls"] rustls-backend = [] -cache = ["http-cache-semantics", "lru"] +cache = ["http-cache-semantics", "lru", "sha2", "base64"] native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] @@ -87,9 +87,10 @@ s2n-quic-rustls = { version = "0.32.0", optional = true } # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.5", features = ["all"], optional = true } -# # cache +# cache http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } lru = { version = "0.12.1", optional = true } +sha2 = { version = "0.10.8", default-features = false, optional = true } # cookie handling for sticky cookie chrono = { version = "0.4.31", default-features = false, features = [ @@ -98,7 +99,6 @@ chrono = { version = "0.4.31", default-features = false, features = [ "clock", ], optional = true } base64 = { version = "0.21.5", optional = true } -sha2 = { version = "0.10.8", default-features = false, optional = true } [dev-dependencies] diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 4cbc463..2763d1e 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -93,6 +93,10 @@ pub enum RpxyError { #[error("Failed to write byte buffer")] FailedToWriteByteBufferForCache, + #[cfg(feature = "cache")] + #[error("Failed to acquire mutex lock for cache")] + FailedToAcquiredMutexLockForCache, + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs index 73bed7b..ea29a41 100644 --- a/rpxy-lib/src/forwarder/cache.rs +++ b/rpxy-lib/src/forwarder/cache.rs @@ -4,14 +4,18 @@ use http_cache_semantics::CachePolicy; use lru::LruCache; use std::{ path::{Path, PathBuf}, - sync::{atomic::AtomicUsize, Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use tokio::{fs, sync::RwLock}; /* ---------------------------------------------- */ #[derive(Clone, Debug)] +/// Cache main manager pub struct RpxyCache { - /// Lru cache storing http 
message caching policy + /// Inner lru cache manager storing http message caching policy inner: LruCacheManager, /// Managing cache file objects through RwLock's lock mechanism for file lock file_store: FileStore, @@ -122,7 +126,9 @@ struct CacheObject { #[derive(Debug, Clone)] /// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache` struct LruCacheManager { + /// Inner lru cache manager main object inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + /// Counter of current cached object (total) cnt: Arc, } @@ -133,12 +139,42 @@ impl LruCacheManager { inner: Arc::new(Mutex::new(LruCache::new( std::num::NonZeroUsize::new(cache_max_entry).unwrap(), ))), - cnt: Arc::new(AtomicUsize::default()), + cnt: Default::default(), } } + + /// Count entries + fn count(&self) -> usize { + self.cnt.load(Ordering::Relaxed) + } + + /// Evict an entry + fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + let res = lock.pop_entry(cache_key); + // This may be inconsistent with the actual number of entries + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } + + /// Push an entry + fn push(&self, cache_key: &str, cache_object: CacheObject) -> RpxyResult> { + let Ok(mut lock) = self.inner.lock() else { + error!("Failed to acquire mutex lock for writing cache entry"); + return Err(RpxyError::FailedToAcquiredMutexLockForCache); + }; + let res = Ok(lock.push(cache_key.to_string(), cache_object)); + // This may be inconsistent with the actual number of entries + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } } /* ---------------------------------------------- */ +/// Generate cache policy if the response is cacheable pub fn get_policy_if_cacheable( req: Option<&Request>, res: Option<&Response>, From cc48394e7302076aac1794fc180374850a2de643 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 11 Dec 2023 16:52:54 +0900 Subject: [PATCH 05/12] wip: feat: update h3 response reader to use async stream --- rpxy-lib/src/error.rs | 12 ++++ rpxy-lib/src/forwarder/cache.rs | 104 ++++++++++++++++++++++++++++++- rpxy-lib/src/forwarder/client.rs | 3 + rpxy-lib/src/proxy/proxy_h3.rs | 34 +++++++--- 4 files changed, 145 insertions(+), 8 deletions(-) diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 2763d1e..343cf04 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -97,6 +97,18 @@ pub enum RpxyError { #[error("Failed to acquire mutex lock for cache")] FailedToAcquiredMutexLockForCache, + #[cfg(feature = "cache")] + #[error("Failed to create file cache")] + FailedToCreateFileCache, + + #[cfg(feature = "cache")] + #[error("Failed to write file cache")] + FailedToWriteFileCache, + + #[cfg(feature = "cache")] + #[error("Failed to open cache file")] + FailedToOpenCacheFile, + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs index ea29a41..03755e6 100644 --- a/rpxy-lib/src/forwarder/cache.rs +++ b/rpxy-lib/src/forwarder/cache.rs @@ -1,15 +1,22 @@ use crate::{error::*, globals::Globals, log::*}; +use bytes::{Buf, Bytes, BytesMut}; use http::{Request, Response}; +use http_body_util::StreamBody; use http_cache_semantics::CachePolicy; use lru::LruCache; use std::{ + convert::Infallible, path::{Path, PathBuf}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }, }; -use 
tokio::{fs, sync::RwLock};
+use tokio::{
+  fs::{self, File},
+  io::{AsyncReadExt, AsyncWriteExt},
+  sync::RwLock,
+};
 
 /* ---------------------------------------------- */
 #[derive(Clone, Debug)]
@@ -54,6 +61,14 @@ impl RpxyCache {
       max_each_size_on_memory,
     })
   }
+
+  /// Count cache entries
+  pub async fn count(&self) -> (usize, usize, usize) {
+    let total = self.inner.count();
+    let file = self.file_store.count().await;
+    let on_memory = total - file;
+    (total, on_memory, file)
+  }
 }
 
 /* ---------------------------------------------- */
@@ -71,6 +86,32 @@ impl FileStore {
     }
   }
 }
+
+impl FileStore {
+  /// Count file cache entries
+  async fn count(&self) -> usize {
+    let inner = self.inner.read().await;
+    inner.cnt
+  }
+  /// Create a temporary file cache
+  async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> {
+    let mut inner = self.inner.write().await;
+    inner.create(cache_filename, body_bytes).await
+  }
+  // /// Evict a temporary file cache
+  // async fn evict(&self, path: impl AsRef<Path>) {
+  //   // Acquire the write lock
+  //   let mut inner = self.inner.write().await;
+  //   if let Err(e) = inner.remove(path).await {
+  //     warn!("Eviction failed during file object removal: {:?}", e);
+  //   };
+  // }
+  // /// Read a temporary file cache
+  // async fn read(&self, path: impl AsRef<Path>) -> RpxyResult {
+  //   let inner = self.inner.read().await;
+  //   inner.read(&path).await
+  // }
+}
 
 #[derive(Debug)]
 /// Manager inner for cache on file system
 struct FileStoreInner {
@@ -98,6 +139,67 @@ impl FileStoreInner {
       runtime_handle: runtime_handle.clone(),
     }
   }
+
+  /// Create a new temporary file cache
+  async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> {
+    let cache_filepath = self.cache_dir.join(cache_filename);
+    let Ok(mut file) = File::create(&cache_filepath).await else {
+      return Err(RpxyError::FailedToCreateFileCache);
+    };
+    let mut bytes_clone = body_bytes.clone();
+    while bytes_clone.has_remaining() {
+      if let Err(e) = file.write_buf(&mut bytes_clone).await {
+        error!("Failed to write file cache: {e}");
+        return Err(RpxyError::FailedToWriteFileCache);
+      };
+    }
+    self.cnt += 1;
+    Ok(CacheFileOrOnMemory::File(cache_filepath))
+  }
+
+  /// Retrieve a stored temporary file cache
+  async fn read(&self, path: impl AsRef<Path>) -> RpxyResult<()> {
+    let Ok(mut file) = File::open(&path).await else {
+      warn!("Cache file object cannot be opened");
+      return Err(RpxyError::FailedToOpenCacheFile);
+    };
+
+    /* ----------------------------- */
+    // PoC for streaming body
+    use futures::channel::mpsc;
+    let (tx, rx) = mpsc::unbounded::<Result<hyper::body::Frame<Bytes>, Infallible>>();
+
+    // let (body_sender, res_body) = Body::channel();
+    self.runtime_handle.spawn(async move {
+      // let mut sender = body_sender;
+      let mut buf = BytesMut::new();
+      loop {
+        match file.read_buf(&mut buf).await {
+          Ok(0) => break,
+          Ok(_) => tx
+            .unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining()))))
+            .map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?,
+          //sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
+          Err(_) => break,
+        };
+      }
+      Ok(()) as anyhow::Result<()>
+    });
+
+    let mut rx = http_body_util::StreamBody::new(rx);
+    // TODO: In the end we will have to define an IncomingLike body stream anyway; if so, it seems better to define it ourselves and share it with the h3 handling.
+    // The type is too long, so a wrapper should be introduced.
+    // let response = Response::builder()
+    //   .status(200)
+    //   .header("content-type", "application/octet-stream")
+    //   .body(rx)
+    //   .unwrap();
+
+    todo!()
+    /* ----------------------------- */
+
+    // Ok(res_body)
+  }
 }
 
 /* 
---------------------------------------------- */ diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 5718f2e..c6c6218 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -75,9 +75,12 @@ where }; let (parts, body) = res.unwrap().into_parts(); + // TODO: This is inefficient since current strategy needs to copy the whole body onto memory to cache it. + // This should be handled by copying buffer simultaneously while forwarding response to downstream. let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else { return Err(RpxyError::FailedToWriteByteBufferForCache); }; + let bytes_clone = bytes.clone(); // TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache. // We may need to use the same logic as h3. diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index d194b1f..1846d67 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -153,20 +153,40 @@ where ) .await?; - let (new_res_parts, new_body) = res.into_parts(); + let (new_res_parts, mut new_body) = res.into_parts(); let new_res = Response::from_parts(new_res_parts, ()); match send_stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); - // aggregate body without copying - let body_data = new_body - .collect() - .await + loop { + let frame = match new_body.frame().await { + Some(frame) => frame, + None => { + debug!("Response body finished"); + break; + } + } .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; - // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() - send_stream.send_data(body_data.to_bytes()).await?; + if frame.is_data() { + let data = frame.into_data().unwrap_or_default(); + debug!("Write data to HTTP/3 stream"); + send_stream.send_data(data).await?; + } else if frame.is_trailers() { + let trailers = frame.into_trailers().unwrap_or_default(); + debug!("Write trailer to HTTP/3 stream"); + send_stream.send_trailers(trailers).await?; + } + } + // // aggregate body without copying + // let body_data = new_body + // .collect() + // .await + // .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; + + // // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() + // send_stream.send_data(body_data.to_bytes()).await?; // TODO: needs handling trailer? should be included in body from handler. 
} From d526ce6cb478fd89c5b76759053ecb4a3799b682 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 11 Dec 2023 18:23:08 +0900 Subject: [PATCH 06/12] wip: refactor: reconsider timeouts of connections --- rpxy-lib/src/constants.rs | 4 ++-- rpxy-lib/src/forwarder/client.rs | 2 ++ rpxy-lib/src/globals.rs | 10 ++++---- rpxy-lib/src/message_handler/handler_main.rs | 15 ++++-------- rpxy-lib/src/message_handler/http_result.rs | 3 --- rpxy-lib/src/proxy/mod.rs | 2 ++ rpxy-lib/src/proxy/proxy_h3.rs | 13 ++++------ rpxy-lib/src/proxy/proxy_main.rs | 25 ++++++++++---------- 8 files changed, 35 insertions(+), 39 deletions(-) diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index ebec1fc..acc9381 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -4,8 +4,8 @@ pub const RESPONSE_HEADER_SERVER: &str = "rpxy"; pub const TCP_LISTEN_BACKLOG: u32 = 1024; // pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTPS_LISTEN_PORT: u16 = 8443; -pub const PROXY_TIMEOUT_SEC: u64 = 60; -pub const UPSTREAM_TIMEOUT_SEC: u64 = 60; +pub const PROXY_IDLE_TIMEOUT_SEC: u64 = 20; +pub const UPSTREAM_IDLE_TIMEOUT_SEC: u64 = 20; pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser pub const MAX_CLIENTS: usize = 512; pub const MAX_CONCURRENT_STREAMS: u32 = 64; diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index c6c6218..8b86f9f 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -8,6 +8,7 @@ use crate::{ log::*, }; use async_trait::async_trait; +use chrono::Duration; use http::{Request, Response, Version}; use hyper::body::{Body, Incoming}; use hyper_util::client::legacy::{ @@ -184,6 +185,7 @@ where let mut http = HttpConnector::new(); http.enforce_http(false); http.set_reuse_address(true); + http.set_keepalive(Some(_globals.proxy_config.upstream_idle_timeout)); hyper_tls::HttpsConnector::from((http, tls.into())) }) }; diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 86fdc46..9cd62b3 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -33,8 +33,10 @@ pub struct ProxyConfig { /// tcp listen backlog pub tcp_listen_backlog: u32, - pub proxy_timeout: Duration, // when serving requests at Proxy - pub upstream_timeout: Duration, // when serving requests at Handler + /// Idle timeout as an HTTP server, used as the keep alive interval and timeout for reading request header + pub proxy_idle_timeout: Duration, + /// Idle timeout as an HTTP client, used as the keep alive interval for upstream connections + pub upstream_idle_timeout: Duration, pub max_clients: usize, // when serving requests pub max_concurrent_streams: u32, // when instantiate server @@ -80,8 +82,8 @@ impl Default for ProxyConfig { tcp_listen_backlog: TCP_LISTEN_BACKLOG, // TODO: Reconsider each timeout values - proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), - upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), + proxy_idle_timeout: Duration::from_secs(PROXY_IDLE_TIMEOUT_SEC), + upstream_idle_timeout: Duration::from_secs(UPSTREAM_IDLE_TIMEOUT_SEC), max_clients: MAX_CLIENTS, max_concurrent_streams: MAX_CONCURRENT_STREAMS, diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index a9fae01..251411b 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -19,7 +19,7 @@ use derive_builder::Builder; use http::{Request, Response, StatusCode}; use 
hyper_util::{client::legacy::connect::Connect, rt::TokioIo}; use std::{net::SocketAddr, sync::Arc}; -use tokio::{io::copy_bidirectional, time::timeout}; +use tokio::io::copy_bidirectional; #[allow(dead_code)] #[derive(Debug)] @@ -172,15 +172,10 @@ where ////////////// // Forward request to a chosen backend - let mut res_backend = { - let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else { - return Err(HttpError::TimeoutUpstreamRequest); - }; - match result { - Ok(res) => res, - Err(e) => { - return Err(HttpError::FailedToGetResponseFromBackend(e.to_string())); - } + let mut res_backend = match self.forwarder.request(req).await { + Ok(v) => v, + Err(e) => { + return Err(HttpError::FailedToGetResponseFromBackend(e.to_string())); } }; ////////////// diff --git a/rpxy-lib/src/message_handler/http_result.rs b/rpxy-lib/src/message_handler/http_result.rs index 857ab55..ec48200 100644 --- a/rpxy-lib/src/message_handler/http_result.rs +++ b/rpxy-lib/src/message_handler/http_result.rs @@ -22,8 +22,6 @@ pub enum HttpError { NoUpstreamCandidates, #[error("Failed to generate upstream request for backend application: {0}")] FailedToGenerateUpstreamRequest(String), - #[error("Timeout in upstream request")] - TimeoutUpstreamRequest, #[error("Failed to get response from backend: {0}")] FailedToGetResponseFromBackend(String), @@ -53,7 +51,6 @@ impl From for StatusCode { HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND, HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR, - HttpError::TimeoutUpstreamRequest => StatusCode::GATEWAY_TIMEOUT, HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR, HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR, diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index 389df43..d1aa5c3 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -19,9 +19,11 @@ pub(crate) fn connection_builder(globals: &Arc) -> Arc| { serve_request( @@ -104,10 +102,9 @@ where tls_server_name.clone(), ) }), - ), - ) - .await - .ok(); + ) + .await + .ok(); request_count.decrement(); debug!("Request processed: current # {}", request_count.current()); @@ -201,8 +198,7 @@ where return Err(RpxyError::FailedToTlsHandshake(e.to_string())); } }; - self_inner.serve_connection(stream, client_addr, server_name); - Ok(()) as RpxyResult<()> + Ok((stream, client_addr, server_name)) }; self.globals.runtime_handle.spawn( async move { @@ -214,8 +210,13 @@ where error!("Timeout to handshake TLS"); return; }; - if let Err(e) = v { - error!("{}", e); + match v { + Ok((stream, client_addr, server_name)) => { + self_inner.serve_connection(stream, client_addr, server_name); + } + Err(e) => { + error!("{}", e); + } } }); } From b8f3034014231b8aab945f027a499c3773b653fc Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 11 Dec 2023 18:40:31 +0900 Subject: [PATCH 07/12] wip: fix keep alive timeouts --- rpxy-lib/Cargo.toml | 1 + rpxy-lib/src/hyper_ext/mod.rs | 2 ++ rpxy-lib/src/proxy/mod.rs | 7 ++++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 65c7c58..f30f4bb 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -41,6 +41,7 @@ tokio = { version = "1.34.0", default-features = false, features = [ "macros", "fs", ] } 
+pin-project-lite = "0.2.13" async-trait = "0.1.74" # Error handling diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index e6c81e7..cfa2b70 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -1,10 +1,12 @@ mod body_incoming_like; mod body_type; mod executor; +mod tokio_timer; mod watch; pub(crate) mod rt { pub(crate) use super::executor::LocalExecutor; + pub(crate) use super::tokio_timer::{TokioSleep, TokioTimer}; } pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; diff --git a/rpxy-lib/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs index d1aa5c3..a7c1ec8 100644 --- a/rpxy-lib/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -6,7 +6,10 @@ mod proxy_quic_quinn; mod proxy_quic_s2n; mod socket; -use crate::{globals::Globals, hyper_ext::rt::LocalExecutor}; +use crate::{ + globals::Globals, + hyper_ext::rt::{LocalExecutor, TokioTimer}, +}; use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder}; use std::sync::Arc; @@ -20,10 +23,12 @@ pub(crate) fn connection_builder(globals: &Arc) -> Arc Date: Mon, 11 Dec 2023 18:41:17 +0900 Subject: [PATCH 08/12] add tokio timer --- rpxy-lib/src/hyper_ext/mod.rs | 3 +- rpxy-lib/src/hyper_ext/tokio_timer.rs | 55 +++++++++++++++++++++++++++ 2 files changed, 57 insertions(+), 1 deletion(-) create mode 100644 rpxy-lib/src/hyper_ext/tokio_timer.rs diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index cfa2b70..922776c 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -4,13 +4,14 @@ mod executor; mod tokio_timer; mod watch; +#[allow(unused)] pub(crate) mod rt { pub(crate) use super::executor::LocalExecutor; pub(crate) use super::tokio_timer::{TokioSleep, TokioTimer}; } +#[allow(unused)] pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - #[allow(unused)] pub(crate) use super::body_type::{ empty, full, wrap_incoming_body_response, wrap_synthetic_body_response, BoxBody, IncomingOr, }; diff --git a/rpxy-lib/src/hyper_ext/tokio_timer.rs b/rpxy-lib/src/hyper_ext/tokio_timer.rs new file mode 100644 index 0000000..53a1af7 --- /dev/null +++ b/rpxy-lib/src/hyper_ext/tokio_timer.rs @@ -0,0 +1,55 @@ +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, + time::{Duration, Instant}, +}; + +use hyper::rt::{Sleep, Timer}; +use pin_project_lite::pin_project; + +#[derive(Clone, Debug)] +pub struct TokioTimer; + +impl Timer for TokioTimer { + fn sleep(&self, duration: Duration) -> Pin> { + Box::pin(TokioSleep { + inner: tokio::time::sleep(duration), + }) + } + + fn sleep_until(&self, deadline: Instant) -> Pin> { + Box::pin(TokioSleep { + inner: tokio::time::sleep_until(deadline.into()), + }) + } + + fn reset(&self, sleep: &mut Pin>, new_deadline: Instant) { + if let Some(sleep) = sleep.as_mut().downcast_mut_pin::() { + sleep.reset(new_deadline) + } + } +} + +pin_project! 
{ + pub(crate) struct TokioSleep { + #[pin] + pub(crate) inner: tokio::time::Sleep, + } +} + +impl Future for TokioSleep { + type Output = (); + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + self.project().inner.poll(cx) + } +} + +impl Sleep for TokioSleep {} + +impl TokioSleep { + pub fn reset(self: Pin<&mut Self>, deadline: Instant) { + self.project().inner.as_mut().reset(deadline.into()); + } +} From 008b62a9256a9b71e0752ab4433e0fb43600e20e Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 19:58:33 +0900 Subject: [PATCH 09/12] wip: feat: define response body enum --- rpxy-lib/src/error.rs | 30 +----- rpxy-lib/src/forwarder/cache/cache_error.rs | 35 +++++++ .../{cache.rs => cache/cache_main.rs} | 97 ++++++++++++++++--- rpxy-lib/src/forwarder/cache/mod.rs | 5 + rpxy-lib/src/forwarder/client.rs | 53 ++++------ rpxy-lib/src/forwarder/mod.rs | 7 +- rpxy-lib/src/hyper_ext/body_type.rs | 67 ++++++++++--- rpxy-lib/src/hyper_ext/mod.rs | 4 +- rpxy-lib/src/message_handler/handler_main.rs | 6 +- .../src/message_handler/synthetic_response.rs | 10 +- rpxy-lib/src/proxy/proxy_h3.rs | 1 - rpxy-lib/src/proxy/proxy_main.rs | 4 +- 12 files changed, 215 insertions(+), 104 deletions(-) create mode 100644 rpxy-lib/src/forwarder/cache/cache_error.rs rename rpxy-lib/src/forwarder/{cache.rs => cache/cache_main.rs} (75%) create mode 100644 rpxy-lib/src/forwarder/cache/mod.rs diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 343cf04..f63a06c 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -84,35 +84,15 @@ pub enum RpxyError { #[error("Failed to fetch from upstream: {0}")] FailedToFetchFromUpstream(String), - // Cache errors, - #[cfg(feature = "cache")] - #[error("Invalid null request and/or response")] - NullRequestOrResponse, - - #[cfg(feature = "cache")] - #[error("Failed to write byte buffer")] - FailedToWriteByteBufferForCache, - - #[cfg(feature = "cache")] - #[error("Failed to acquire mutex lock for cache")] - FailedToAcquiredMutexLockForCache, - - #[cfg(feature = "cache")] - #[error("Failed to create file cache")] - FailedToCreateFileCache, - - #[cfg(feature = "cache")] - #[error("Failed to write file cache")] - FailedToWriteFileCache, - - #[cfg(feature = "cache")] - #[error("Failed to open cache file")] - FailedToOpenCacheFile, - // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, + // Cache error map + #[cfg(feature = "cache")] + #[error("Cache error: {0}")] + CacheError(#[from] crate::forwarder::CacheError), + // Others #[error("Infallible")] Infallible(#[from] std::convert::Infallible), diff --git a/rpxy-lib/src/forwarder/cache/cache_error.rs b/rpxy-lib/src/forwarder/cache/cache_error.rs new file mode 100644 index 0000000..bb2ffa6 --- /dev/null +++ b/rpxy-lib/src/forwarder/cache/cache_error.rs @@ -0,0 +1,35 @@ +use thiserror::Error; + +pub type CacheResult = std::result::Result; + +/// Describes things that can go wrong in the Rpxy +#[derive(Debug, Error)] +pub enum CacheError { + // Cache errors, + #[error("Invalid null request and/or response")] + NullRequestOrResponse, + + #[error("Failed to write byte buffer")] + FailedToWriteByteBufferForCache, + + #[error("Failed to acquire mutex lock for cache")] + FailedToAcquiredMutexLockForCache, + + #[error("Failed to create file cache")] + FailedToCreateFileCache, + + #[error("Failed to write file cache")] + FailedToWriteFileCache, + + #[error("Failed to open cache file")] + FailedToOpenCacheFile, + + #[error("Too large to cache")] + 
TooLargeToCache,
+
+  #[error("Failed to cache bytes: {0}")]
+  FailedToCacheBytes(String),
+
+  #[error("Failed to send frame to cache {0}")]
+  FailedToSendFrameToCache(String),
+}
diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs
similarity index 75%
rename from rpxy-lib/src/forwarder/cache.rs
rename to rpxy-lib/src/forwarder/cache/cache_main.rs
index 03755e6..2bc4548 100644
--- a/rpxy-lib/src/forwarder/cache.rs
+++ b/rpxy-lib/src/forwarder/cache/cache_main.rs
@@ -1,8 +1,11 @@
-use crate::{error::*, globals::Globals, log::*};
+use super::cache_error::*;
+use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*};
 use bytes::{Buf, Bytes, BytesMut};
+use futures::channel::mpsc;
 use http::{Request, Response};
-use http_body_util::StreamBody;
+use http_body_util::{BodyExt, StreamBody};
 use http_cache_semantics::CachePolicy;
+use hyper::body::{Body, Frame, Incoming};
 use lru::LruCache;
 use std::{
   convert::Infallible,
   path::{Path, PathBuf},
   sync::{
     atomic::{AtomicUsize, Ordering},
     Arc, Mutex,
   },
 };
@@ -69,6 +72,73 @@ impl RpxyCache {
     let on_memory = total - file;
     (total, on_memory, file)
   }
+
+  /// Put response into the cache
+  pub async fn put(
+    &self,
+    uri: &hyper::Uri,
+    mut body: Incoming,
+    policy: &CachePolicy,
+  ) -> CacheResult<UnboundedStreamBody> {
+    let my_cache = self.inner.clone();
+    let mut file_store = self.file_store.clone();
+    let uri = uri.clone();
+    let policy_clone = policy.clone();
+    let max_each_size = self.max_each_size;
+    let max_each_size_on_memory = self.max_each_size_on_memory;
+
+    let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
+
+    self.runtime_handle.spawn(async move {
+      let mut size = 0usize;
+      loop {
+        let frame = match body.frame().await {
+          Some(frame) => frame,
+          None => {
+            debug!("Response body finished");
+            break;
+          }
+        };
+        let frame_size = frame.as_ref().map(|f| {
+          if f.is_data() {
+            f.data_ref().map(|bytes| bytes.remaining()).unwrap_or_default()
+          } else {
+            0
+          }
+        });
+        size += frame_size.unwrap_or_default();
+
+        // check size
+        if size > max_each_size {
+          warn!("Too large to cache");
+          return Err(CacheError::TooLargeToCache);
+        }
+        frame
+          .as_ref()
+          .map(|f| {
+            if f.is_data() {
+              let data_bytes = f.data_ref().unwrap().clone();
+              println!("ddddde");
+              // TODO: cache data bytes as file or on memory
+              // Deciding between file and on-memory will probably require buffering up to a certain size first; we also need to decide how to handle objects that were only partially cached.
+              // How to bind the file and the cache object is also an open question.
+            }
+          })
+          .map_err(|e| CacheError::FailedToCacheBytes(e.to_string()))?;
+
+        // send data to use response downstream
+        body_tx
+          .unbounded_send(frame)
+          .map_err(|e| CacheError::FailedToSendFrameToCache(e.to_string()))?;
+      }
+
+      Ok(()) as CacheResult<()>
+    });
+
+    let stream_body = StreamBody::new(body_rx);
+
+    Ok(stream_body)
+  }
 }
 
 /* ---------------------------------------------- */
@@ -93,7 +163,7 @@ impl FileStore {
     inner.cnt
   }
   /// Create a temporary file cache
-  async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> {
+  async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult<CacheFileOrOnMemory> {
     let mut inner = self.inner.write().await;
     inner.create(cache_filename, body_bytes).await
   }
@@ -106,7 +176,7 @@ impl FileStore {
   //   };
   // }
   // /// Read a temporary file cache
-  // async fn read(&self, path: impl AsRef<Path>) -> RpxyResult {
+  // async fn read(&self, path: impl AsRef<Path>) -> CacheResult {
   //   let inner = self.inner.read().await;
   //   inner.read(&path).await
   // }
@@ -141,16 +211,16 @@ impl FileStoreInner {
       runtime_handle: runtime_handle.clone(),
     }
   }
+
   /// Create a new temporary file cache
-  async 
fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult { let cache_filepath = self.cache_dir.join(cache_filename); let Ok(mut file) = File::create(&cache_filepath).await else { - return Err(RpxyError::FailedToCreateFileCache); + return Err(CacheError::FailedToCreateFileCache); }; let mut bytes_clone = body_bytes.clone(); while bytes_clone.has_remaining() { if let Err(e) = file.write_buf(&mut bytes_clone).await { error!("Failed to write file cache: {e}"); - return Err(RpxyError::FailedToWriteFileCache); + return Err(CacheError::FailedToWriteFileCache); }; } self.cnt += 1; @@ -158,15 +228,14 @@ impl FileStoreInner { } /// Retrieve a stored temporary file cache - async fn read(&self, path: impl AsRef) -> RpxyResult<()> { + async fn read(&self, path: impl AsRef) -> CacheResult<()> { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); - return Err(RpxyError::FailedToOpenCacheFile); + return Err(CacheError::FailedToOpenCacheFile); }; /* ----------------------------- */ // PoC for streaming body - use futures::channel::mpsc; let (tx, rx) = mpsc::unbounded::, Infallible>>(); // let (body_sender, res_body) = Body::channel(); @@ -263,10 +332,10 @@ impl LruCacheManager { } /// Push an entry - fn push(&self, cache_key: &str, cache_object: CacheObject) -> RpxyResult> { + fn push(&self, cache_key: &str, cache_object: CacheObject) -> CacheResult> { let Ok(mut lock) = self.inner.lock() else { error!("Failed to acquire mutex lock for writing cache entry"); - return Err(RpxyError::FailedToAcquiredMutexLockForCache); + return Err(CacheError::FailedToAcquiredMutexLockForCache); }; let res = Ok(lock.push(cache_key.to_string(), cache_object)); // This may be inconsistent with the actual number of entries @@ -280,13 +349,13 @@ impl LruCacheManager { pub fn get_policy_if_cacheable( req: Option<&Request>, res: Option<&Response>, -) -> RpxyResult> +) -> CacheResult> // where // B1: core::fmt::Debug, { // deduce cache policy from req and res let (Some(req), Some(res)) = (req, res) else { - return Err(RpxyError::NullRequestOrResponse); + return Err(CacheError::NullRequestOrResponse); }; let new_policy = CachePolicy::new(req, res); diff --git a/rpxy-lib/src/forwarder/cache/mod.rs b/rpxy-lib/src/forwarder/cache/mod.rs new file mode 100644 index 0000000..cfe5a1b --- /dev/null +++ b/rpxy-lib/src/forwarder/cache/mod.rs @@ -0,0 +1,5 @@ +mod cache_error; +mod cache_main; + +pub use cache_error::CacheError; +pub use cache_main::{get_policy_if_cacheable, CacheFileOrOnMemory, RpxyCache}; diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 8b86f9f..8d2e307 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -1,14 +1,10 @@ use crate::{ error::{RpxyError, RpxyResult}, globals::Globals, - hyper_ext::{ - body::{wrap_incoming_body_response, BoxBody, IncomingOr}, - rt::LocalExecutor, - }, + hyper_ext::{body::ResponseBody, rt::LocalExecutor}, log::*, }; use async_trait::async_trait; -use chrono::Duration; use http::{Request, Response, Version}; use hyper::body::{Body, Incoming}; use hyper_util::client::legacy::{ @@ -19,10 +15,6 @@ use std::sync::Arc; #[cfg(feature = "cache")] use super::cache::{get_policy_if_cacheable, RpxyCache}; -#[cfg(feature = "cache")] -use crate::hyper_ext::body::{full, wrap_synthetic_body_response}; -#[cfg(feature = "cache")] -use http_body_util::BodyExt; #[async_trait] /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. 
@@ -40,7 +32,7 @@ pub struct Forwarder { } #[async_trait] -impl ForwardRequest> for Forwarder +impl ForwardRequest for Forwarder where C: Send + Sync + Connect + Clone + 'static, B1: Body + Send + Sync + Unpin + 'static, @@ -49,7 +41,7 @@ where { type Error = RpxyError; - async fn request(&self, req: Request) -> Result>, Self::Error> { + async fn request(&self, req: Request) -> Result, Self::Error> { // TODO: cache handling #[cfg(feature = "cache")] { @@ -67,38 +59,27 @@ where let res = self.request_directly(req).await; if self.cache.is_none() { - return res.map(wrap_incoming_body_response::); + return res.map(|inner| inner.map(ResponseBody::Incoming)); } // check cacheability and store it if cacheable let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { - return res.map(wrap_incoming_body_response::); + return res.map(|inner| inner.map(ResponseBody::Incoming)); }; let (parts, body) = res.unwrap().into_parts(); - // TODO: This is inefficient since current strategy needs to copy the whole body onto memory to cache it. - // This should be handled by copying buffer simultaneously while forwarding response to downstream. - let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else { - return Err(RpxyError::FailedToWriteByteBufferForCache); - }; - let bytes_clone = bytes.clone(); + // Get streamed body without waiting for the arrival of the body, + // which is done simultaneously with caching. + let stream_body = self + .cache + .as_ref() + .unwrap() + .put(synth_req.unwrap().uri(), body, &cache_policy) + .await?; - // TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache. - // We may need to use the same logic as h3. - // Is bytes.clone() enough? 
- - // if let Err(cache_err) = self - // .cache - // .as_ref() - // .unwrap() - // .put(synth_req.unwrap().uri(), &bytes, &cache_policy) - // .await - // { - // error!("{:?}", cache_err); - // }; - - // response with cached body - Ok(wrap_synthetic_body_response(Response::from_parts(parts, full(bytes)))) + // response with body being cached in background + let new_res = Response::from_parts(parts, ResponseBody::Streamed(stream_body)); + Ok(new_res) } // No cache handling @@ -107,7 +88,7 @@ where self .request_directly(req) .await - .map(wrap_incoming_body_response::) + .map(|inner| inner.map(ResponseBody::Incoming)) } } } diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index 286cb40..d53cd73 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -3,6 +3,9 @@ mod cache; mod client; use crate::hyper_ext::body::{IncomingLike, IncomingOr}; -pub type Forwarder = client::Forwarder>; -pub use client::ForwardRequest; +pub(crate) type Forwarder = client::Forwarder>; +pub(crate) use client::ForwardRequest; + +#[cfg(feature = "cache")] +pub(crate) use cache::CacheError; diff --git a/rpxy-lib/src/hyper_ext/body_type.rs b/rpxy-lib/src/hyper_ext/body_type.rs index 9616306..c1eb54b 100644 --- a/rpxy-lib/src/hyper_ext/body_type.rs +++ b/rpxy-lib/src/hyper_ext/body_type.rs @@ -1,24 +1,25 @@ -use http::Response; +// use http::Response; use http_body_util::{combinators, BodyExt, Either, Empty, Full}; -use hyper::body::{Bytes, Incoming}; +use hyper::body::{Body, Bytes, Incoming}; +use std::pin::Pin; /// Type for synthetic boxed body pub(crate) type BoxBody = combinators::BoxBody; /// Type for either passthrough body or given body type, specifically synthetic boxed body pub(crate) type IncomingOr = Either; -/// helper function to build http response with passthrough body -pub(crate) fn wrap_incoming_body_response(response: Response) -> Response> -where - B: hyper::body::Body, -{ - response.map(IncomingOr::Left) -} +// /// helper function to build http response with passthrough body +// pub(crate) fn wrap_incoming_body_response(response: Response) -> Response> +// where +// B: hyper::body::Body, +// { +// response.map(IncomingOr::Left) +// } -/// helper function to build http response with synthetic body -pub(crate) fn wrap_synthetic_body_response(response: Response) -> Response> { - response.map(IncomingOr::Right) -} +// /// helper function to build http response with synthetic body +// pub(crate) fn wrap_synthetic_body_response(response: Response) -> Response> { +// response.map(IncomingOr::Right) +// } /// helper function to build a empty body pub(crate) fn empty() -> BoxBody { @@ -29,3 +30,43 @@ pub(crate) fn empty() -> BoxBody { pub(crate) fn full(body: Bytes) -> BoxBody { Full::new(body).map_err(|never| match never {}).boxed() } + +/* ------------------------------------ */ +#[cfg(feature = "cache")] +use futures::channel::mpsc::UnboundedReceiver; +#[cfg(feature = "cache")] +use http_body_util::StreamBody; +#[cfg(feature = "cache")] +use hyper::body::Frame; + +#[cfg(feature = "cache")] +pub(crate) type UnboundedStreamBody = StreamBody, hyper::Error>>>; + +/// Response body use in this project +/// - Incoming: just a type that only forwards the upstream response body to downstream. +/// - BoxedCache: a type that is generated from cache, e.g.,, small byte object. +/// - StreamedCache: another type that is generated from cache as stream, e.g., large byte object. 
+pub(crate) enum ResponseBody { + Incoming(Incoming), + Boxed(BoxBody), + #[cfg(feature = "cache")] + Streamed(UnboundedStreamBody), +} + +impl Body for ResponseBody { + type Data = bytes::Bytes; + type Error = hyper::Error; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + match self.get_mut() { + ResponseBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx), + #[cfg(feature = "cache")] + ResponseBody::Boxed(boxed) => Pin::new(boxed).poll_frame(cx), + #[cfg(feature = "cache")] + ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx), + } + } +} diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index 922776c..8b3776c 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -12,7 +12,5 @@ pub(crate) mod rt { #[allow(unused)] pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - pub(crate) use super::body_type::{ - empty, full, wrap_incoming_body_response, wrap_synthetic_body_response, BoxBody, IncomingOr, - }; + pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr, ResponseBody, UnboundedStreamBody}; } diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index 251411b..b5ae87d 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -11,7 +11,7 @@ use crate::{ error::*, forwarder::{ForwardRequest, Forwarder}, globals::Globals, - hyper_ext::body::{BoxBody, IncomingLike, IncomingOr}, + hyper_ext::body::{IncomingLike, IncomingOr, ResponseBody}, log::*, name_exp::ServerName, }; @@ -58,7 +58,7 @@ where listen_addr: SocketAddr, tls_enabled: bool, tls_server_name: Option, - ) -> RpxyResult>> { + ) -> RpxyResult> { // preparing log data let mut log_data = HttpMessageLog::from(&req); log_data.client_addr(&client_addr); @@ -99,7 +99,7 @@ where listen_addr: SocketAddr, tls_enabled: bool, tls_server_name: Option, - ) -> HttpResult>> { + ) -> HttpResult> { // Here we start to inspect and parse with server_name let server_name = req .inspect_parse_host() diff --git a/rpxy-lib/src/message_handler/synthetic_response.rs b/rpxy-lib/src/message_handler/synthetic_response.rs index 60aeeec..a955a2d 100644 --- a/rpxy-lib/src/message_handler/synthetic_response.rs +++ b/rpxy-lib/src/message_handler/synthetic_response.rs @@ -1,16 +1,16 @@ use super::http_result::{HttpError, HttpResult}; use crate::{ error::*, - hyper_ext::body::{empty, BoxBody, IncomingOr}, + hyper_ext::body::{empty, ResponseBody}, name_exp::ServerName, }; use http::{Request, Response, StatusCode, Uri}; /// build http response with status code of 4xx and 5xx -pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult>> { +pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult> { let res = Response::builder() .status(status_code) - .body(IncomingOr::Right(empty())) + .body(ResponseBody::Boxed(empty())) .unwrap(); Ok(res) } @@ -20,7 +20,7 @@ pub(super) fn secure_redirection_response( server_name: &ServerName, tls_port: Option, req: &Request, -) -> HttpResult>> { +) -> HttpResult> { let server_name: String = server_name.try_into().unwrap_or_default(); let pq = match req.uri().path_and_query() { Some(x) => x.as_str(), @@ -36,7 +36,7 @@ pub(super) fn secure_redirection_response( let response = Response::builder() .status(StatusCode::MOVED_PERMANENTLY) .header("Location", dest_uri.to_string()) - 
.body(IncomingOr::Right(empty())) + .body(ResponseBody::Boxed(empty())) .map_err(|e| HttpError::FailedToRedirect(e.to_string()))?; Ok(response) } diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 342c995..61328b2 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -138,7 +138,6 @@ where }); let new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); - // Response> wrapped by RpxyResult let res = self .message_handler .handle_request( diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 96ec0be..2d7a649 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -5,7 +5,7 @@ use crate::{ error::*, globals::Globals, hyper_ext::{ - body::{BoxBody, IncomingOr}, + body::{IncomingOr, ResponseBody}, rt::LocalExecutor, }, log::*, @@ -32,7 +32,7 @@ async fn serve_request( listen_addr: SocketAddr, tls_enabled: bool, tls_server_name: Option, -) -> RpxyResult>> +) -> RpxyResult> where T: Send + Sync + Connect + Clone, U: CryptoSource + Clone, From 1c18f3836a38d552b64b2441dd5f3e5e5e11bc9a Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 20:17:13 +0900 Subject: [PATCH 10/12] wip: feat: change request body from either to explicit enum --- rpxy-lib/src/error.rs | 2 + rpxy-lib/src/forwarder/cache/cache_main.rs | 2 +- rpxy-lib/src/forwarder/mod.rs | 4 +- rpxy-lib/src/hyper_ext/body_type.rs | 51 ++++++++++++-------- rpxy-lib/src/hyper_ext/mod.rs | 2 +- rpxy-lib/src/message_handler/handler_main.rs | 6 +-- rpxy-lib/src/proxy/proxy_h3.rs | 15 ++---- rpxy-lib/src/proxy/proxy_main.rs | 4 +- 8 files changed, 45 insertions(+), 41 deletions(-) diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index f63a06c..3b1afc9 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -28,6 +28,8 @@ pub enum RpxyError { HyperIncomingLikeNewClosed, #[error("New body write aborted")] HyperNewBodyWriteAborted, + #[error("Hyper error in serving request or response body type: {0}")] + HyperBodyError(#[from] hyper::Error), // http/3 errors #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index 2bc4548..659ac41 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -118,7 +118,7 @@ impl RpxyCache { .map(|f| { if f.is_data() { let data_bytes = f.data_ref().unwrap().clone(); - println!("ddddde"); + debug!("cache data bytes of {} bytes", data_bytes.len()) // TODO: cache data bytes as file or on memory // fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。 // ファイルとObjectのbindをどうやってするか diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs index d53cd73..26aa0c9 100644 --- a/rpxy-lib/src/forwarder/mod.rs +++ b/rpxy-lib/src/forwarder/mod.rs @@ -2,9 +2,9 @@ mod cache; mod client; -use crate::hyper_ext::body::{IncomingLike, IncomingOr}; +use crate::hyper_ext::body::RequestBody; -pub(crate) type Forwarder = client::Forwarder>; +pub(crate) type Forwarder = client::Forwarder; pub(crate) use client::ForwardRequest; #[cfg(feature = "cache")] diff --git a/rpxy-lib/src/hyper_ext/body_type.rs b/rpxy-lib/src/hyper_ext/body_type.rs index c1eb54b..a143eac 100644 --- a/rpxy-lib/src/hyper_ext/body_type.rs +++ b/rpxy-lib/src/hyper_ext/body_type.rs @@ -1,25 +1,11 @@ -// use http::Response; -use http_body_util::{combinators, BodyExt, Either, Empty, Full}; +use 
super::body::IncomingLike; +use crate::error::RpxyError; +use http_body_util::{combinators, BodyExt, Empty, Full}; use hyper::body::{Body, Bytes, Incoming}; use std::pin::Pin; /// Type for synthetic boxed body pub(crate) type BoxBody = combinators::BoxBody; -/// Type for either passthrough body or given body type, specifically synthetic boxed body -pub(crate) type IncomingOr = Either; - -// /// helper function to build http response with passthrough body -// pub(crate) fn wrap_incoming_body_response(response: Response) -> Response> -// where -// B: hyper::body::Body, -// { -// response.map(IncomingOr::Left) -// } - -// /// helper function to build http response with synthetic body -// pub(crate) fn wrap_synthetic_body_response(response: Response) -> Response> { -// response.map(IncomingOr::Right) -// } /// helper function to build a empty body pub(crate) fn empty() -> BoxBody { @@ -31,6 +17,30 @@ pub(crate) fn full(body: Bytes) -> BoxBody { Full::new(body).map_err(|never| match never {}).boxed() } +/* ------------------------------------ */ +/// Request body used in this project +/// - Incoming: just a type that only forwards the downstream request body to upstream. +/// - IncomingLike: a Incoming-like type in which channel is used +pub(crate) enum RequestBody { + Incoming(Incoming), + IncomingLike(IncomingLike), +} + +impl Body for RequestBody { + type Data = bytes::Bytes; + type Error = RpxyError; + + fn poll_frame( + self: Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll, Self::Error>>> { + match self.get_mut() { + RequestBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx).map_err(RpxyError::HyperBodyError), + RequestBody::IncomingLike(incoming_like) => Pin::new(incoming_like).poll_frame(cx), + } + } +} + /* ------------------------------------ */ #[cfg(feature = "cache")] use futures::channel::mpsc::UnboundedReceiver; @@ -44,8 +54,8 @@ pub(crate) type UnboundedStreamBody = StreamBody, @@ -68,5 +78,6 @@ impl Body for ResponseBody { #[cfg(feature = "cache")] ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx), } + .map_err(RpxyError::HyperBodyError) } } diff --git a/rpxy-lib/src/hyper_ext/mod.rs b/rpxy-lib/src/hyper_ext/mod.rs index 8b3776c..a4c5196 100644 --- a/rpxy-lib/src/hyper_ext/mod.rs +++ b/rpxy-lib/src/hyper_ext/mod.rs @@ -12,5 +12,5 @@ pub(crate) mod rt { #[allow(unused)] pub(crate) mod body { pub(crate) use super::body_incoming_like::IncomingLike; - pub(crate) use super::body_type::{empty, full, BoxBody, IncomingOr, ResponseBody, UnboundedStreamBody}; + pub(crate) use super::body_type::{empty, full, BoxBody, RequestBody, ResponseBody, UnboundedStreamBody}; } diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index b5ae87d..ceb5db4 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -11,7 +11,7 @@ use crate::{ error::*, forwarder::{ForwardRequest, Forwarder}, globals::Globals, - hyper_ext::body::{IncomingLike, IncomingOr, ResponseBody}, + hyper_ext::body::{RequestBody, ResponseBody}, log::*, name_exp::ServerName, }; @@ -53,7 +53,7 @@ where /// Responsible to passthrough responses from backend applications or generate synthetic error responses. 
pub async fn handle_request( &self, - req: Request>, + req: Request, client_addr: SocketAddr, // For access control listen_addr: SocketAddr, tls_enabled: bool, @@ -94,7 +94,7 @@ where async fn handle_request_inner( &self, log_data: &mut HttpMessageLog, - mut req: Request>, + mut req: Request, client_addr: SocketAddr, // For access control listen_addr: SocketAddr, tls_enabled: bool, diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 61328b2..0295430 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -2,7 +2,7 @@ use super::proxy_main::Proxy; use crate::{ crypto::CryptoSource, error::*, - hyper_ext::body::{IncomingLike, IncomingOr}, + hyper_ext::body::{IncomingLike, RequestBody}, log::*, name_exp::ServerName, }; @@ -137,7 +137,7 @@ where Ok(()) as RpxyResult<()> }); - let new_req: Request> = Request::from_parts(req_parts, IncomingOr::Right(req_body)); + let new_req: Request = Request::from_parts(req_parts, RequestBody::IncomingLike(req_body)); let res = self .message_handler .handle_request( @@ -155,6 +155,7 @@ where match send_stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); + // on-demand body streaming to downstream without expanding the object onto memory. loop { let frame = match new_body.frame().await { Some(frame) => frame, @@ -175,16 +176,6 @@ where send_stream.send_trailers(trailers).await?; } } - // // aggregate body without copying - // let body_data = new_body - // .collect() - // .await - // .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; - - // // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() - // send_stream.send_data(body_data.to_bytes()).await?; - - // TODO: needs handling trailer? should be included in body from handler. 
} Err(err) => { error!("Unable to send response to connection peer: {:?}", err); diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 2d7a649..4fea840 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -5,7 +5,7 @@ use crate::{ error::*, globals::Globals, hyper_ext::{ - body::{IncomingOr, ResponseBody}, + body::{RequestBody, ResponseBody}, rt::LocalExecutor, }, log::*, @@ -39,7 +39,7 @@ where { handler .handle_request( - req.map(IncomingOr::Left), + req.map(RequestBody::Incoming), client_addr, listen_addr, tls_enabled, From 8dd6af6bc5d4dfb53959f7283f73689582ecab07 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 22:15:34 +0900 Subject: [PATCH 11/12] wip: feat: refactored cache implementation for put --- rpxy-lib/src/forwarder/cache/cache_error.rs | 12 ++ rpxy-lib/src/forwarder/cache/cache_main.rs | 189 +++++++++++++------- rpxy-lib/src/forwarder/client.rs | 1 + 3 files changed, 142 insertions(+), 60 deletions(-) diff --git a/rpxy-lib/src/forwarder/cache/cache_error.rs b/rpxy-lib/src/forwarder/cache/cache_error.rs index bb2ffa6..5f6146a 100644 --- a/rpxy-lib/src/forwarder/cache/cache_error.rs +++ b/rpxy-lib/src/forwarder/cache/cache_error.rs @@ -15,6 +15,9 @@ pub enum CacheError { #[error("Failed to acquire mutex lock for cache")] FailedToAcquiredMutexLockForCache, + #[error("Failed to acquire mutex lock for check")] + FailedToAcquiredMutexLockForCheck, + #[error("Failed to create file cache")] FailedToCreateFileCache, @@ -32,4 +35,13 @@ pub enum CacheError { #[error("Failed to send frame to cache {0}")] FailedToSendFrameToCache(String), + + #[error("Failed to send frame from file cache {0}")] + FailedToSendFrameFromCache(String), + + #[error("Failed to remove cache file: {0}")] + FailedToRemoveCacheFile(String), + + #[error("Invalid cache target")] + InvalidCacheTarget, } diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index 659ac41..c16f1d6 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -1,14 +1,15 @@ use super::cache_error::*; use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*}; +use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; use futures::channel::mpsc; -use http::{Request, Response}; +use http::{Request, Response, Uri}; use http_body_util::{BodyExt, StreamBody}; use http_cache_semantics::CachePolicy; use hyper::body::{Body, Frame, Incoming}; use lru::LruCache; +use sha2::{Digest, Sha256}; use std::{ - convert::Infallible, path::{Path, PathBuf}, sync::{ atomic::{AtomicUsize, Ordering}, @@ -35,6 +36,8 @@ pub struct RpxyCache { max_each_size: usize, /// Maximum size of cache object on memory max_each_size_on_memory: usize, + /// Cache directory path + cache_dir: PathBuf, } impl RpxyCache { @@ -43,8 +46,8 @@ impl RpxyCache { if !globals.proxy_config.cache_enabled { return None; } - let path = globals.proxy_config.cache_dir.as_ref().unwrap(); - let file_store = FileStore::new(path, &globals.runtime_handle).await; + let cache_dir = globals.proxy_config.cache_dir.as_ref().unwrap(); + let file_store = FileStore::new(&globals.runtime_handle).await; let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry); let max_each_size = globals.proxy_config.cache_max_each_size; @@ -56,12 +59,18 @@ impl RpxyCache { max_each_size_on_memory = max_each_size; } + if let Err(e) = fs::remove_dir_all(cache_dir).await { + warn!("Failed to 
clean up the cache dir: {e}"); + }; + fs::create_dir_all(&cache_dir).await.unwrap(); + Some(Self { file_store, inner, runtime_handle: globals.runtime_handle.clone(), max_each_size, max_each_size_on_memory, + cache_dir: cache_dir.clone(), }) } @@ -80,17 +89,20 @@ impl RpxyCache { mut body: Incoming, policy: &CachePolicy, ) -> CacheResult { - let my_cache = self.inner.clone(); + let cache_manager = self.inner.clone(); let mut file_store = self.file_store.clone(); let uri = uri.clone(); let policy_clone = policy.clone(); let max_each_size = self.max_each_size; let max_each_size_on_memory = self.max_each_size_on_memory; + let cache_dir = self.cache_dir.clone(); let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); self.runtime_handle.spawn(async move { let mut size = 0usize; + let mut buf = BytesMut::new(); + loop { let frame = match body.frame().await { Some(frame) => frame, @@ -118,10 +130,9 @@ impl RpxyCache { .map(|f| { if f.is_data() { let data_bytes = f.data_ref().unwrap().clone(); - debug!("cache data bytes of {} bytes", data_bytes.len()) - // TODO: cache data bytes as file or on memory - // fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。 - // ファイルとObjectのbindをどうやってするか + debug!("cache data bytes of {} bytes", data_bytes.len()); + // We do not use stream-type buffering since it needs to lock file during operation. + buf.extend(data_bytes.as_ref()); } }) .map_err(|e| CacheError::FailedToCacheBytes(e.to_string()))?; @@ -132,6 +143,35 @@ impl RpxyCache { .map_err(|e| CacheError::FailedToSendFrameToCache(e.to_string()))?; } + let buf = buf.freeze(); + // Calculate hash of the cached data, after all data is received. + // In-operation calculation is possible but it blocks sending data. + let mut hasher = Sha256::new(); + hasher.update(buf.as_ref()); + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); + + // Create cache object + let cache_key = derive_cache_key_from_uri(&uri); + let cache_object = CacheObject { + policy: policy_clone, + target: CacheFileOrOnMemory::build(&cache_dir, &uri, &buf, max_each_size_on_memory), + hash: hash_bytes, + }; + + if let Some((k, v)) = cache_manager.push(&cache_key, &cache_object)? { + if k != cache_key { + info!("Over the cache capacity. 
Evict least recent used entry"); + if let CacheFileOrOnMemory::File(path) = v.target { + file_store.evict(&path).await; + } + } + } + // store cache object to file + if let CacheFileOrOnMemory::File(_) = cache_object.target { + file_store.create(&cache_object, &buf).await?; + } + Ok(()) as CacheResult<()> }); @@ -145,36 +185,35 @@ impl RpxyCache { #[derive(Debug, Clone)] /// Cache file manager outer that is responsible to handle `RwLock` struct FileStore { + /// Inner file store main object inner: Arc>, } impl FileStore { /// Build manager - async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + async fn new(runtime_handle: &tokio::runtime::Handle) -> Self { Self { - inner: Arc::new(RwLock::new(FileStoreInner::new(path, runtime_handle).await)), + inner: Arc::new(RwLock::new(FileStoreInner::new(runtime_handle).await)), } } -} -impl FileStore { /// Count file cache entries async fn count(&self) -> usize { let inner = self.inner.read().await; inner.cnt } /// Create a temporary file cache - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult { + async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { let mut inner = self.inner.write().await; - inner.create(cache_filename, body_bytes).await + inner.create(cache_object, body_bytes).await + } + /// Evict a temporary file cache + async fn evict(&self, path: impl AsRef) { + // Acquire the write lock + let mut inner = self.inner.write().await; + if let Err(e) = inner.remove(path).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; } - // /// Evict a temporary file cache - // async fn evict(&self, path: impl AsRef) { - // // Acquire the write lock - // let mut inner = self.inner.write().await; - // if let Err(e) = inner.remove(path).await { - // warn!("Eviction failed during file object removal: {:?}", e); - // }; - // } // /// Read a temporary file cache // async fn read(&self, path: impl AsRef) -> CacheResult { // let inner = self.inner.read().await; @@ -185,8 +224,6 @@ impl FileStore { #[derive(Debug)] /// Manager inner for cache on file system struct FileStoreInner { - /// Directory of temporary files - cache_dir: PathBuf, /// Counter of current cached files cnt: usize, /// Async runtime @@ -197,22 +234,21 @@ impl FileStoreInner { /// Build new cache file manager. /// This first creates cache file dir if not exists, and cleans up the file inside the directory. /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed. 
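+  /// Note: the cache directory itself is created and cleaned up by `RpxyCache::new`, so this constructor only initializes the file counter and the runtime handle.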
- async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { - let path_buf = path.as_ref().to_path_buf(); - if let Err(e) = fs::remove_dir_all(path).await { - warn!("Failed to clean up the cache dir: {e}"); - }; - fs::create_dir_all(&path_buf).await.unwrap(); + async fn new(runtime_handle: &tokio::runtime::Handle) -> Self { Self { - cache_dir: path_buf.clone(), cnt: 0, runtime_handle: runtime_handle.clone(), } } /// Create a new temporary file cache - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult { - let cache_filepath = self.cache_dir.join(cache_filename); + async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { + let cache_filepath = match cache_object.target { + CacheFileOrOnMemory::File(ref path) => path.clone(), + CacheFileOrOnMemory::OnMemory(_) => { + return Err(CacheError::InvalidCacheTarget); + } + }; let Ok(mut file) = File::create(&cache_filepath).await else { return Err(CacheError::FailedToCreateFileCache); }; @@ -224,50 +260,47 @@ impl FileStoreInner { }; } self.cnt += 1; - Ok(CacheFileOrOnMemory::File(cache_filepath)) + Ok(()) } /// Retrieve a stored temporary file cache - async fn read(&self, path: impl AsRef) -> CacheResult<()> { + async fn read(&self, path: impl AsRef) -> CacheResult { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); return Err(CacheError::FailedToOpenCacheFile); }; - /* ----------------------------- */ - // PoC for streaming body - let (tx, rx) = mpsc::unbounded::, Infallible>>(); + let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); - // let (body_sender, res_body) = Body::channel(); self.runtime_handle.spawn(async move { // let mut sender = body_sender; let mut buf = BytesMut::new(); loop { match file.read_buf(&mut buf).await { Ok(0) => break, - Ok(_) => tx - .unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining())))) - .map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?, - //sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Ok(_) => body_tx + .unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining())))) + .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?, Err(_) => break, }; } - Ok(()) as anyhow::Result<()> + Ok(()) as CacheResult<()> }); - let mut rx = http_body_util::StreamBody::new(rx); - // TODO: 結局incominglikeなbodystreamを定義することになる。これだったらh3と合わせて自分で定義した方が良さそう。 - // typeが長すぎるのでwrapperを作った方がいい。 - // let response = Response::builder() - // .status(200) - // .header("content-type", "application/octet-stream") - // .body(rx) - // .unwrap(); + let stream_body = StreamBody::new(body_rx); - todo!() - /* ----------------------------- */ + Ok(stream_body) + } - // Ok(res_body) + /// Remove file + async fn remove(&mut self, path: impl AsRef) -> CacheResult<()> { + fs::remove_file(path.as_ref()) + .await + .map_err(|e| CacheError::FailedToRemoveCacheFile(e.to_string()))?; + self.cnt -= 1; + debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt); + + Ok(()) } } @@ -279,7 +312,20 @@ pub enum CacheFileOrOnMemory { /// Pointer to the temporary cache file File(PathBuf), /// Cached body itself - OnMemory(Vec), + OnMemory(Bytes), +} + +impl CacheFileOrOnMemory { + /// Get cache object target + fn build(cache_dir: &Path, uri: &Uri, object: &Bytes, max_each_size_on_memory: usize) -> Self { + if object.len() > max_each_size_on_memory { + let cache_filename = derive_filename_from_uri(uri); + let cache_filepath = 
cache_dir.join(cache_filename); + CacheFileOrOnMemory::File(cache_filepath) + } else { + CacheFileOrOnMemory::OnMemory(object.clone()) + } + } } #[derive(Clone, Debug)] @@ -290,7 +336,7 @@ struct CacheObject { /// Cache target: on-memory object or temporary file pub target: CacheFileOrOnMemory, /// SHA256 hash of target to strongly bind the cache metadata (this object) and file target - pub hash: Vec, + pub hash: Bytes, } /* ---------------------------------------------- */ @@ -332,16 +378,28 @@ impl LruCacheManager { } /// Push an entry - fn push(&self, cache_key: &str, cache_object: CacheObject) -> CacheResult> { + fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult> { let Ok(mut lock) = self.inner.lock() else { error!("Failed to acquire mutex lock for writing cache entry"); return Err(CacheError::FailedToAcquiredMutexLockForCache); }; - let res = Ok(lock.push(cache_key.to_string(), cache_object)); + let res = Ok(lock.push(cache_key.to_string(), cache_object.clone())); // This may be inconsistent with the actual number of entries self.cnt.store(lock.len(), Ordering::Relaxed); res } + + /// Get an entry + fn get(&self, cache_key: &str) -> CacheResult> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); + return Err(CacheError::FailedToAcquiredMutexLockForCheck); + }; + let Some(cached_object) = lock.get(cache_key) else { + return Ok(None); + }; + Ok(Some(cached_object.clone())) + } } /* ---------------------------------------------- */ @@ -366,3 +424,14 @@ pub fn get_policy_if_cacheable( Ok(None) } } + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 8d2e307..c6f1ca9 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -47,6 +47,7 @@ where { let mut synth_req = None; if self.cache.is_some() { + // TODO: try reading from cache // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { // // if found, return it as response. 
// info!("Cache hit - Return from cache"); From bd29c9dc1da19918bc9e253b20435787a3346ff9 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 22:50:24 +0900 Subject: [PATCH 12/12] wip: feat: implemented cache --- rpxy-lib/src/backend/mod.rs | 7 +- rpxy-lib/src/forwarder/cache/cache_error.rs | 3 + rpxy-lib/src/forwarder/cache/cache_main.rs | 114 +++++++++++++++++--- rpxy-lib/src/forwarder/client.rs | 12 +-- 4 files changed, 113 insertions(+), 23 deletions(-) diff --git a/rpxy-lib/src/backend/mod.rs b/rpxy-lib/src/backend/mod.rs index 788960d..097810a 100644 --- a/rpxy-lib/src/backend/mod.rs +++ b/rpxy-lib/src/backend/mod.rs @@ -3,10 +3,11 @@ mod load_balance; mod upstream; mod upstream_opts; -// #[cfg(feature = "sticky-cookie")] -// pub use self::load_balance::{StickyCookie, StickyCookieValue}; +#[cfg(feature = "sticky-cookie")] +pub(crate) use self::load_balance::{StickyCookie, StickyCookieValue}; +#[allow(unused)] pub(crate) use self::{ - load_balance::{LoadBalance, LoadBalanceContext, StickyCookie, StickyCookieValue}, + load_balance::{LoadBalance, LoadBalanceContext}, upstream::{PathManager, Upstream, UpstreamCandidates}, upstream_opts::UpstreamOption, }; diff --git a/rpxy-lib/src/forwarder/cache/cache_error.rs b/rpxy-lib/src/forwarder/cache/cache_error.rs index 5f6146a..35eae83 100644 --- a/rpxy-lib/src/forwarder/cache/cache_error.rs +++ b/rpxy-lib/src/forwarder/cache/cache_error.rs @@ -44,4 +44,7 @@ pub enum CacheError { #[error("Invalid cache target")] InvalidCacheTarget, + + #[error("Hash mismatched in cache file")] + HashMismatchedInCacheFile, } diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index c16f1d6..3c85d0c 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -1,12 +1,16 @@ use super::cache_error::*; -use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*}; +use crate::{ + globals::Globals, + hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody}, + log::*, +}; use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; use futures::channel::mpsc; use http::{Request, Response, Uri}; use http_body_util::{BodyExt, StreamBody}; use http_cache_semantics::CachePolicy; -use hyper::body::{Body, Frame, Incoming}; +use hyper::body::{Frame, Incoming}; use lru::LruCache; use sha2::{Digest, Sha256}; use std::{ @@ -15,6 +19,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }, + time::SystemTime, }; use tokio::{ fs::{self, File}, @@ -179,6 +184,66 @@ impl RpxyCache { Ok(stream_body) } + + /// Get cached response + pub async fn get(&self, req: &Request) -> Option> { + debug!( + "Current cache status: (total, on-memory, file) = {:?}", + self.count().await + ); + let cache_key = derive_cache_key_from_uri(req.uri()); + + // First check cache chance + let Ok(Some(cached_object)) = self.inner.get(&cache_key) else { + return None; + }; + + // Secondly check the cache freshness as an HTTP message + let now = SystemTime::now(); + let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else { + // Evict stale cache entry. + // This might be okay to keep as is since it would be updated later. + // However, there is no guarantee that newly got objects will be still cacheable. + // So, we have to evict stale cache entries and cache file objects if found. 
+ debug!("Stale cache entry: {cache_key}"); + let _evicted_entry = self.inner.evict(&cache_key); + // For cache file + if let CacheFileOrOnMemory::File(path) = &cached_object.target { + self.file_store.evict(&path).await; + } + return None; + }; + + // Finally retrieve the file/on-memory object + let response_body = match cached_object.target { + CacheFileOrOnMemory::File(path) => { + let stream_body = match self.file_store.read(path.clone(), &cached_object.hash).await { + Ok(s) => s, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.inner.evict(&cache_key); + self.file_store.evict(path).await; + return None; + } + }; + debug!("Cache hit from file: {cache_key}"); + ResponseBody::Streamed(stream_body) + } + CacheFileOrOnMemory::OnMemory(object) => { + debug!("Cache hit from on memory: {cache_key}"); + let mut hasher = Sha256::new(); + hasher.update(object.as_ref()); + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + if hash_bytes != cached_object.hash { + warn!("Hash mismatched. Cache object is corrupted"); + let _evicted_entry = self.inner.evict(&cache_key); + return None; + } + ResponseBody::Boxed(BoxBody::new(full(object))) + } + }; + Some(Response::from_parts(res_parts, response_body)) + } } /* ---------------------------------------------- */ @@ -202,7 +267,7 @@ impl FileStore { inner.cnt } /// Create a temporary file cache - async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { + async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { let mut inner = self.inner.write().await; inner.create(cache_object, body_bytes).await } @@ -214,14 +279,18 @@ impl FileStore { warn!("Eviction failed during file object removal: {:?}", e); }; } - // /// Read a temporary file cache - // async fn read(&self, path: impl AsRef) -> CacheResult { - // let inner = self.inner.read().await; - // inner.read(&path).await - // } + /// Read a temporary file cache + async fn read( + &self, + path: impl AsRef + Send + Sync + 'static, + hash: &Bytes, + ) -> CacheResult { + let inner = self.inner.read().await; + inner.read(path, hash).await + } } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Manager inner for cache on file system struct FileStoreInner { /// Counter of current cached files @@ -264,26 +333,43 @@ impl FileStoreInner { } /// Retrieve a stored temporary file cache - async fn read(&self, path: impl AsRef) -> CacheResult { + async fn read( + &self, + path: impl AsRef + Send + Sync + 'static, + hash: &Bytes, + ) -> CacheResult { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); return Err(CacheError::FailedToOpenCacheFile); }; + let hash_clone = hash.clone(); + let mut self_clone = self.clone(); let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); self.runtime_handle.spawn(async move { - // let mut sender = body_sender; + let mut hasher = Sha256::new(); let mut buf = BytesMut::new(); loop { match file.read_buf(&mut buf).await { Ok(0) => break, - Ok(_) => body_tx - .unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining())))) - .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?, + Ok(_) => { + let bytes = buf.copy_to_bytes(buf.remaining()); + hasher.update(bytes.as_ref()); + body_tx + .unbounded_send(Ok(Frame::data(bytes))) + .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))? 
+ } Err(_) => break, }; } + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + if hash_bytes != hash_clone { + warn!("Hash mismatched. Cache object is corrupted. Force to remove the cache file."); + // only file can be evicted + let _evicted_entry = self_clone.remove(&path).await; + return Err(CacheError::HashMismatchedInCacheFile); + } Ok(()) as CacheResult<()> }); diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index c6f1ca9..c8a8ec7 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -47,12 +47,12 @@ where { let mut synth_req = None; if self.cache.is_some() { - // TODO: try reading from cache - // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { - // // if found, return it as response. - // info!("Cache hit - Return from cache"); - // return Ok(cached_response); - // }; + // try reading from cache + if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // if found, return it as response. + info!("Cache hit - Return from cache"); + return Ok(cached_response); + }; // Synthetic request copy used just for caching (cannot clone request object...) synth_req = Some(build_synth_req_for_cache(&req));
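For orientation, the pieces added across these patches compose roughly as follows on the forwarder side. This is an illustrative sketch, not code from the series: it borrows the names visible above (`RpxyCache::get`/`put`, `get_policy_if_cacheable`, `build_synth_req_for_cache`, `ResponseBody`, `RequestBody`), omits crate-internal imports, uses a hypothetical `ForwarderSketch` stand-in for the real forwarder, and uses a placeholder error mapping where the real code converts between `CacheResult` and `RpxyResult`.

// Sketch only: cache-aware request flow, assuming the APIs shown in this patch series.
async fn request_with_cache_sketch(
  forwarder: &ForwarderSketch, // hypothetical stand-in for the real Forwarder
  cache: &RpxyCache,
  req: http::Request<RequestBody>,
) -> RpxyResult<http::Response<ResponseBody>> {
  // 1. Fresh cache hit: answer from the LRU/file store without contacting the upstream.
  if let Some(cached) = cache.get(&req).await {
    return Ok(cached);
  }
  // 2. Keep a synthetic copy of the request for the later policy check, then go upstream.
  let synth_req = build_synth_req_for_cache(&req);
  let res = forwarder.request_directly(req).await?;
  // 3. If cacheable, `put` spawns a background task that buffers and hashes the body,
  //    returning a stream that is forwarded downstream immediately.
  match get_policy_if_cacheable(Some(&synth_req), Some(&res)).ok().flatten() {
    Some(policy) => {
      let (parts, body) = res.into_parts();
      let stream_body = cache
        .put(synth_req.uri(), body, &policy)
        .await
        .map_err(|e| RpxyError::FailedToFetchFromUpstream(e.to_string()))?; // placeholder error mapping
      Ok(http::Response::from_parts(parts, ResponseBody::Streamed(stream_body)))
    }
    // 4. Not cacheable: pass the upstream body through untouched.
    None => Ok(res.map(ResponseBody::Incoming)),
  }
}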