refactor: cache manager wrapper to hide mutex lock

Jun Kurihara 2023-08-22 21:15:34 +09:00
commit 7cfcd60243
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23
2 changed files with 87 additions and 41 deletions
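
The commit replaces direct use of Arc<Mutex<LruCache<..>>> at every call site with a small LruCacheManager wrapper that owns the mutex and exposes get/push/evict/count, plus an AtomicUsize entry counter. The following is a minimal, self-contained sketch of that pattern, not the crate's code: Manager is a hypothetical stand-in for LruCacheManager, String values stand in for CacheObject, and the lru crate (which the project already uses) provides the LRU map.

use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::{
  atomic::{AtomicUsize, Ordering},
  Arc, Mutex,
};

/// Hypothetical stand-in for LruCacheManager: callers never touch the Mutex directly.
#[derive(Clone)]
struct Manager {
  inner: Arc<Mutex<LruCache<String, String>>>,
  cnt: Arc<AtomicUsize>,
}

impl Manager {
  fn new(capacity: usize) -> Self {
    Self {
      inner: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(capacity).unwrap()))),
      cnt: Arc::new(AtomicUsize::new(0)),
    }
  }
  /// Lock, look up, and return a clone so the guard is dropped before returning.
  fn get(&self, key: &str) -> Option<String> {
    let mut lock = self.inner.lock().ok()?;
    lock.get(key).cloned()
  }
  /// Lock, insert, and refresh the advisory entry counter from the LRU length.
  fn push(&self, key: &str, value: String) -> Option<(String, String)> {
    let mut lock = self.inner.lock().ok()?;
    let evicted = lock.push(key.to_string(), value);
    self.cnt.store(lock.len(), Ordering::Relaxed);
    evicted
  }
  /// Read the counter without taking the lock.
  fn count(&self) -> usize {
    self.cnt.load(Ordering::Relaxed)
  }
}

fn main() {
  let mgr = Manager::new(2);
  let _ = mgr.push("a", "1".into());
  let _ = mgr.push("b", "2".into());
  assert_eq!(mgr.get("a").as_deref(), Some("1"));
  assert_eq!(mgr.count(), 2);
}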


@@ -11,7 +11,10 @@ use sha2::{Digest, Sha256};
 use std::{
   fmt::Debug,
   path::{Path, PathBuf},
-  sync::{Arc, Mutex},
+  sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc, Mutex,
+  },
   time::SystemTime,
 };
 use tokio::{
@@ -147,6 +150,64 @@ impl CacheFileManager {
     let mut mgr = self.inner.write().await;
     mgr.create(cache_filename, body_bytes).await
   }
+  async fn count(&self) -> usize {
+    let mgr = self.inner.read().await;
+    mgr.cnt
+  }
+}
+
+#[derive(Debug, Clone)]
+/// Lru cache manager responsible for handling the `Mutex` wrapped around the `LruCache`
+struct LruCacheManager {
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: unsure whether a string URL is the right key; it seems every request would have to be checked
+  cnt: Arc<AtomicUsize>,
+}
+
+impl LruCacheManager {
+  /// Build LruCache
+  fn new(cache_max_entry: usize) -> Self {
+    Self {
+      inner: Arc::new(Mutex::new(LruCache::new(
+        std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
+      ))),
+      cnt: Arc::new(AtomicUsize::default()),
+    }
+  }
+  /// Count entries
+  fn count(&self) -> usize {
+    self.cnt.load(Ordering::Relaxed)
+  }
+  /// Evict an entry
+  fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked to evict a cache entry");
+      return None;
+    };
+    let res = lock.pop_entry(cache_key);
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
+  /// Get an entry
+  fn get(&self, cache_key: &str) -> Result<Option<CacheObject>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked for checking cache entry");
+      return Err(RpxyError::Cache("Mutex can't be locked for checking cache entry"));
+    };
+    let Some(cached_object) = lock.get(cache_key) else {
+      return Ok(None);
+    };
+    Ok(Some(cached_object.clone()))
+  }
+  /// Push an entry
+  fn push(&self, cache_key: &str, cache_object: CacheObject) -> Result<Option<(String, CacheObject)>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Failed to acquire mutex lock for writing cache entry");
+      return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry"));
+    };
+    let res = Ok(lock.push(cache_key.to_string(), cache_object));
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
 }
 
 #[derive(Clone, Debug)]
@@ -154,7 +215,7 @@ pub struct RpxyCache {
   /// Managing cache file objects through RwLock's lock mechanism for file lock
   cache_file_manager: CacheFileManager,
   /// Lru cache storing http message caching policy
-  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: unsure whether a string URL is the right key; it seems every request would have to be checked
+  inner: LruCacheManager,
   /// Async runtime
   runtime_handle: tokio::runtime::Handle,
   /// Maximum size of each cache file object
@@ -172,9 +233,7 @@ impl RpxyCache {
 
     let path = globals.proxy_config.cache_dir.as_ref().unwrap();
     let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await;
-    let inner = Arc::new(Mutex::new(LruCache::new(
-      std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(),
-    )));
+    let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
 
     let max_each_size = globals.proxy_config.cache_max_each_size;
     let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
@@ -194,30 +253,26 @@
     })
   }
 
-  fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> {
-    let Ok(mut lock) = self.inner.lock() else {
-      error!("Mutex can't be locked to evict a cache entry");
-      return None;
-    };
-    lock.pop_entry(cache_key)
+  /// Count cache entries
+  pub async fn count(&self) -> (usize, usize, usize) {
+    let total = self.inner.count();
+    let file = self.cache_file_manager.count().await;
+    let on_memory = total - file;
+    (total, on_memory, file)
   }
 
   /// Get cached response
   pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
-    debug!("Current cache entries: {:?}", self.inner);
+    debug!(
+      "Current cache status: (total, on-memory, file) = {:?}",
+      self.count().await
+    );
     let cache_key = req.uri().to_string();
 
     // First check cache chance
-    let cached_object = {
-      let Ok(mut lock) = self.inner.lock() else {
-        error!("Mutex can't be locked for checking cache entry");
-        return None;
-      };
-      let Some(cached_object) = lock.get(&cache_key) else {
-        return None;
-      };
-      cached_object.clone()
-    };
+    let Ok(Some(cached_object)) = self.inner.get(&cache_key) else {
+      return None;
+    };
 
     // Secondly check the cache freshness as an HTTP message
     let now = SystemTime::now();
@@ -227,9 +282,9 @@
       // However, there is no guarantee that newly got objects will be still cacheable.
       // So, we have to evict stale cache entries and cache file objects if found.
       debug!("Stale cache entry: {cache_key}");
-      let _evicted_entry = self.evict_cache_entry(&cache_key);
+      let _evicted_entry = self.inner.evict(&cache_key);
       // For cache file
-      if let CacheFileOrOnMemory::File(path) = cached_object.target {
+      if let CacheFileOrOnMemory::File(path) = &cached_object.target {
         self.cache_file_manager.evict(&path).await;
       }
       return None;
@@ -242,7 +297,7 @@
         Ok(res_body) => res_body,
         Err(e) => {
           warn!("Failed to read from file cache: {e}");
-          let _evicted_entry = self.evict_cache_entry(&cache_key);
+          let _evicted_entry = self.inner.evict(&cache_key);
           self.cache_file_manager.evict(&path).await;
           return None;
         }
@@ -258,6 +313,7 @@
     }
   }
 
+  /// Put response into the cache
   pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
     let my_cache = self.inner.clone();
     let mut mgr = self.cache_file_manager.clone();
@@ -273,16 +329,13 @@
         return Err(RpxyError::Cache("Too large to cache"));
       }
 
      let cache_key = derive_cache_key_from_uri(&uri);
-      let cache_filename = derive_filename_from_uri(&uri);
 
-      debug!("Cache file of {:?} bytes to be written", bytes_clone.len());
+      debug!("Object of size {:?} bytes to be cached", bytes_clone.len());
      let cache_object = if bytes_clone.len() > max_each_size_on_memory {
-        let Ok(target) = mgr.create(&cache_filename, &bytes_clone).await else {
-          error!("Failed to put the body into the file object or cache entry");
-          return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry"));
-        };
-        debug!("Cached a new file: {} - {}", cache_key, cache_filename);
+        let cache_filename = derive_filename_from_uri(&uri);
+        let target = mgr.create(&cache_filename, &bytes_clone).await?;
+        debug!("Cached a new cache file: {} - {}", cache_key, cache_filename);
        CacheObject {
          policy: policy_clone,
          target,
@@ -294,14 +347,8 @@
          target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()),
        }
      };
-      let push_opt = {
-        let Ok(mut lock) = my_cache.lock() else {
-          error!("Failed to acquire mutex lock for writing cache entry");
-          return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry"));
-        };
-        lock.push(cache_key.clone(), cache_object)
-      };
-      if let Some((k, v)) = push_opt {
+
+      if let Some((k, v)) = my_cache.push(&cache_key, cache_object)? {
        if k != cache_key {
          info!("Over the cache capacity. Evict least recent used entry");
          if let CacheFileOrOnMemory::File(path) = v.target {
@@ -338,7 +385,7 @@ where
   let new_policy = CachePolicy::new(req, res);
   if new_policy.is_storable() {
-    debug!("Response is cacheable: {:?}\n{:?}", req, res.headers());
+    // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers());
     Ok(Some(new_policy))
   } else {
     Ok(None)
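
For orientation, here is a hedged caller-side sketch of how the refactored surface reads after this commit. log_and_lookup is a hypothetical helper written only for this page; count and get and their signatures are taken from the diff above, and Request, Response, and Body are the crate's own imports.

// Hypothetical caller-side sketch (not part of the commit): the Mutex now lives
// entirely inside LruCacheManager, so a caller only sees async methods.
async fn log_and_lookup(cache: &RpxyCache, req: &Request<Body>) -> Option<Response<Body>> {
  // Tuple order follows the new RpxyCache::count(): (total, on-memory, file)
  let (total, on_memory, file) = cache.count().await;
  debug!("cache entries: (total, on-memory, file) = ({}, {}, {})", total, on_memory, file);
  cache.get(req).await
}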


@@ -87,7 +87,6 @@ where
       return res;
     };
     let (parts, body) = res.unwrap().into_parts();
-    // TODO: Inefficient?
     let Ok(mut bytes) = hyper::body::aggregate(body).await else {
       return Err(RpxyError::Cache("Failed to write byte buffer"));
     };