refactor: cache file manager wrapper to hide the rwlock operations

This commit is contained in:
Jun Kurihara 2023-08-22 18:45:14 +09:00
commit a7aac1a0d4
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23

View file

@ -21,41 +21,53 @@ use tokio::{
}; };
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Cache target in hybrid manner of on-memory and file system
pub enum CacheFileOrOnMemory { pub enum CacheFileOrOnMemory {
/// Pointer to the temporary cache file
File(PathBuf), File(PathBuf),
/// Cached body itself
OnMemory(Vec<u8>), OnMemory(Vec<u8>),
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
/// Cache object definition
struct CacheObject { struct CacheObject {
/// Cache policy to determine if the stored cache can be used as a response to a new incoming request
pub policy: CachePolicy, pub policy: CachePolicy,
/// Cache target: on-memory object or temporary file
pub target: CacheFileOrOnMemory, pub target: CacheFileOrOnMemory,
} }
#[derive(Debug)] #[derive(Debug)]
struct CacheFileManager { /// Manager inner for cache on file system
struct CacheFileManagerInner {
/// Directory of temporary files
cache_dir: PathBuf, cache_dir: PathBuf,
/// Counter of current cached files
cnt: usize, cnt: usize,
/// Async runtime
runtime_handle: tokio::runtime::Handle, runtime_handle: tokio::runtime::Handle,
} }
impl CacheFileManager { impl CacheFileManagerInner {
async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self { /// Build new cache file manager.
// Create cache file dir /// This first creates cache file dir if not exists, and cleans up the file inside the directory.
// Clean up the file dir before init /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed.
// TODO: Persistent cache is really difficult. maybe SQLite is needed. async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
let path_buf = path.as_ref().to_path_buf();
if let Err(e) = fs::remove_dir_all(path).await { if let Err(e) = fs::remove_dir_all(path).await {
warn!("Failed to clean up the cache dir: {e}"); warn!("Failed to clean up the cache dir: {e}");
}; };
fs::create_dir_all(path).await.unwrap(); fs::create_dir_all(&path_buf).await.unwrap();
Self { Self {
cache_dir: path.clone(), cache_dir: path_buf.clone(),
cnt: 0, cnt: 0,
runtime_handle: runtime_handle.clone(), runtime_handle: runtime_handle.clone(),
} }
} }
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result<CacheObject> { /// Create a new temporary file cache
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result<CacheFileOrOnMemory> {
let cache_filepath = self.cache_dir.join(cache_filename); let cache_filepath = self.cache_dir.join(cache_filename);
let Ok(mut file) = File::create(&cache_filepath).await else { let Ok(mut file) = File::create(&cache_filepath).await else {
return Err(RpxyError::Cache("Failed to create file")); return Err(RpxyError::Cache("Failed to create file"));
@ -68,12 +80,10 @@ impl CacheFileManager {
}; };
} }
self.cnt += 1; self.cnt += 1;
Ok(CacheObject { Ok(CacheFileOrOnMemory::File(cache_filepath))
policy: policy.clone(),
target: CacheFileOrOnMemory::File(cache_filepath),
})
} }
/// Retrieve a stored temporary file cache
async fn read(&self, path: impl AsRef<Path>) -> Result<Body> { async fn read(&self, path: impl AsRef<Path>) -> Result<Body> {
let Ok(mut file) = File::open(&path).await else { let Ok(mut file) = File::open(&path).await else {
warn!("Cache file object cannot be opened"); warn!("Cache file object cannot be opened");
@ -96,6 +106,7 @@ impl CacheFileManager {
Ok(res_body) Ok(res_body)
} }
/// Remove file
async fn remove(&mut self, path: impl AsRef<Path>) -> Result<()> { async fn remove(&mut self, path: impl AsRef<Path>) -> Result<()> {
fs::remove_file(path.as_ref()).await?; fs::remove_file(path.as_ref()).await?;
self.cnt -= 1; self.cnt -= 1;
@ -105,10 +116,43 @@ impl CacheFileManager {
} }
} }
#[derive(Debug, Clone)]
/// Cache file manager outer that is responsible to handle `RwLock`
struct CacheFileManager {
inner: Arc<RwLock<CacheFileManagerInner>>,
}
impl CacheFileManager {
/// Build manager
async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
Self {
inner: Arc::new(RwLock::new(CacheFileManagerInner::new(path, runtime_handle).await)),
}
}
/// Evict a temporary file cache
async fn evict(&self, path: impl AsRef<Path>) {
// Acquire the write lock
let mut inner = self.inner.write().await;
if let Err(e) = inner.remove(path).await {
warn!("Eviction failed during file object removal: {:?}", e);
};
}
/// Read a temporary file cache
async fn read(&self, path: impl AsRef<Path>) -> Result<Body> {
let mgr = self.inner.read().await;
mgr.read(&path).await
}
/// Create a temporary file cache
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result<CacheFileOrOnMemory> {
let mut mgr = self.inner.write().await;
mgr.create(cache_filename, body_bytes).await
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RpxyCache { pub struct RpxyCache {
/// Managing cache file objects through RwLock's lock mechanism for file lock /// Managing cache file objects through RwLock's lock mechanism for file lock
cache_file_manager: Arc<RwLock<CacheFileManager>>, cache_file_manager: CacheFileManager,
/// Lru cache storing http message caching policy /// Lru cache storing http message caching policy
inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう
/// Async runtime /// Async runtime
@ -127,7 +171,7 @@ impl RpxyCache {
} }
let path = globals.proxy_config.cache_dir.as_ref().unwrap(); let path = globals.proxy_config.cache_dir.as_ref().unwrap();
let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await;
let inner = Arc::new(Mutex::new(LruCache::new( let inner = Arc::new(Mutex::new(LruCache::new(
std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(),
))); )));
@ -158,14 +202,6 @@ impl RpxyCache {
lock.pop_entry(cache_key) lock.pop_entry(cache_key)
} }
async fn evict_cache_file(&self, filepath: impl AsRef<Path>) {
// Acquire the write lock
let mut mgr = self.cache_file_manager.write().await;
if let Err(e) = mgr.remove(filepath).await {
warn!("Eviction failed during file object removal: {:?}", e);
};
}
/// Get cached response /// Get cached response
pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> { pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
debug!("Current cache entries: {:?}", self.inner); debug!("Current cache entries: {:?}", self.inner);
@ -190,11 +226,11 @@ impl RpxyCache {
// This might be okay to keep as is since it would be updated later. // This might be okay to keep as is since it would be updated later.
// However, there is no guarantee that newly got objects will be still cacheable. // However, there is no guarantee that newly got objects will be still cacheable.
// So, we have to evict stale cache entries and cache file objects if found. // So, we have to evict stale cache entries and cache file objects if found.
debug!("Stale cache entry and file object: {cache_key}"); debug!("Stale cache entry: {cache_key}");
let _evicted_entry = self.evict_cache_entry(&cache_key); let _evicted_entry = self.evict_cache_entry(&cache_key);
// For cache file // For cache file
if let CacheFileOrOnMemory::File(path) = cached_object.target { if let CacheFileOrOnMemory::File(path) = cached_object.target {
self.evict_cache_file(&path).await; self.cache_file_manager.evict(&path).await;
} }
return None; return None;
}; };
@ -202,13 +238,12 @@ impl RpxyCache {
// Finally retrieve the file/on-memory object // Finally retrieve the file/on-memory object
match cached_object.target { match cached_object.target {
CacheFileOrOnMemory::File(path) => { CacheFileOrOnMemory::File(path) => {
let mgr = self.cache_file_manager.read().await; let res_body = match self.cache_file_manager.read(&path).await {
let res_body = match mgr.read(&path).await {
Ok(res_body) => res_body, Ok(res_body) => res_body,
Err(e) => { Err(e) => {
warn!("Failed to read from file cache: {e}"); warn!("Failed to read from file cache: {e}");
let _evicted_entry = self.evict_cache_entry(&cache_key); let _evicted_entry = self.evict_cache_entry(&cache_key);
self.evict_cache_file(&path).await; self.cache_file_manager.evict(&path).await;
return None; return None;
} }
}; };
@ -225,7 +260,7 @@ impl RpxyCache {
pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
let my_cache = self.inner.clone(); let my_cache = self.inner.clone();
let mgr = self.cache_file_manager.clone(); let mut mgr = self.cache_file_manager.clone();
let uri = uri.clone(); let uri = uri.clone();
let bytes_clone = body_bytes.clone(); let bytes_clone = body_bytes.clone();
let policy_clone = policy.clone(); let policy_clone = policy.clone();
@ -243,13 +278,15 @@ impl RpxyCache {
debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); debug!("Cache file of {:?} bytes to be written", bytes_clone.len());
let cache_object = if bytes_clone.len() > max_each_size_on_memory { let cache_object = if bytes_clone.len() > max_each_size_on_memory {
let mut mgr = mgr.write().await; let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else {
let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else {
error!("Failed to put the body into the file object or cache entry"); error!("Failed to put the body into the file object or cache entry");
return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry"));
}; };
debug!("Cached a new file: {} - {}", cache_key, cache_filename); debug!("Cached a new file: {} - {}", cache_key, cache_filename);
cache_object CacheObject {
policy: policy_clone,
target,
}
} else { } else {
debug!("Cached a new object on memory: {}", cache_key); debug!("Cached a new object on memory: {}", cache_key);
CacheObject { CacheObject {
@ -268,10 +305,7 @@ impl RpxyCache {
if k != cache_key { if k != cache_key {
info!("Over the cache capacity. Evict least recent used entry"); info!("Over the cache capacity. Evict least recent used entry");
if let CacheFileOrOnMemory::File(path) = v.target { if let CacheFileOrOnMemory::File(path) = v.target {
let mut mgr = mgr.write().await; mgr.evict(&path).await;
if let Err(e) = mgr.remove(&path).await {
warn!("Eviction failed during file object removal over the capacity: {:?}", e);
};
} }
} }
} }