From a7aac1a0d443ca43b37caf86e037029ce09ec9c0 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 22 Aug 2023 18:45:14 +0900 Subject: [PATCH 1/2] refactor: cache file manager wrapper to hide the rwlock operations --- rpxy-lib/src/handler/cache.rs | 106 ++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 36 deletions(-) diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index 22ed51b..ee952e9 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -21,41 +21,53 @@ use tokio::{ }; #[derive(Clone, Debug)] +/// Cache target in hybrid manner of on-memory and file system pub enum CacheFileOrOnMemory { + /// Pointer to the temporary cache file File(PathBuf), + /// Cached body itself OnMemory(Vec), } #[derive(Clone, Debug)] +/// Cache object definition struct CacheObject { + /// Cache policy to determine if the stored cache can be used as a response to a new incoming request pub policy: CachePolicy, + /// Cache target: on-memory object or temporary file pub target: CacheFileOrOnMemory, } #[derive(Debug)] -struct CacheFileManager { +/// Manager inner for cache on file system +struct CacheFileManagerInner { + /// Directory of temporary files cache_dir: PathBuf, + /// Counter of current cached files cnt: usize, + /// Async runtime runtime_handle: tokio::runtime::Handle, } -impl CacheFileManager { - async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self { - // Create cache file dir - // Clean up the file dir before init - // TODO: Persistent cache is really difficult. maybe SQLite is needed. +impl CacheFileManagerInner { + /// Build new cache file manager. + /// This first creates cache file dir if not exists, and cleans up the file inside the directory. + /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed. + async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + let path_buf = path.as_ref().to_path_buf(); if let Err(e) = fs::remove_dir_all(path).await { warn!("Failed to clean up the cache dir: {e}"); }; - fs::create_dir_all(path).await.unwrap(); + fs::create_dir_all(&path_buf).await.unwrap(); Self { - cache_dir: path.clone(), + cache_dir: path_buf.clone(), cnt: 0, runtime_handle: runtime_handle.clone(), } } - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result { + /// Create a new temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { let cache_filepath = self.cache_dir.join(cache_filename); let Ok(mut file) = File::create(&cache_filepath).await else { return Err(RpxyError::Cache("Failed to create file")); @@ -68,12 +80,10 @@ impl CacheFileManager { }; } self.cnt += 1; - Ok(CacheObject { - policy: policy.clone(), - target: CacheFileOrOnMemory::File(cache_filepath), - }) + Ok(CacheFileOrOnMemory::File(cache_filepath)) } + /// Retrieve a stored temporary file cache async fn read(&self, path: impl AsRef) -> Result { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); @@ -96,6 +106,7 @@ impl CacheFileManager { Ok(res_body) } + /// Remove file async fn remove(&mut self, path: impl AsRef) -> Result<()> { fs::remove_file(path.as_ref()).await?; self.cnt -= 1; @@ -105,10 +116,43 @@ impl CacheFileManager { } } +#[derive(Debug, Clone)] +/// Cache file manager outer that is responsible to handle `RwLock` +struct CacheFileManager { + inner: Arc>, +} + +impl CacheFileManager { + /// Build manager + async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + Self { + inner: Arc::new(RwLock::new(CacheFileManagerInner::new(path, runtime_handle).await)), + } + } + /// Evict a temporary file cache + async fn evict(&self, path: impl AsRef) { + // Acquire the write lock + let mut inner = self.inner.write().await; + if let Err(e) = inner.remove(path).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + /// Read a temporary file cache + async fn read(&self, path: impl AsRef) -> Result { + let mgr = self.inner.read().await; + mgr.read(&path).await + } + /// Create a temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { + let mut mgr = self.inner.write().await; + mgr.create(cache_filename, body_bytes).await + } +} + #[derive(Clone, Debug)] pub struct RpxyCache { /// Managing cache file objects through RwLock's lock mechanism for file lock - cache_file_manager: Arc>, + cache_file_manager: CacheFileManager, /// Lru cache storing http message caching policy inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう /// Async runtime @@ -127,7 +171,7 @@ impl RpxyCache { } let path = globals.proxy_config.cache_dir.as_ref().unwrap(); - let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); + let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await; let inner = Arc::new(Mutex::new(LruCache::new( std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), ))); @@ -158,14 +202,6 @@ impl RpxyCache { lock.pop_entry(cache_key) } - async fn evict_cache_file(&self, filepath: impl AsRef) { - // Acquire the write lock - let mut mgr = self.cache_file_manager.write().await; - if let Err(e) = mgr.remove(filepath).await { - warn!("Eviction failed during file object removal: {:?}", e); - }; - } - /// Get cached response pub async fn get(&self, req: &Request) -> Option> { debug!("Current cache entries: {:?}", self.inner); @@ -190,11 +226,11 @@ impl RpxyCache { // This might be okay to keep as is since it would be updated later. // However, there is no guarantee that newly got objects will be still cacheable. // So, we have to evict stale cache entries and cache file objects if found. - debug!("Stale cache entry and file object: {cache_key}"); + debug!("Stale cache entry: {cache_key}"); let _evicted_entry = self.evict_cache_entry(&cache_key); // For cache file if let CacheFileOrOnMemory::File(path) = cached_object.target { - self.evict_cache_file(&path).await; + self.cache_file_manager.evict(&path).await; } return None; }; @@ -202,13 +238,12 @@ impl RpxyCache { // Finally retrieve the file/on-memory object match cached_object.target { CacheFileOrOnMemory::File(path) => { - let mgr = self.cache_file_manager.read().await; - let res_body = match mgr.read(&path).await { + let res_body = match self.cache_file_manager.read(&path).await { Ok(res_body) => res_body, Err(e) => { warn!("Failed to read from file cache: {e}"); let _evicted_entry = self.evict_cache_entry(&cache_key); - self.evict_cache_file(&path).await; + self.cache_file_manager.evict(&path).await; return None; } }; @@ -225,7 +260,7 @@ impl RpxyCache { pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { let my_cache = self.inner.clone(); - let mgr = self.cache_file_manager.clone(); + let mut mgr = self.cache_file_manager.clone(); let uri = uri.clone(); let bytes_clone = body_bytes.clone(); let policy_clone = policy.clone(); @@ -243,13 +278,15 @@ impl RpxyCache { debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); let cache_object = if bytes_clone.len() > max_each_size_on_memory { - let mut mgr = mgr.write().await; - let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { + let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else { error!("Failed to put the body into the file object or cache entry"); return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); }; debug!("Cached a new file: {} - {}", cache_key, cache_filename); - cache_object + CacheObject { + policy: policy_clone, + target, + } } else { debug!("Cached a new object on memory: {}", cache_key); CacheObject { @@ -268,10 +305,7 @@ impl RpxyCache { if k != cache_key { info!("Over the cache capacity. Evict least recent used entry"); if let CacheFileOrOnMemory::File(path) = v.target { - let mut mgr = mgr.write().await; - if let Err(e) = mgr.remove(&path).await { - warn!("Eviction failed during file object removal over the capacity: {:?}", e); - }; + mgr.evict(&path).await; } } } From 7cfcd60243096c506781495f96eaf874e59c4c83 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 22 Aug 2023 21:15:34 +0900 Subject: [PATCH 2/2] refactor: cache manager wrapper to hide mutex lock --- rpxy-lib/src/handler/cache.rs | 127 ++++++++++++++++++++---------- rpxy-lib/src/handler/forwarder.rs | 1 - 2 files changed, 87 insertions(+), 41 deletions(-) diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index ee952e9..44cdc11 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -11,7 +11,10 @@ use sha2::{Digest, Sha256}; use std::{ fmt::Debug, path::{Path, PathBuf}, - sync::{Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, time::SystemTime, }; use tokio::{ @@ -147,6 +150,64 @@ impl CacheFileManager { let mut mgr = self.inner.write().await; mgr.create(cache_filename, body_bytes).await } + async fn count(&self) -> usize { + let mgr = self.inner.read().await; + mgr.cnt + } +} + +#[derive(Debug, Clone)] +/// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache` +struct LruCacheManager { + inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + cnt: Arc, +} + +impl LruCacheManager { + /// Build LruCache + fn new(cache_max_entry: usize) -> Self { + Self { + inner: Arc::new(Mutex::new(LruCache::new( + std::num::NonZeroUsize::new(cache_max_entry).unwrap(), + ))), + cnt: Arc::new(AtomicUsize::default()), + } + } + /// Count entries + fn count(&self) -> usize { + self.cnt.load(Ordering::Relaxed) + } + /// Evict an entry + fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + let res = lock.pop_entry(cache_key); + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } + /// Get an entry + fn get(&self, cache_key: &str) -> Result> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); + return Err(RpxyError::Cache("Mutex can't be locked for checking cache entry")); + }; + let Some(cached_object) = lock.get(cache_key) else { + return Ok(None); + }; + Ok(Some(cached_object.clone())) + } + /// Push an entry + fn push(&self, cache_key: &str, cache_object: CacheObject) -> Result> { + let Ok(mut lock) = self.inner.lock() else { + error!("Failed to acquire mutex lock for writing cache entry"); + return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); + }; + let res = Ok(lock.push(cache_key.to_string(), cache_object)); + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } } #[derive(Clone, Debug)] @@ -154,7 +215,7 @@ pub struct RpxyCache { /// Managing cache file objects through RwLock's lock mechanism for file lock cache_file_manager: CacheFileManager, /// Lru cache storing http message caching policy - inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + inner: LruCacheManager, /// Async runtime runtime_handle: tokio::runtime::Handle, /// Maximum size of each cache file object @@ -172,9 +233,7 @@ impl RpxyCache { let path = globals.proxy_config.cache_dir.as_ref().unwrap(); let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await; - let inner = Arc::new(Mutex::new(LruCache::new( - std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), - ))); + let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry); let max_each_size = globals.proxy_config.cache_max_each_size; let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; @@ -194,29 +253,25 @@ impl RpxyCache { }) } - fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> { - let Ok(mut lock) = self.inner.lock() else { - error!("Mutex can't be locked to evict a cache entry"); - return None; - }; - lock.pop_entry(cache_key) + /// Count cache entries + pub async fn count(&self) -> (usize, usize, usize) { + let total = self.inner.count(); + let file = self.cache_file_manager.count().await; + let on_memory = total - file; + (total, on_memory, file) } /// Get cached response pub async fn get(&self, req: &Request) -> Option> { - debug!("Current cache entries: {:?}", self.inner); + debug!( + "Current cache status: (total, on-memory, file) = {:?}", + self.count().await + ); let cache_key = req.uri().to_string(); // First check cache chance - let cached_object = { - let Ok(mut lock) = self.inner.lock() else { - error!("Mutex can't be locked for checking cache entry"); - return None; - }; - let Some(cached_object) = lock.get(&cache_key) else { - return None; - }; - cached_object.clone() + let Ok(Some(cached_object)) = self.inner.get(&cache_key) else { + return None; }; // Secondly check the cache freshness as an HTTP message @@ -227,9 +282,9 @@ impl RpxyCache { // However, there is no guarantee that newly got objects will be still cacheable. // So, we have to evict stale cache entries and cache file objects if found. debug!("Stale cache entry: {cache_key}"); - let _evicted_entry = self.evict_cache_entry(&cache_key); + let _evicted_entry = self.inner.evict(&cache_key); // For cache file - if let CacheFileOrOnMemory::File(path) = cached_object.target { + if let CacheFileOrOnMemory::File(path) = &cached_object.target { self.cache_file_manager.evict(&path).await; } return None; @@ -242,7 +297,7 @@ impl RpxyCache { Ok(res_body) => res_body, Err(e) => { warn!("Failed to read from file cache: {e}"); - let _evicted_entry = self.evict_cache_entry(&cache_key); + let _evicted_entry = self.inner.evict(&cache_key); self.cache_file_manager.evict(&path).await; return None; } @@ -258,6 +313,7 @@ impl RpxyCache { } } + /// Put response into the cache pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { let my_cache = self.inner.clone(); let mut mgr = self.cache_file_manager.clone(); @@ -273,16 +329,13 @@ impl RpxyCache { return Err(RpxyError::Cache("Too large to cache")); } let cache_key = derive_cache_key_from_uri(&uri); - let cache_filename = derive_filename_from_uri(&uri); - debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); + debug!("Object of size {:?} bytes to be cached", bytes_clone.len()); let cache_object = if bytes_clone.len() > max_each_size_on_memory { - let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else { - error!("Failed to put the body into the file object or cache entry"); - return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); - }; - debug!("Cached a new file: {} - {}", cache_key, cache_filename); + let cache_filename = derive_filename_from_uri(&uri); + let target = mgr.create(&cache_filename, &bytes_clone).await?; + debug!("Cached a new cache file: {} - {}", cache_key, cache_filename); CacheObject { policy: policy_clone, target, @@ -294,14 +347,8 @@ impl RpxyCache { target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()), } }; - let push_opt = { - let Ok(mut lock) = my_cache.lock() else { - error!("Failed to acquire mutex lock for writing cache entry"); - return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); - }; - lock.push(cache_key.clone(), cache_object) - }; - if let Some((k, v)) = push_opt { + + if let Some((k, v)) = my_cache.push(&cache_key, cache_object)? { if k != cache_key { info!("Over the cache capacity. Evict least recent used entry"); if let CacheFileOrOnMemory::File(path) = v.target { @@ -338,7 +385,7 @@ where let new_policy = CachePolicy::new(req, res); if new_policy.is_storable() { - debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); Ok(Some(new_policy)) } else { Ok(None) diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index ce49433..43cf098 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -87,7 +87,6 @@ where return res; }; let (parts, body) = res.unwrap().into_parts(); - // TODO: Inefficient? let Ok(mut bytes) = hyper::body::aggregate(body).await else { return Err(RpxyError::Cache("Failed to write byte buffer")); };