From bd29c9dc1da19918bc9e253b20435787a3346ff9 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Tue, 12 Dec 2023 22:50:24 +0900 Subject: [PATCH] wip: feat: implemented cache --- rpxy-lib/src/backend/mod.rs | 7 +- rpxy-lib/src/forwarder/cache/cache_error.rs | 3 + rpxy-lib/src/forwarder/cache/cache_main.rs | 114 +++++++++++++++++--- rpxy-lib/src/forwarder/client.rs | 12 +-- 4 files changed, 113 insertions(+), 23 deletions(-) diff --git a/rpxy-lib/src/backend/mod.rs b/rpxy-lib/src/backend/mod.rs index 788960d..097810a 100644 --- a/rpxy-lib/src/backend/mod.rs +++ b/rpxy-lib/src/backend/mod.rs @@ -3,10 +3,11 @@ mod load_balance; mod upstream; mod upstream_opts; -// #[cfg(feature = "sticky-cookie")] -// pub use self::load_balance::{StickyCookie, StickyCookieValue}; +#[cfg(feature = "sticky-cookie")] +pub(crate) use self::load_balance::{StickyCookie, StickyCookieValue}; +#[allow(unused)] pub(crate) use self::{ - load_balance::{LoadBalance, LoadBalanceContext, StickyCookie, StickyCookieValue}, + load_balance::{LoadBalance, LoadBalanceContext}, upstream::{PathManager, Upstream, UpstreamCandidates}, upstream_opts::UpstreamOption, }; diff --git a/rpxy-lib/src/forwarder/cache/cache_error.rs b/rpxy-lib/src/forwarder/cache/cache_error.rs index 5f6146a..35eae83 100644 --- a/rpxy-lib/src/forwarder/cache/cache_error.rs +++ b/rpxy-lib/src/forwarder/cache/cache_error.rs @@ -44,4 +44,7 @@ pub enum CacheError { #[error("Invalid cache target")] InvalidCacheTarget, + + #[error("Hash mismatched in cache file")] + HashMismatchedInCacheFile, } diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index c16f1d6..3c85d0c 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -1,12 +1,16 @@ use super::cache_error::*; -use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*}; +use crate::{ + globals::Globals, + hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody}, + log::*, +}; use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; use futures::channel::mpsc; use http::{Request, Response, Uri}; use http_body_util::{BodyExt, StreamBody}; use http_cache_semantics::CachePolicy; -use hyper::body::{Body, Frame, Incoming}; +use hyper::body::{Frame, Incoming}; use lru::LruCache; use sha2::{Digest, Sha256}; use std::{ @@ -15,6 +19,7 @@ use std::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }, + time::SystemTime, }; use tokio::{ fs::{self, File}, @@ -179,6 +184,66 @@ impl RpxyCache { Ok(stream_body) } + + /// Get cached response + pub async fn get(&self, req: &Request) -> Option> { + debug!( + "Current cache status: (total, on-memory, file) = {:?}", + self.count().await + ); + let cache_key = derive_cache_key_from_uri(req.uri()); + + // First check cache chance + let Ok(Some(cached_object)) = self.inner.get(&cache_key) else { + return None; + }; + + // Secondly check the cache freshness as an HTTP message + let now = SystemTime::now(); + let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else { + // Evict stale cache entry. + // This might be okay to keep as is since it would be updated later. + // However, there is no guarantee that newly got objects will be still cacheable. + // So, we have to evict stale cache entries and cache file objects if found. + debug!("Stale cache entry: {cache_key}"); + let _evicted_entry = self.inner.evict(&cache_key); + // For cache file + if let CacheFileOrOnMemory::File(path) = &cached_object.target { + self.file_store.evict(&path).await; + } + return None; + }; + + // Finally retrieve the file/on-memory object + let response_body = match cached_object.target { + CacheFileOrOnMemory::File(path) => { + let stream_body = match self.file_store.read(path.clone(), &cached_object.hash).await { + Ok(s) => s, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.inner.evict(&cache_key); + self.file_store.evict(path).await; + return None; + } + }; + debug!("Cache hit from file: {cache_key}"); + ResponseBody::Streamed(stream_body) + } + CacheFileOrOnMemory::OnMemory(object) => { + debug!("Cache hit from on memory: {cache_key}"); + let mut hasher = Sha256::new(); + hasher.update(object.as_ref()); + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + if hash_bytes != cached_object.hash { + warn!("Hash mismatched. Cache object is corrupted"); + let _evicted_entry = self.inner.evict(&cache_key); + return None; + } + ResponseBody::Boxed(BoxBody::new(full(object))) + } + }; + Some(Response::from_parts(res_parts, response_body)) + } } /* ---------------------------------------------- */ @@ -202,7 +267,7 @@ impl FileStore { inner.cnt } /// Create a temporary file cache - async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { + async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { let mut inner = self.inner.write().await; inner.create(cache_object, body_bytes).await } @@ -214,14 +279,18 @@ impl FileStore { warn!("Eviction failed during file object removal: {:?}", e); }; } - // /// Read a temporary file cache - // async fn read(&self, path: impl AsRef) -> CacheResult { - // let inner = self.inner.read().await; - // inner.read(&path).await - // } + /// Read a temporary file cache + async fn read( + &self, + path: impl AsRef + Send + Sync + 'static, + hash: &Bytes, + ) -> CacheResult { + let inner = self.inner.read().await; + inner.read(path, hash).await + } } -#[derive(Debug)] +#[derive(Debug, Clone)] /// Manager inner for cache on file system struct FileStoreInner { /// Counter of current cached files @@ -264,26 +333,43 @@ impl FileStoreInner { } /// Retrieve a stored temporary file cache - async fn read(&self, path: impl AsRef) -> CacheResult { + async fn read( + &self, + path: impl AsRef + Send + Sync + 'static, + hash: &Bytes, + ) -> CacheResult { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); return Err(CacheError::FailedToOpenCacheFile); }; + let hash_clone = hash.clone(); + let mut self_clone = self.clone(); let (body_tx, body_rx) = mpsc::unbounded::, hyper::Error>>(); self.runtime_handle.spawn(async move { - // let mut sender = body_sender; + let mut hasher = Sha256::new(); let mut buf = BytesMut::new(); loop { match file.read_buf(&mut buf).await { Ok(0) => break, - Ok(_) => body_tx - .unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining())))) - .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?, + Ok(_) => { + let bytes = buf.copy_to_bytes(buf.remaining()); + hasher.update(bytes.as_ref()); + body_tx + .unbounded_send(Ok(Frame::data(bytes))) + .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))? + } Err(_) => break, }; } + let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); + if hash_bytes != hash_clone { + warn!("Hash mismatched. Cache object is corrupted. Force to remove the cache file."); + // only file can be evicted + let _evicted_entry = self_clone.remove(&path).await; + return Err(CacheError::HashMismatchedInCacheFile); + } Ok(()) as CacheResult<()> }); diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index c6f1ca9..c8a8ec7 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -47,12 +47,12 @@ where { let mut synth_req = None; if self.cache.is_some() { - // TODO: try reading from cache - // if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { - // // if found, return it as response. - // info!("Cache hit - Return from cache"); - // return Ok(cached_response); - // }; + // try reading from cache + if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // if found, return it as response. + info!("Cache hit - Return from cache"); + return Ok(cached_response); + }; // Synthetic request copy used just for caching (cannot clone request object...) synth_req = Some(build_synth_req_for_cache(&req));