From 07d3accb912a0a0aca72a41e47c09b64a8bcce6b Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 16 Aug 2023 23:04:04 +0900 Subject: [PATCH] feat: totally updated cache structure using lru crate instead of moka (i.e., using simpler crate) --- CHANGELOG.md | 1 + TODO.md | 5 +- config-example.toml | 4 +- rpxy-bin/src/config/toml.rs | 12 +- rpxy-bin/src/constants.rs | 8 + rpxy-lib/Cargo.toml | 3 +- rpxy-lib/src/constants.rs | 4 - rpxy-lib/src/globals.rs | 4 + rpxy-lib/src/handler/cache.rs | 234 +++++++++++++++--------------- rpxy-lib/src/handler/forwarder.rs | 14 +- 10 files changed, 157 insertions(+), 132 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ff5cd18..492db57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Improvement - Feat: Enabled `h2c` (HTTP/2 cleartext) requests to upstream app servers (in the previous versions, only HTTP/1.1 is allowed for cleartext requests) +- Feat: Initial implementation of caching feature using file + on memory cache. (Caveats: No persistence of the cache. Once config is updated, the cache is totally eliminated.) - Refactor: logs of minor improvements ### Bugfix diff --git a/TODO.md b/TODO.md index 8ec6d5d..78a364e 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,10 @@ # TODO List - [Done in 0.6.0] But we need more sophistication on `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ -- [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))** +- [Initial implementation in v0.6.0] ~~**Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**~~ Using `lru` crate might be inefficient in terms of the speed. 
Also, this cache feature should be a separated `feature` (But I think okay to be included in `default`). + - Consider more sophisticated architecture for cache + - Persistent cache (if possible). + - etc etc - Improvement of path matcher - More flexible option for rewriting path - Refactoring diff --git a/config-example.toml b/config-example.toml index 41d70e7..694ff7c 100644 --- a/config-example.toml +++ b/config-example.toml @@ -110,4 +110,6 @@ max_idle_timeout = 10 # secs. 0 represents an infinite timeout. # If this specified, file cache feature is enabled [experimental.cache] -cache_dir = './cache' # optional. default is "./cache" relative to the current working directory +cache_dir = './cache' # optional. default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 60f13b3..984553a 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -36,6 +36,8 @@ pub struct Http3Option { #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct CacheOption { pub cache_dir: Option, + pub max_cache_entry: Option, + pub max_cache_each_size: Option, } #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] @@ -173,7 +175,15 @@ impl TryInto for &ConfigToml { proxy_config.cache_dir = match &cache_option.cache_dir { Some(cache_dir) => Some(PathBuf::from(cache_dir)), None => Some(PathBuf::from(CACHE_DIR)), - } + }; + proxy_config.cache_max_entry = match &cache_option.max_cache_entry { + Some(num) => Some(*num), + None => Some(MAX_CACHE_ENTRY), + }; + proxy_config.cache_max_each_size = match &cache_option.max_cache_each_size { + Some(num) => Some(*num), + None => Some(MAX_CACHE_EACH_SIZE), + }; } } diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index a7e811a..54fa7dd 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ 
-1,4 +1,12 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; + +// Cache directory pub const CACHE_DIR: &str = "./cache"; +// # of entries in cache +pub const MAX_CACHE_ENTRY: usize = 1_000; +// max size for each file in bytes +pub const MAX_CACHE_EACH_SIZE: usize = 65_535; + +// TODO: max cache size in total diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 3b90659..7d48b44 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -75,8 +75,7 @@ s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optio # cache http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" } -moka = { version = "0.11.3", features = ["future", "sync"] } -fs4 = { version = "0.6.6", features = ["tokio", "tokio-async"] } +lru = { version = "0.11.0" } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 4abf403..b7b0bff 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -31,7 +31,3 @@ pub mod H3 { #[cfg(feature = "sticky-cookie")] /// For load-balancing with sticky cookie pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; - -pub const MAX_CACHE_ENTRY: u64 = 10_000; -// TODO: max cache size per entry -// TODO: max cache size in total diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 9442507..8a782b6 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -55,6 +55,8 @@ pub struct ProxyConfig { pub cache_enabled: bool, pub cache_dir: Option, + pub cache_max_entry: Option, + pub cache_max_each_size: Option, // All need to make packet acceptor #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] @@ -93,6 +95,8 @@ impl Default for ProxyConfig { cache_enabled: false, cache_dir: None, + cache_max_entry: None, + cache_max_each_size: None, 
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] http3: false, diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index 5e09efa..c676c45 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -1,18 +1,17 @@ -use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource}; +use crate::{error::*, globals::Globals, log::*, CryptoSource}; use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; -use fs4::tokio::AsyncFileExt; use http_cache_semantics::CachePolicy; use hyper::{ http::{Request, Response}, Body, }; -use moka::future::Cache as MokaCache; +use lru::LruCache; use sha2::{Digest, Sha256}; use std::{ fmt::Debug, path::{Path, PathBuf}, - sync::Arc, + sync::{Arc, Mutex}, time::SystemTime, }; use tokio::{ @@ -21,34 +20,10 @@ use tokio::{ sync::RwLock, }; -// #[async_trait] -// pub trait CacheTarget { -// type TargetInput; -// type TargetOutput; -// type Error; -// /// Get target object from somewhere -// async fn get(&self) -> Self::TargetOutput; -// /// Write target object into somewhere -// async fn put(&self, taget: Self::TargetOutput) -> Result<(), Self::Error>; -// /// Remove target object from somewhere (when evicted self) -// async fn remove(&self) -> Result<(), Self::Error>; -// } - -fn derive_filename_from_uri(uri: &hyper::Uri) -> String { - let mut hasher = Sha256::new(); - hasher.update(uri.to_string()); - let digest = hasher.finalize(); - general_purpose::URL_SAFE_NO_PAD.encode(digest) -} - -fn derive_moka_key_from_uri(uri: &hyper::Uri) -> String { - uri.to_string() -} - #[derive(Clone, Debug)] struct CacheObject { pub policy: CachePolicy, - pub target: Option, + pub target: PathBuf, } #[derive(Debug)] @@ -74,16 +49,13 @@ impl CacheFileManager { } } - async fn write(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result { + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: 
&CachePolicy) -> Result { let cache_filepath = self.cache_dir.join(cache_filename); let Ok(mut file) = File::create(&cache_filepath).await else { return Err(RpxyError::Cache("Failed to create file")); }; - // TODO: ここでちゃんと書けないパターンっぽい?あるいは書いた後消されるパターンが起きている模様。 - // evictしたときファイルは消えてentryが残ってるっぽい let mut bytes_clone = body_bytes.clone(); while bytes_clone.has_remaining() { - warn!("remaining {}", bytes_clone.remaining()); if let Err(e) = file.write_buf(&mut bytes_clone).await { error!("Failed to write file cache: {e}"); return Err(RpxyError::Cache("Failed to write file cache: {e}")); @@ -92,7 +64,7 @@ impl CacheFileManager { self.cnt += 1; Ok(CacheObject { policy: policy.clone(), - target: Some(cache_filepath), + target: cache_filepath, }) } @@ -131,10 +103,12 @@ impl CacheFileManager { pub struct RpxyCache { /// Managing cache file objects through RwLock's lock mechanism for file lock cache_file_manager: Arc>, - /// Moka's cache storing http message caching policy - inner: MokaCache, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + /// Lru cache storing http message caching policy + inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう /// Async runtime runtime_handle: tokio::runtime::Handle, + /// Maximum size of each cache file object + max_each_size: usize, } impl RpxyCache { @@ -146,120 +120,152 @@ impl RpxyCache { let path = globals.proxy_config.cache_dir.as_ref().unwrap(); let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); - let mgr_clone = cache_file_manager.clone(); - - let runtime_handle = globals.runtime_handle.clone(); - let eviction_listener = move |k, v: CacheObject, cause| { - debug!("Cache entry is being evicted : {k} {:?}", cause); - runtime_handle.block_on(async { - if let Some(filepath) = v.target { - debug!("Evict file object: {k}"); - // Acquire the write lock - let mut mgr = mgr_clone.write().await; - if let Err(e) = mgr.remove(filepath).await { - 
warn!("Eviction failed during file object removal: {:?}", e); - }; - } - }) - }; + let inner = Arc::new(Mutex::new(LruCache::new( + std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry.unwrap()).unwrap(), + ))); Some(Self { cache_file_manager, - inner: MokaCache::builder() - .max_capacity(MAX_CACHE_ENTRY) - .eviction_listener_with_queued_delivery_mode(eviction_listener) - .build(), // TODO: make this configurable, and along with size + inner, runtime_handle: globals.runtime_handle.clone(), + max_each_size: globals.proxy_config.cache_max_each_size.unwrap(), }) } + fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + lock.pop_entry(cache_key) + } + + async fn evict_cache_file(&self, filepath: impl AsRef) { + // Acquire the write lock + let mut mgr = self.cache_file_manager.write().await; + if let Err(e) = mgr.remove(filepath).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + /// Get cached response pub async fn get(&self, req: &Request) -> Option> { debug!("Current cache entries: {:?}", self.inner); - let moka_key = req.uri().to_string(); + let cache_key = req.uri().to_string(); // First check cache chance - let Some(cached_object) = self.inner.get(&moka_key) else { - return None; - }; - - let now = SystemTime::now(); - if let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) { - let Some(filepath) = cached_object.target else { + let cached_object = { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); return None; }; - - let mgr = self.cache_file_manager.read().await; - let res_body = match mgr.read(&filepath).await { - Ok(res_body) => res_body, - Err(e) => { - warn!("Failed to read from cache: {e}"); - self.inner.invalidate(&moka_key).await; - return None; - } + let 
Some(cached_object) = lock.get(&cache_key) else { + return None; }; - debug!("Cache hit: {moka_key}"); + cached_object.clone() + }; - Some(Response::from_parts(res_parts, res_body)) - } else { + // Secondly check the cache freshness as an HTTP message + let now = SystemTime::now(); + let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else { // Evict stale cache entry. // This might be okay to keep as is since it would be updated later. // However, there is no guarantee that newly got objects will be still cacheable. // So, we have to evict stale cache entries and cache file objects if found. - debug!("Stale cache entry and file object: {moka_key}"); - self.inner.invalidate(&moka_key).await; - // let my_cache = self.inner.clone(); - // self.runtime_handle.spawn(async move { - // eviction listener will be activated during invalidation. - // my_cache.invalidate(&moka_key).await; - // }); - None - } - } - - pub fn is_cacheable(&self, req: Option<&Request>, res: Option<&Response>) -> Result> - where - R: Debug, - { - // deduce cache policy from req and res - let (Some(req), Some(res)) = (req, res) else { - return Err(RpxyError::Cache("Invalid null request and/or response")); + debug!("Stale cache entry and file object: {cache_key}"); + let _evicted_entry = self.evict_cache_entry(&cache_key); + self.evict_cache_file(&cached_object.target).await; + return None; }; - let new_policy = CachePolicy::new(req, res); - if new_policy.is_storable() { - debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); - Ok(Some(new_policy)) - } else { - Ok(None) - } + // Finally retrieve the file object + let mgr = self.cache_file_manager.read().await; + let res_body = match mgr.read(&cached_object.target).await { + Ok(res_body) => res_body, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.evict_cache_entry(&cache_key); + self.evict_cache_file(&cached_object.target).await; + return None; 
+ } + }; + + debug!("Cache hit: {cache_key}"); + Some(Response::from_parts(res_parts, res_body)) } pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { let my_cache = self.inner.clone(); + let mgr = self.cache_file_manager.clone(); let uri = uri.clone(); let bytes_clone = body_bytes.clone(); let policy_clone = policy.clone(); - let mgr_clone = self.cache_file_manager.clone(); + let max_each_size = self.max_each_size; self.runtime_handle.spawn(async move { - let moka_key = derive_moka_key_from_uri(&uri); + if bytes_clone.len() > max_each_size { + warn!("Too large to cache"); + return Err(RpxyError::Cache("Too large to cache")); + } + let cache_key = derive_cache_key_from_uri(&uri); let cache_filename = derive_filename_from_uri(&uri); - warn!("{:?} bytes to be written", bytes_clone.len()); - if let Err(e) = my_cache - .try_get_with(moka_key, async { - let mut mgr = mgr_clone.write().await; - mgr.write(&cache_filename, &bytes_clone, &policy_clone).await - }) - .await - { - error!("Failed to put the body into the file object or cache entry: {e}"); - }; + debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); - debug!("Current cache entries: {:?}", my_cache); + let mut mgr = mgr.write().await; + let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { + error!("Failed to put the body into the file object or cache entry"); + return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); + }; + let push_opt = { + let Ok(mut lock) = my_cache.lock() else { + error!("Failed to acquire mutex lock for writing cache entry"); + return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); + }; + lock.push(cache_key.clone(), cache_object) + }; + if let Some((k, v)) = push_opt { + if k != cache_key { + info!("Over the cache capacity. 
Evict least recent used entry"); + if let Err(e) = mgr.remove(&v.target).await { + warn!("Eviction failed during file object removal over the capacity: {:?}", e); + }; + } + } + + debug!("Cached a new file: {} - {}", cache_key, cache_filename); + Ok(()) }); Ok(()) } } + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} + +pub fn get_policy_if_cacheable(req: Option<&Request>, res: Option<&Response>) -> Result> +where + R: Debug, +{ + // deduce cache policy from req and res + let (Some(req), Some(res)) = (req, res) else { + return Err(RpxyError::Cache("Invalid null request and/or response")); + }; + + let new_policy = CachePolicy::new(req, res); + if new_policy.is_storable() { + debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + Ok(Some(new_policy)) + } else { + Ok(None) + } +} diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index 28fc263..ed480bf 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -1,4 +1,4 @@ -use super::cache::RpxyCache; +use super::cache::{get_policy_if_cacheable, RpxyCache}; use crate::{error::RpxyError, globals::Globals, log::*, CryptoSource}; use async_trait::async_trait; use bytes::Buf; @@ -55,7 +55,7 @@ where if self.cache.is_some() { if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { // if found, return it as response. 
- debug!("Cache hit - Return from cache"); + info!("Cache hit - Return from cache"); return Ok(cached_response); }; @@ -76,13 +76,9 @@ where } // check cacheability and store it if cacheable - let Ok(Some(cache_policy)) = self - .cache - .as_ref() - .unwrap() - .is_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { - return res; - }; + let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { + return res; + }; let (parts, body) = res.unwrap().into_parts(); // TODO: Inefficient? let Ok(mut bytes) = hyper::body::aggregate(body).await else {