From 7cfcd60243096c506781495f96eaf874e59c4c83 Mon Sep 17 00:00:00 2001
From: Jun Kurihara
Date: Tue, 22 Aug 2023 21:15:34 +0900
Subject: [PATCH] refactor: cache manager wrapper to hide mutex lock

---
 rpxy-lib/src/handler/cache.rs     | 127 ++++++++++++++++++++----------
 rpxy-lib/src/handler/forwarder.rs |   1 -
 2 files changed, 87 insertions(+), 41 deletions(-)

diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs
index ee952e9..44cdc11 100644
--- a/rpxy-lib/src/handler/cache.rs
+++ b/rpxy-lib/src/handler/cache.rs
@@ -11,7 +11,10 @@ use sha2::{Digest, Sha256};
 use std::{
   fmt::Debug,
   path::{Path, PathBuf},
-  sync::{Arc, Mutex},
+  sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc, Mutex,
+  },
   time::SystemTime,
 };
 use tokio::{
@@ -147,6 +150,64 @@ impl CacheFileManager {
     let mut mgr = self.inner.write().await;
     mgr.create(cache_filename, body_bytes).await
   }
+  async fn count(&self) -> usize {
+    let mgr = self.inner.read().await;
+    mgr.cnt
+  }
+}
+
+#[derive(Debug, Clone)]
+/// LRU cache manager responsible for handling the `Mutex` wrapped around the `LruCache`
+struct LruCacheManager {
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a string URL is the right key; it would likely be checked against every request
+  cnt: Arc<AtomicUsize>,
+}
+
+impl LruCacheManager {
+  /// Build LruCache
+  fn new(cache_max_entry: usize) -> Self {
+    Self {
+      inner: Arc::new(Mutex::new(LruCache::new(
+        std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
+      ))),
+      cnt: Arc::new(AtomicUsize::default()),
+    }
+  }
+  /// Count entries
+  fn count(&self) -> usize {
+    self.cnt.load(Ordering::Relaxed)
+  }
+  /// Evict an entry
+  fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked to evict a cache entry");
+      return None;
+    };
+    let res = lock.pop_entry(cache_key);
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
+  /// Get an entry
+  fn get(&self, cache_key: &str) -> Result<Option<CacheObject>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked for checking cache entry");
+      return Err(RpxyError::Cache("Mutex can't be locked for checking cache entry"));
+    };
+    let Some(cached_object) = lock.get(cache_key) else {
+      return Ok(None);
+    };
+    Ok(Some(cached_object.clone()))
+  }
+  /// Push an entry
+  fn push(&self, cache_key: &str, cache_object: CacheObject) -> Result<Option<(String, CacheObject)>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Failed to acquire mutex lock for writing cache entry");
+      return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry"));
+    };
+    let res = Ok(lock.push(cache_key.to_string(), cache_object));
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
 }
 
 #[derive(Clone, Debug)]
@@ -154,7 +215,7 @@ pub struct RpxyCache {
   /// Managing cache file objects through RwLock's lock mechanism for file lock
   cache_file_manager: CacheFileManager,
   /// Lru cache storing http message caching policy
-  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a string URL is the right key; it would likely be checked against every request
+  inner: LruCacheManager,
   /// Async runtime
   runtime_handle: tokio::runtime::Handle,
   /// Maximum size of each cache file object
@@ -172,9 +233,7 @@ impl RpxyCache {
     let path = globals.proxy_config.cache_dir.as_ref().unwrap();
     let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await;
 
-    let inner = Arc::new(Mutex::new(LruCache::new(
-      std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(),
-    )));
+    let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
 
     let max_each_size = globals.proxy_config.cache_max_each_size;
     let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
@@ -194,29 +253,25 @@ impl RpxyCache {
     })
   }
 
-  fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> {
-    let Ok(mut lock) = self.inner.lock() else {
-      error!("Mutex can't be locked to evict a cache entry");
-      return None;
-    };
-    lock.pop_entry(cache_key)
+  /// Count cache entries
+  pub async fn count(&self) -> (usize, usize, usize) {
+    let total = self.inner.count();
+    let file = self.cache_file_manager.count().await;
+    let on_memory = total - file;
+    (total, on_memory, file)
   }
 
   /// Get cached response
   pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
-    debug!("Current cache entries: {:?}", self.inner);
+    debug!(
+      "Current cache status: (total, on-memory, file) = {:?}",
+      self.count().await
+    );
     let cache_key = req.uri().to_string();
 
     // First check cache chance
-    let cached_object = {
-      let Ok(mut lock) = self.inner.lock() else {
-        error!("Mutex can't be locked for checking cache entry");
-        return None;
-      };
-      let Some(cached_object) = lock.get(&cache_key) else {
-        return None;
-      };
-      cached_object.clone()
+    let Ok(Some(cached_object)) = self.inner.get(&cache_key) else {
+      return None;
     };
 
     // Secondly check the cache freshness as an HTTP message
@@ -227,9 +282,9 @@ impl RpxyCache {
       // However, there is no guarantee that newly got objects will be still cacheable.
       // So, we have to evict stale cache entries and cache file objects if found.
       debug!("Stale cache entry: {cache_key}");
-      let _evicted_entry = self.evict_cache_entry(&cache_key);
+      let _evicted_entry = self.inner.evict(&cache_key);
       // For cache file
-      if let CacheFileOrOnMemory::File(path) = cached_object.target {
+      if let CacheFileOrOnMemory::File(path) = &cached_object.target {
         self.cache_file_manager.evict(&path).await;
       }
       return None;
@@ -242,7 +297,7 @@ impl RpxyCache {
         Ok(res_body) => res_body,
         Err(e) => {
           warn!("Failed to read from file cache: {e}");
-          let _evicted_entry = self.evict_cache_entry(&cache_key);
+          let _evicted_entry = self.inner.evict(&cache_key);
           self.cache_file_manager.evict(&path).await;
           return None;
         }
@@ -258,6 +313,7 @@ impl RpxyCache {
     }
   }
 
+  /// Put response into the cache
   pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
     let my_cache = self.inner.clone();
     let mut mgr = self.cache_file_manager.clone();
@@ -273,16 +329,13 @@ impl RpxyCache {
         return Err(RpxyError::Cache("Too large to cache"));
       }
       let cache_key = derive_cache_key_from_uri(&uri);
-      let cache_filename = derive_filename_from_uri(&uri);
 
-      debug!("Cache file of {:?} bytes to be written", bytes_clone.len());
+      debug!("Object of size {:?} bytes to be cached", bytes_clone.len());
 
       let cache_object = if bytes_clone.len() > max_each_size_on_memory {
-        let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else {
-          error!("Failed to put the body into the file object or cache entry");
-          return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry"));
-        };
-        debug!("Cached a new file: {} - {}", cache_key, cache_filename);
+        let cache_filename = derive_filename_from_uri(&uri);
+        let target = mgr.create(&cache_filename, &bytes_clone).await?;
+        debug!("Cached a new cache file: {} - {}", cache_key, cache_filename);
         CacheObject {
           policy: policy_clone,
           target,
@@ -294,14 +347,8 @@ impl RpxyCache {
         }
       };
 
-      let push_opt = {
-        let Ok(mut lock) = my_cache.lock() else {
-          error!("Failed to acquire mutex lock for writing cache entry");
entry"); - return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); - }; - lock.push(cache_key.clone(), cache_object) - }; - if let Some((k, v)) = push_opt { + + if let Some((k, v)) = my_cache.push(&cache_key, cache_object)? { if k != cache_key { info!("Over the cache capacity. Evict least recent used entry"); if let CacheFileOrOnMemory::File(path) = v.target { @@ -338,7 +385,7 @@ where let new_policy = CachePolicy::new(req, res); if new_policy.is_storable() { - debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); Ok(Some(new_policy)) } else { Ok(None) diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index ce49433..43cf098 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -87,7 +87,6 @@ where return res; }; let (parts, body) = res.unwrap().into_parts(); - // TODO: Inefficient? let Ok(mut bytes) = hyper::body::aggregate(body).await else { return Err(RpxyError::Cache("Failed to write byte buffer")); };