From 8dd6af6bc5d4dfb53959f7283f73689582ecab07 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 22:15:34 +0900 Subject: [PATCH] wip: feat: refactored cache implementation for put --- rpxy-lib/src/forwarder/cache/cache_error.rs | 12 ++ rpxy-lib/src/forwarder/cache/cache_main.rs | 189 +++++++++++++------- rpxy-lib/src/forwarder/client.rs | 1 + 3 files changed, 142 insertions(+), 60 deletions(-) diff --git a/rpxy-lib/src/forwarder/cache/cache_error.rs b/rpxy-lib/src/forwarder/cache/cache_error.rs index bb2ffa6..5f6146a 100644 --- a/rpxy-lib/src/forwarder/cache/cache_error.rs +++ b/rpxy-lib/src/forwarder/cache/cache_error.rs @@ -15,6 +15,9 @@ pub enum CacheError { #[error("Failed to acquire mutex lock for cache")] FailedToAcquiredMutexLockForCache, + #[error("Failed to acquire mutex lock for check")] + FailedToAcquiredMutexLockForCheck, + #[error("Failed to create file cache")] FailedToCreateFileCache, @@ -32,4 +35,13 @@ pub enum CacheError { #[error("Failed to send frame to cache {0}")] FailedToSendFrameToCache(String), + + #[error("Failed to send frame from file cache {0}")] + FailedToSendFrameFromCache(String), + + #[error("Failed to remove cache file: {0}")] + FailedToRemoveCacheFile(String), + + #[error("Invalid cache target")] + InvalidCacheTarget, } diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index 659ac41..c16f1d6 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -1,14 +1,15 @@ use super::cache_error::*; use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*}; +use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; use futures::channel::mpsc; -use http::{Request, Response}; +use http::{Request, Response, Uri}; use http_body_util::{BodyExt, StreamBody}; use http_cache_semantics::CachePolicy; use hyper::body::{Body, Frame, Incoming}; use lru::LruCache; +use sha2::{Digest, Sha256}; use std::{ - convert::Infallible, path::{Path, PathBuf}, sync::{ atomic::{AtomicUsize, Ordering}, @@ -35,6 +36,8 @@ pub struct RpxyCache { max_each_size: usize, /// Maximum size of cache object on memory max_each_size_on_memory: usize, + /// Cache directory path + cache_dir: PathBuf, } impl RpxyCache { @@ -43,8 +46,8 @@ impl RpxyCache { if !globals.proxy_config.cache_enabled { return None; } - let path = globals.proxy_config.cache_dir.as_ref().unwrap(); - let file_store = FileStore::new(path, &globals.runtime_handle).await; + let cache_dir = globals.proxy_config.cache_dir.as_ref().unwrap(); + let file_store = FileStore::new(&globals.runtime_handle).await; let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry); let max_each_size = globals.proxy_config.cache_max_each_size; @@ -56,12 +59,18 @@ impl RpxyCache { max_each_size_on_memory = max_each_size; } + if let Err(e) = fs::remove_dir_all(cache_dir).await { + warn!("Failed to clean up the cache dir: {e}"); + }; + fs::create_dir_all(&cache_dir).await.unwrap(); + Some(Self { file_store, inner, runtime_handle: globals.runtime_handle.clone(), max_each_size, max_each_size_on_memory, + cache_dir: cache_dir.clone(), }) } @@ -80,17 +89,20 @@ impl RpxyCache { mut body: Incoming, policy: &CachePolicy, ) -> CacheResult { - let my_cache = self.inner.clone(); + let cache_manager = self.inner.clone(); let mut file_store = self.file_store.clone(); let uri = uri.clone(); let policy_clone = policy.clone(); let max_each_size = self.max_each_size; let max_each_size_on_memory = 
self.max_each_size_on_memory; + let cache_dir = self.cache_dir.clone(); let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); self.runtime_handle.spawn(async move { let mut size = 0usize; + let mut buf = BytesMut::new(); + loop { let frame = match body.frame().await { Some(frame) => frame, @@ -118,10 +130,9 @@ impl RpxyCache { .map(|f| { if f.is_data() { let data_bytes = f.data_ref().unwrap().clone(); - debug!("cache data bytes of {} bytes", data_bytes.len()) - // TODO: cache data bytes as file or on memory - // fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。 - // ファイルとObjectのbindをどうやってするか + debug!("cache data bytes of {} bytes", data_bytes.len()); + // We do not use stream-type buffering since it needs to lock file during operation. + buf.extend(data_bytes.as_ref()); } }) .map_err(|e| CacheError::FailedToCacheBytes(e.to_string()))?; @@ -132,6 +143,35 @@ impl RpxyCache { .map_err(|e| CacheError::FailedToSendFrameToCache(e.to_string()))?; } + let buf = buf.freeze(); + // Calculate hash of the cached data, after all data is received. + // In-operation calculation is possible but it blocks sending data. + let mut hasher = Sha256::new(); + hasher.update(buf.as_ref()); + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); + + // Create cache object + let cache_key = derive_cache_key_from_uri(&uri); + let cache_object = CacheObject { + policy: policy_clone, + target: CacheFileOrOnMemory::build(&cache_dir, &uri, &buf, max_each_size_on_memory), + hash: hash_bytes, + }; + + if let Some((k, v)) = cache_manager.push(&cache_key, &cache_object)? { + if k != cache_key { + info!("Over the cache capacity. Evict least recent used entry"); + if let CacheFileOrOnMemory::File(path) = v.target { + file_store.evict(&path).await; + } + } + } + // store cache object to file + if let CacheFileOrOnMemory::File(_) = cache_object.target { + file_store.create(&cache_object, &buf).await?; + } + Ok(()) as CacheResult<()> }); @@ -145,36 +185,35 @@ impl RpxyCache { #[derive(Debug, Clone)] /// Cache file manager outer that is responsible to handle `RwLock` struct FileStore { + /// Inner file store main object inner: Arc>, } impl FileStore { /// Build manager - async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + async fn new(runtime_handle: &tokio::runtime::Handle) -> Self { Self { - inner: Arc::new(RwLock::new(FileStoreInner::new(path, runtime_handle).await)), + inner: Arc::new(RwLock::new(FileStoreInner::new(runtime_handle).await)), } } -} -impl FileStore { /// Count file cache entries async fn count(&self) -> usize { let inner = self.inner.read().await; inner.cnt } /// Create a temporary file cache - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult { + async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { let mut inner = self.inner.write().await; - inner.create(cache_filename, body_bytes).await + inner.create(cache_object, body_bytes).await + } + /// Evict a temporary file cache + async fn evict(&self, path: impl AsRef) { + // Acquire the write lock + let mut inner = self.inner.write().await; + if let Err(e) = inner.remove(path).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; } - // /// Evict a temporary file cache - // async fn evict(&self, path: impl AsRef) { - // // Acquire the write lock - // let mut inner = self.inner.write().await; - // if let Err(e) = 
inner.remove(path).await { - // warn!("Eviction failed during file object removal: {:?}", e); - // }; - // } // /// Read a temporary file cache // async fn read(&self, path: impl AsRef) -> CacheResult { // let inner = self.inner.read().await; @@ -185,8 +224,6 @@ impl FileStore { #[derive(Debug)] /// Manager inner for cache on file system struct FileStoreInner { - /// Directory of temporary files - cache_dir: PathBuf, /// Counter of current cached files cnt: usize, /// Async runtime @@ -197,22 +234,21 @@ impl FileStoreInner { /// Build new cache file manager. /// This first creates cache file dir if not exists, and cleans up the file inside the directory. /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed. - async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { - let path_buf = path.as_ref().to_path_buf(); - if let Err(e) = fs::remove_dir_all(path).await { - warn!("Failed to clean up the cache dir: {e}"); - }; - fs::create_dir_all(&path_buf).await.unwrap(); + async fn new(runtime_handle: &tokio::runtime::Handle) -> Self { Self { - cache_dir: path_buf.clone(), cnt: 0, runtime_handle: runtime_handle.clone(), } } /// Create a new temporary file cache - async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult { - let cache_filepath = self.cache_dir.join(cache_filename); + async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { + let cache_filepath = match cache_object.target { + CacheFileOrOnMemory::File(ref path) => path.clone(), + CacheFileOrOnMemory::OnMemory(_) => { + return Err(CacheError::InvalidCacheTarget); + } + }; let Ok(mut file) = File::create(&cache_filepath).await else { return Err(CacheError::FailedToCreateFileCache); }; @@ -224,50 +260,47 @@ impl FileStoreInner { }; } self.cnt += 1; - Ok(CacheFileOrOnMemory::File(cache_filepath)) + Ok(()) } /// Retrieve a stored temporary file cache - async fn read(&self, path: impl AsRef) -> CacheResult<()> { + async fn read(&self, path: impl AsRef) -> CacheResult { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); return Err(CacheError::FailedToOpenCacheFile); }; - /* ----------------------------- */ - // PoC for streaming body - let (tx, rx) = mpsc::unbounded::, Infallible>>(); + let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); - // let (body_sender, res_body) = Body::channel(); self.runtime_handle.spawn(async move { // let mut sender = body_sender; let mut buf = BytesMut::new(); loop { match file.read_buf(&mut buf).await { Ok(0) => break, - Ok(_) => tx - .unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining())))) - .map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?, - //sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Ok(_) => body_tx + .unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining())))) + .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?, Err(_) => break, }; } - Ok(()) as anyhow::Result<()> + Ok(()) as CacheResult<()> }); - let mut rx = http_body_util::StreamBody::new(rx); - // TODO: 結局incominglikeなbodystreamを定義することになる。これだったらh3と合わせて自分で定義した方が良さそう。 - // typeが長すぎるのでwrapperを作った方がいい。 - // let response = Response::builder() - // .status(200) - // .header("content-type", "application/octet-stream") - // .body(rx) - // .unwrap(); + let stream_body = StreamBody::new(body_rx); - todo!() - /* ----------------------------- */ + Ok(stream_body) + } - // Ok(res_body) + /// Remove 
file + async fn remove(&mut self, path: impl AsRef) -> CacheResult<()> { + fs::remove_file(path.as_ref()) + .await + .map_err(|e| CacheError::FailedToRemoveCacheFile(e.to_string()))?; + self.cnt -= 1; + debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt); + + Ok(()) } } @@ -279,7 +312,20 @@ pub enum CacheFileOrOnMemory { /// Pointer to the temporary cache file File(PathBuf), /// Cached body itself - OnMemory(Vec), + OnMemory(Bytes), +} + +impl CacheFileOrOnMemory { + /// Get cache object target + fn build(cache_dir: &Path, uri: &Uri, object: &Bytes, max_each_size_on_memory: usize) -> Self { + if object.len() > max_each_size_on_memory { + let cache_filename = derive_filename_from_uri(uri); + let cache_filepath = cache_dir.join(cache_filename); + CacheFileOrOnMemory::File(cache_filepath) + } else { + CacheFileOrOnMemory::OnMemory(object.clone()) + } + } } #[derive(Clone, Debug)] @@ -290,7 +336,7 @@ struct CacheObject { /// Cache target: on-memory object or temporary file pub target: CacheFileOrOnMemory, /// SHA256 hash of target to strongly bind the cache metadata (this object) and file target - pub hash: Vec, + pub hash: Bytes, } /* ---------------------------------------------- */ @@ -332,16 +378,28 @@ impl LruCacheManager { } /// Push an entry - fn push(&self, cache_key: &str, cache_object: CacheObject) -> CacheResult> { + fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult> { let Ok(mut lock) = self.inner.lock() else { error!("Failed to acquire mutex lock for writing cache entry"); return Err(CacheError::FailedToAcquiredMutexLockForCache); }; - let res = Ok(lock.push(cache_key.to_string(), cache_object)); + let res = Ok(lock.push(cache_key.to_string(), cache_object.clone())); // This may be inconsistent with the actual number of entries self.cnt.store(lock.len(), Ordering::Relaxed); res } + + /// Get an entry + fn get(&self, cache_key: &str) -> CacheResult> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); + return Err(CacheError::FailedToAcquiredMutexLockForCheck); + }; + let Some(cached_object) = lock.get(cache_key) else { + return Ok(None); + }; + Ok(Some(cached_object.clone())) + } } /* ---------------------------------------------- */ @@ -366,3 +424,14 @@ pub fn get_policy_if_cacheable( Ok(None) } } + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 8d2e307..c6f1ca9 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -47,6 +47,7 @@ where { let mut synth_req = None; if self.cache.is_some() { + // TODO: try reading from cache // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { // // if found, return it as response. // info!("Cache hit - Return from cache");
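
Note (not part of the patch): the put path above binds the LRU metadata entry to the cached bytes via a SHA-256 `hash` field, and derives the on-disk file name from the URI via SHA-256 plus URL-safe base64 (`derive_filename_from_uri`). The following is a minimal, standalone sketch of those two ideas using the same crates the patch already imports (`sha2`, `base64`, `bytes`, `http`); the helper names `filename_for` and `verify` are illustrative and do not exist in the patch.

```rust
// Standalone sketch: URI -> cache file name, and integrity check of bytes
// read back from disk against the hash stored in the metadata entry.
// Assumes sha2, base64 (0.21+), bytes and http as dependencies, as in cache_main.rs.
use base64::{engine::general_purpose, Engine as _};
use bytes::Bytes;
use http::Uri;
use sha2::{Digest, Sha256};

/// Derive a URL-safe, fixed-length file name for a cached URI,
/// mirroring the approach of `derive_filename_from_uri`.
fn filename_for(uri: &Uri) -> String {
    let digest = Sha256::digest(uri.to_string().as_bytes());
    general_purpose::URL_SAFE_NO_PAD.encode(digest)
}

/// Check that a body read back from the file store still matches the
/// SHA-256 hash kept in the corresponding metadata (the `hash` field).
fn verify(body: &Bytes, expected_hash: &Bytes) -> bool {
    Sha256::digest(body.as_ref()).as_slice() == expected_hash.as_ref()
}

fn main() {
    let uri: Uri = "https://example.com/some/asset.js".parse().unwrap();
    let body = Bytes::from_static(b"cached response body");
    // Hash computed at put time and stored alongside the LRU entry.
    let hash = Bytes::copy_from_slice(Sha256::digest(body.as_ref()).as_slice());

    println!("cache file name: {}", filename_for(&uri));
    assert!(verify(&body, &hash));
}
```

A get path (still commented out in client.rs above) would look up the metadata with `LruCacheManager::get`, read the bytes either from memory or from the file store depending on `CacheFileOrOnMemory`, and use a check like `verify` to detect a stale or corrupted file before serving it.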