From f5197d08692e59795f5ecf2f00bc90c109ce60c7 Mon Sep 17 00:00:00 2001
From: Jun Kurihara
Date: Sat, 9 Dec 2023 03:34:00 +0900
Subject: [PATCH] wip: refactoring the cache logic

---
 rpxy-lib/Cargo.toml                   |   8 +-
 rpxy-lib/src/error.rs                 |   9 ++
 rpxy-lib/src/forwarder/cache.rs       | 161 ++++++++++++++++++++++++++
 rpxy-lib/src/forwarder/client.rs      | 102 +++++++++++++---
 rpxy-lib/src/forwarder/mod.rs         |   1 +
 submodules/rusty-http-cache-semantics |   2 +-
 6 files changed, 261 insertions(+), 22 deletions(-)
 create mode 100644 rpxy-lib/src/forwarder/cache.rs

diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml
index 92449ab..fe715e0 100644
--- a/rpxy-lib/Cargo.toml
+++ b/rpxy-lib/Cargo.toml
@@ -24,7 +24,7 @@ http3-s2n = [
 sticky-cookie = ["base64", "sha2", "chrono"]
 native-tls-backend = ["hyper-tls"]
 rustls-backend = []
-cache = [] #"http-cache-semantics", "lru"]
+cache = ["http-cache-semantics", "lru"]
 native-roots = [] #"hyper-rustls/native-tokio"]
 
 [dependencies]
@@ -66,7 +66,7 @@ hyper-tls = { version = "0.6.0", features = ["alpn"], optional = true }
 
 # tls and cert management for server
 hot_reload = "0.1.4"
-rustls = { version = "0.21.9", default-features = false }
+rustls = { version = "0.21.10", default-features = false }
 tokio-rustls = { version = "0.24.1", features = ["early-data"] }
 webpki = "0.22.4"
 x509-parser = "0.15.1"
@@ -88,8 +88,8 @@ s2n-quic-rustls = { version = "0.32.0", optional = true }
 socket2 = { version = "0.5.5", features = ["all"], optional = true }
 
 # # cache
-# http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
-# lru = { version = "0.12.1", optional = true }
+http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
+lru = { version = "0.12.1", optional = true }
 
 # cookie handling for sticky cookie
 chrono = { version = "0.4.31", default-features = false, features = [
diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs
index 843845d..4cbc463 100644
--- a/rpxy-lib/src/error.rs
+++ b/rpxy-lib/src/error.rs
@@ -84,6 +84,15 @@ pub enum RpxyError {
   #[error("Failed to fetch from upstream: {0}")]
   FailedToFetchFromUpstream(String),
 
+  // Cache errors
+  #[cfg(feature = "cache")]
+  #[error("Invalid null request and/or response")]
+  NullRequestOrResponse,
+
+  #[cfg(feature = "cache")]
+  #[error("Failed to write byte buffer")]
+  FailedToWriteByteBufferForCache,
+
   // Upstream connection setting errors
   #[error("Unsupported upstream option")]
   UnsupportedUpstreamOption,
diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs
new file mode 100644
index 0000000..73bed7b
--- /dev/null
+++ b/rpxy-lib/src/forwarder/cache.rs
@@ -0,0 +1,161 @@
+use crate::{error::*, globals::Globals, log::*};
+use http::{Request, Response};
+use http_cache_semantics::CachePolicy;
+use lru::LruCache;
+use std::{
+  path::{Path, PathBuf},
+  sync::{atomic::AtomicUsize, Arc, Mutex},
+};
+use tokio::{fs, sync::RwLock};
+
+/* ---------------------------------------------- */
+#[derive(Clone, Debug)]
+pub struct RpxyCache {
+  /// LRU cache storing the HTTP message caching policy
+  inner: LruCacheManager,
+  /// Manages cache file objects through `RwLock`'s lock mechanism as a file lock
+  file_store: FileStore,
+  /// Async runtime
+  runtime_handle: tokio::runtime::Handle,
+  /// Maximum size of each cache file object
+  max_each_size: usize,
+  /// Maximum size of each cache object on memory
+  max_each_size_on_memory: usize,
+}
+
+impl RpxyCache {
+  /// Generate cache storage
+  pub async fn new(globals: &Globals) -> Option<Self> {
+    if !globals.proxy_config.cache_enabled {
+      return None;
+    }
+    let path = globals.proxy_config.cache_dir.as_ref().unwrap();
+    let file_store = FileStore::new(path, &globals.runtime_handle).await;
+    let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
+
+    let max_each_size = globals.proxy_config.cache_max_each_size;
+    let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
+    if max_each_size < max_each_size_on_memory {
+      warn!(
+        "The maximum size of an on-memory cache entry must be smaller than or equal to the maximum size of each cache file"
+      );
+      max_each_size_on_memory = max_each_size;
+    }
+
+    Some(Self {
+      file_store,
+      inner,
+      runtime_handle: globals.runtime_handle.clone(),
+      max_each_size,
+      max_each_size_on_memory,
+    })
+  }
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// Outer cache file manager responsible for handling the `RwLock`
+struct FileStore {
+  inner: Arc<RwLock<FileStoreInner>>,
+}
+impl FileStore {
+  /// Build manager
+  async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
+    Self {
+      inner: Arc::new(RwLock::new(FileStoreInner::new(path, runtime_handle).await)),
+    }
+  }
+}
+
+#[derive(Debug)]
+/// Inner manager for the cache on the file system
+struct FileStoreInner {
+  /// Directory of temporary files
+  cache_dir: PathBuf,
+  /// Counter of currently cached files
+  cnt: usize,
+  /// Async runtime
+  runtime_handle: tokio::runtime::Handle,
+}
+
+impl FileStoreInner {
+  /// Build a new cache file manager.
+  /// This first cleans up any existing cache directory and its contents, then recreates the directory.
+  /// TODO: A persistent cache is really difficult. `sqlite` or something like that is needed.
+  async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
+    let path_buf = path.as_ref().to_path_buf();
+    if let Err(e) = fs::remove_dir_all(path).await {
+      warn!("Failed to clean up the cache dir: {e}");
+    };
+    fs::create_dir_all(&path_buf).await.unwrap();
+    Self {
+      cache_dir: path_buf.clone(),
+      cnt: 0,
+      runtime_handle: runtime_handle.clone(),
+    }
+  }
+}
+
+/* ---------------------------------------------- */
+
+#[derive(Clone, Debug)]
+/// Cache target stored in a hybrid manner: on memory or on the file system
+pub enum CacheFileOrOnMemory {
+  /// Pointer to the temporary cache file
+  File(PathBuf),
+  /// Cached body itself
+  OnMemory(Vec<u8>),
+}
+
+#[derive(Clone, Debug)]
+/// Cache object definition
+struct CacheObject {
+  /// Cache policy to determine if the stored cache can be used as a response to a new incoming request
+  pub policy: CachePolicy,
+  /// Cache target: on-memory object or temporary file
+  pub target: CacheFileOrOnMemory,
+  /// SHA256 hash of the target to strongly bind the cache metadata (this object) and the file target
+  pub hash: Vec<u8>,
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// LRU cache manager responsible for handling the `Mutex` wrapping the `LruCache`
+struct LruCacheManager {
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a string URL is the right key; it would mean checking the cache for every request
+  cnt: Arc<AtomicUsize>,
+}
+
+impl LruCacheManager {
+  /// Build LruCache
+  fn new(cache_max_entry: usize) -> Self {
+    Self {
+      inner: Arc::new(Mutex::new(LruCache::new(
+        std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
+      ))),
+      cnt: Arc::new(AtomicUsize::default()),
+    }
+  }
+}
+
+/* ---------------------------------------------- */
+pub fn get_policy_if_cacheable<B1, B2>(
+  req: Option<&Request<B1>>,
+  res: Option<&Response<B2>>,
+) -> RpxyResult<Option<CachePolicy>>
+// where
+//   B1: core::fmt::Debug,
+{
+  // deduce the cache policy from req and res
+  let (Some(req), Some(res)) = (req, res) else {
+    return Err(RpxyError::NullRequestOrResponse);
+  };
+
+  let new_policy = CachePolicy::new(req, res);
+  if new_policy.is_storable() {
+    // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers());
+    Ok(Some(new_policy))
+  } else {
+    Ok(None)
+  }
+}
diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs
index 22c2320..820523e 100644
--- a/rpxy-lib/src/forwarder/client.rs
+++ b/rpxy-lib/src/forwarder/client.rs
@@ -9,13 +9,20 @@ use crate::{
 };
 use async_trait::async_trait;
 use http::{Request, Response, Version};
-use hyper::body::Body;
+use hyper::body::{Body, Incoming};
 use hyper_util::client::legacy::{
   connect::{Connect, HttpConnector},
   Client,
 };
 use std::sync::Arc;
 
+#[cfg(feature = "cache")]
+use super::cache::{get_policy_if_cacheable, RpxyCache};
+#[cfg(feature = "cache")]
+use crate::hyper_ext::body::{full, BoxBody};
+#[cfg(feature = "cache")]
+use http_body_util::BodyExt;
+
 #[async_trait]
 /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers.
 pub trait ForwardRequest<B1, B2> {
@@ -25,27 +32,71 @@ pub trait ForwardRequest<B1, B2> {
 /// Forwarder http client struct responsible to cache handling
 pub struct Forwarder<C, B1> {
-  // #[cfg(feature = "cache")]
-  // cache: Option<RpxyCache>,
+  #[cfg(feature = "cache")]
+  cache: Option<RpxyCache>,
   inner: Client<C, B1>,
   inner_h2: Client<C, B1>, // `h2c` or http/2-only client is defined separately
 }
 
 #[async_trait]
-impl<C, B1, B2> ForwardRequest<B1, IncomingOr<B2>> for Forwarder<C, B1>
+impl<C, B1> ForwardRequest<B1, IncomingOr<BoxBody>> for Forwarder<C, B1>
 where
   C: Send + Sync + Connect + Clone + 'static,
   B1: Body + Send + Sync + Unpin + 'static,
   <B1 as Body>::Data: Send,
   <B1 as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
-  B2: Body,
 {
   type Error = RpxyError;
 
-  async fn request(&self, req: Request<B1>) -> Result<Response<IncomingOr<B2>>, Self::Error> {
+  async fn request(&self, req: Request<B1>) -> Result<Response<IncomingOr<BoxBody>>, Self::Error> {
     // TODO: cache handling
+    #[cfg(feature = "cache")]
+    {
+      let mut synth_req = None;
+      if self.cache.is_some() {
+        // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
+        //   // if found, return it as response.
+        //   info!("Cache hit - Return from cache");
+        //   return Ok(cached_response);
+        // };
-    self.request_directly(req).await
+        // Synthetic request copy used only for caching (the request object cannot be cloned...)
+        synth_req = Some(build_synth_req_for_cache(&req));
+      }
+      let res = self.request_directly(req).await;
+
+      if self.cache.is_none() {
+        return res.map(wrap_incoming_body_response::<BoxBody>);
+      }
+
+      // check cacheability and store it if cacheable
+      let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else {
+        return res.map(wrap_incoming_body_response::<BoxBody>);
+      };
+      let (parts, body) = res.unwrap().into_parts();
+      let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else {
+        return Err(RpxyError::FailedToWriteByteBufferForCache);
+      };
+
+      // if let Err(cache_err) = self
+      //   .cache
+      //   .as_ref()
+      //   .unwrap()
+      //   .put(synth_req.unwrap().uri(), &bytes, &cache_policy)
+      //   .await
+      // {
+      //   error!("{:?}", cache_err);
+      // };
+
+      // response with cached body
+      Ok(Response::from_parts(parts, IncomingOr::Right(full(bytes))))
+    }
+
+    // No cache handling
+    #[cfg(not(feature = "cache"))]
+    {
+      self.request_directly(req).await.map(wrap_incoming_body_response::<BoxBody>)
+    }
   }
 }
 
@@ -56,13 +107,15 @@ where
   <B1 as Body>::Data: Send,
   <B1 as Body>::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
 {
-  async fn request_directly(&self, req: Request<B1>) -> RpxyResult<Response<IncomingOr<BoxBody>>> {
+  async fn request_directly(&self, req: Request<B1>) -> RpxyResult<Response<Incoming>> {
+    // TODO: This `match` is evaluated on every `request` invocation, which is inefficient
+    // and needs to be reconsidered. For now, this is a kind of workaround.
+    // This possibly relates to https://github.com/hyperium/hyper/issues/2417.
     match req.version() {
       Version::HTTP_2 => self.inner_h2.request(req).await, // handles `h2c` requests
       _ => self.inner.request(req).await,
     }
     .map_err(|e| RpxyError::FailedToFetchFromUpstream(e.to_string()))
-    .map(wrap_incoming_body_response::<BoxBody>)
   }
 }
 
@@ -90,7 +143,9 @@ Please enable native-tls-backend or rustls-backend feature to enable TLS support
 
     Ok(Self {
       inner,
-      inner_h2: inner.clone(),
+      inner_h2,
+      #[cfg(feature = "cache")]
+      cache: RpxyCache::new(_globals).await,
     })
   }
 }
@@ -130,13 +185,12 @@ where
       .http2_only(true)
      .build::<_, B1>(connector_h2);
 
-    // #[cfg(feature = "cache")]
-    // {
-    //   let cache = RpxyCache::new(_globals).await;
-    //   Self { inner, inner_h2, cache }
-    // }
-    // #[cfg(not(feature = "cache"))]
-    Ok(Self { inner, inner_h2 })
+    Ok(Self {
+      inner,
+      inner_h2,
+      #[cfg(feature = "cache")]
+      cache: RpxyCache::new(_globals).await,
+    })
   }
 }
 
@@ -172,3 +226,17 @@ where
     // let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2);
   }
 }
+
+#[cfg(feature = "cache")]
+/// Build a synthetic request used only for caching
+fn build_synth_req_for_cache<B>(req: &Request<B>) -> Request<()> {
+  let mut builder = Request::builder()
+    .method(req.method())
+    .uri(req.uri())
+    .version(req.version());
+  // TODO: extensions are omitted. Is this approach correct?
+  for (header_key, header_value) in req.headers() {
+    builder = builder.header(header_key, header_value);
+  }
+  builder.body(()).unwrap()
+}
diff --git a/rpxy-lib/src/forwarder/mod.rs b/rpxy-lib/src/forwarder/mod.rs
index 13d37eb..e901c7d 100644
--- a/rpxy-lib/src/forwarder/mod.rs
+++ b/rpxy-lib/src/forwarder/mod.rs
@@ -1,3 +1,4 @@
+mod cache;
 mod client;
 
 use crate::hyper_ext::body::{IncomingLike, IncomingOr};
diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics
index 3cd0917..88d23c2 160000
--- a/submodules/rusty-http-cache-semantics
+++ b/submodules/rusty-http-cache-semantics
@@ -1 +1 @@
-Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd
+Subproject commit 88d23c2f5a3ac36295dff4a804968c43932ba46b
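
Note on the still-commented cache-hit path: before a stored body can be served, the saved CachePolicy has to be checked for freshness against the incoming request. A minimal sketch of that check, assuming the vendored rusty-http-cache-semantics fork keeps the upstream before_request/BeforeRequest API; the helper name is_still_fresh is illustrative and not part of this patch:

    use http::Request;
    use http_cache_semantics::{BeforeRequest, CachePolicy};
    use std::time::SystemTime;

    /// Return the refreshed response headers if the stored entry may be served
    /// for `req` without revalidation; `None` means the entry is stale.
    fn is_still_fresh<B>(policy: &CachePolicy, req: &Request<B>) -> Option<http::response::Parts> {
      match policy.before_request(req, SystemTime::now()) {
        // Still fresh: reuse the cached body together with the updated headers.
        BeforeRequest::Fresh(parts) => Some(parts),
        // Stale: the caller should revalidate upstream instead of serving the cache.
        BeforeRequest::Stale { .. } => None,
      }
    }

A future `get` implementation along the lines of the commented-out block above would look up the CacheObject for the request URI, run such a freshness check, and fall back to request_directly when it returns None.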