From a7aac1a0d443ca43b37caf86e037029ce09ec9c0 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 22 Aug 2023 18:45:14 +0900 Subject: [PATCH] refactor: cache file manager wrapper to hide the rwlock operations --- rpxy-lib/src/handler/cache.rs | 106 ++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 36 deletions(-) diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index 22ed51b..ee952e9 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -21,41 +21,53 @@ use tokio::{ }; #[derive(Clone, Debug)] +/// Cache target in hybrid manner of on-memory and file system pub enum CacheFileOrOnMemory { + /// Pointer to the temporary cache file File(PathBuf), + /// Cached body itself OnMemory(Vec), } #[derive(Clone, Debug)] +/// Cache object definition struct CacheObject { + /// Cache policy to determine if the stored cache can be used as a response to a new incoming request pub policy: CachePolicy, + /// Cache target: on-memory object or temporary file pub target: CacheFileOrOnMemory, } #[derive(Debug)] -struct CacheFileManager { +/// Manager inner for cache on file system +struct CacheFileManagerInner { + /// Directory of temporary files cache_dir: PathBuf, + /// Counter of current cached files cnt: usize, + /// Async runtime runtime_handle: tokio::runtime::Handle, } -impl CacheFileManager { - async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self { - // Create cache file dir - // Clean up the file dir before init - // TODO: Persistent cache is really difficult. maybe SQLite is needed. +impl CacheFileManagerInner { + /// Build new cache file manager. + /// This first creates cache file dir if not exists, and cleans up the file inside the directory. + /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed. + async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + let path_buf = path.as_ref().to_path_buf(); if let Err(e) = fs::remove_dir_all(path).await { warn!("Failed to clean up the cache dir: {e}"); }; - fs::create_dir_all(path).await.unwrap(); + fs::create_dir_all(&path_buf).await.unwrap(); Self { - cache_dir: path.clone(), + cache_dir: path_buf.clone(), cnt: 0, runtime_handle: runtime_handle.clone(), } } - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result { + /// Create a new temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { let cache_filepath = self.cache_dir.join(cache_filename); let Ok(mut file) = File::create(&cache_filepath).await else { return Err(RpxyError::Cache("Failed to create file")); @@ -68,12 +80,10 @@ impl CacheFileManager { }; } self.cnt += 1; - Ok(CacheObject { - policy: policy.clone(), - target: CacheFileOrOnMemory::File(cache_filepath), - }) + Ok(CacheFileOrOnMemory::File(cache_filepath)) } + /// Retrieve a stored temporary file cache async fn read(&self, path: impl AsRef) -> Result { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); @@ -96,6 +106,7 @@ impl CacheFileManager { Ok(res_body) } + /// Remove file async fn remove(&mut self, path: impl AsRef) -> Result<()> { fs::remove_file(path.as_ref()).await?; self.cnt -= 1; @@ -105,10 +116,43 @@ impl CacheFileManager { } } +#[derive(Debug, Clone)] +/// Cache file manager outer that is responsible to handle `RwLock` +struct CacheFileManager { + inner: Arc>, +} + +impl CacheFileManager { + /// Build manager + async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + Self { + inner: Arc::new(RwLock::new(CacheFileManagerInner::new(path, runtime_handle).await)), + } + } + /// Evict a temporary file cache + async fn evict(&self, path: impl AsRef) { + // Acquire the write lock + let mut inner = self.inner.write().await; + if let Err(e) = inner.remove(path).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + /// Read a temporary file cache + async fn read(&self, path: impl AsRef) -> Result { + let mgr = self.inner.read().await; + mgr.read(&path).await + } + /// Create a temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { + let mut mgr = self.inner.write().await; + mgr.create(cache_filename, body_bytes).await + } +} + #[derive(Clone, Debug)] pub struct RpxyCache { /// Managing cache file objects through RwLock's lock mechanism for file lock - cache_file_manager: Arc>, + cache_file_manager: CacheFileManager, /// Lru cache storing http message caching policy inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう /// Async runtime @@ -127,7 +171,7 @@ impl RpxyCache { } let path = globals.proxy_config.cache_dir.as_ref().unwrap(); - let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); + let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await; let inner = Arc::new(Mutex::new(LruCache::new( std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), ))); @@ -158,14 +202,6 @@ impl RpxyCache { lock.pop_entry(cache_key) } - async fn evict_cache_file(&self, filepath: impl AsRef) { - // Acquire the write lock - let mut mgr = self.cache_file_manager.write().await; - if let Err(e) = mgr.remove(filepath).await { - warn!("Eviction failed during file object removal: {:?}", e); - }; - } - /// Get cached response pub async fn get(&self, req: &Request) -> Option> { debug!("Current cache entries: {:?}", self.inner); @@ -190,11 +226,11 @@ impl RpxyCache { // This might be okay to keep as is since it would be updated later. // However, there is no guarantee that newly got objects will be still cacheable. // So, we have to evict stale cache entries and cache file objects if found. - debug!("Stale cache entry and file object: {cache_key}"); + debug!("Stale cache entry: {cache_key}"); let _evicted_entry = self.evict_cache_entry(&cache_key); // For cache file if let CacheFileOrOnMemory::File(path) = cached_object.target { - self.evict_cache_file(&path).await; + self.cache_file_manager.evict(&path).await; } return None; }; @@ -202,13 +238,12 @@ impl RpxyCache { // Finally retrieve the file/on-memory object match cached_object.target { CacheFileOrOnMemory::File(path) => { - let mgr = self.cache_file_manager.read().await; - let res_body = match mgr.read(&path).await { + let res_body = match self.cache_file_manager.read(&path).await { Ok(res_body) => res_body, Err(e) => { warn!("Failed to read from file cache: {e}"); let _evicted_entry = self.evict_cache_entry(&cache_key); - self.evict_cache_file(&path).await; + self.cache_file_manager.evict(&path).await; return None; } }; @@ -225,7 +260,7 @@ impl RpxyCache { pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { let my_cache = self.inner.clone(); - let mgr = self.cache_file_manager.clone(); + let mut mgr = self.cache_file_manager.clone(); let uri = uri.clone(); let bytes_clone = body_bytes.clone(); let policy_clone = policy.clone(); @@ -243,13 +278,15 @@ impl RpxyCache { debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); let cache_object = if bytes_clone.len() > max_each_size_on_memory { - let mut mgr = mgr.write().await; - let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { + let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else { error!("Failed to put the body into the file object or cache entry"); return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); }; debug!("Cached a new file: {} - {}", cache_key, cache_filename); - cache_object + CacheObject { + policy: policy_clone, + target, + } } else { debug!("Cached a new object on memory: {}", cache_key); CacheObject { @@ -268,10 +305,7 @@ impl RpxyCache { if k != cache_key { info!("Over the cache capacity. Evict least recent used entry"); if let CacheFileOrOnMemory::File(path) = v.target { - let mut mgr = mgr.write().await; - if let Err(e) = mgr.remove(&path).await { - warn!("Eviction failed during file object removal over the capacity: {:?}", e); - }; + mgr.evict(&path).await; } } }