wip: feat: implemented cache

This commit is contained in:
Jun Kurihara 2023-12-12 22:50:24 +09:00
commit bd29c9dc1d
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
4 changed files with 113 additions and 23 deletions

View file

@ -3,10 +3,11 @@ mod load_balance;
mod upstream; mod upstream;
mod upstream_opts; mod upstream_opts;
// #[cfg(feature = "sticky-cookie")] #[cfg(feature = "sticky-cookie")]
// pub use self::load_balance::{StickyCookie, StickyCookieValue}; pub(crate) use self::load_balance::{StickyCookie, StickyCookieValue};
#[allow(unused)]
pub(crate) use self::{ pub(crate) use self::{
load_balance::{LoadBalance, LoadBalanceContext, StickyCookie, StickyCookieValue}, load_balance::{LoadBalance, LoadBalanceContext},
upstream::{PathManager, Upstream, UpstreamCandidates}, upstream::{PathManager, Upstream, UpstreamCandidates},
upstream_opts::UpstreamOption, upstream_opts::UpstreamOption,
}; };

View file

@ -44,4 +44,7 @@ pub enum CacheError {
#[error("Invalid cache target")] #[error("Invalid cache target")]
InvalidCacheTarget, InvalidCacheTarget,
#[error("Hash mismatched in cache file")]
HashMismatchedInCacheFile,
} }

View file

@ -1,12 +1,16 @@
use super::cache_error::*; use super::cache_error::*;
use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*}; use crate::{
globals::Globals,
hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody},
log::*,
};
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use futures::channel::mpsc; use futures::channel::mpsc;
use http::{Request, Response, Uri}; use http::{Request, Response, Uri};
use http_body_util::{BodyExt, StreamBody}; use http_body_util::{BodyExt, StreamBody};
use http_cache_semantics::CachePolicy; use http_cache_semantics::CachePolicy;
use hyper::body::{Body, Frame, Incoming}; use hyper::body::{Frame, Incoming};
use lru::LruCache; use lru::LruCache;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{ use std::{
@ -15,6 +19,7 @@ use std::{
atomic::{AtomicUsize, Ordering}, atomic::{AtomicUsize, Ordering},
Arc, Mutex, Arc, Mutex,
}, },
time::SystemTime,
}; };
use tokio::{ use tokio::{
fs::{self, File}, fs::{self, File},
@ -179,6 +184,66 @@ impl RpxyCache {
Ok(stream_body) Ok(stream_body)
} }
/// Get cached response
pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<ResponseBody>> {
debug!(
"Current cache status: (total, on-memory, file) = {:?}",
self.count().await
);
let cache_key = derive_cache_key_from_uri(req.uri());
// First check cache chance
let Ok(Some(cached_object)) = self.inner.get(&cache_key) else {
return None;
};
// Secondly check the cache freshness as an HTTP message
let now = SystemTime::now();
let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else {
// Evict stale cache entry.
// This might be okay to keep as is since it would be updated later.
// However, there is no guarantee that newly got objects will be still cacheable.
// So, we have to evict stale cache entries and cache file objects if found.
debug!("Stale cache entry: {cache_key}");
let _evicted_entry = self.inner.evict(&cache_key);
// For cache file
if let CacheFileOrOnMemory::File(path) = &cached_object.target {
self.file_store.evict(&path).await;
}
return None;
};
// Finally retrieve the file/on-memory object
let response_body = match cached_object.target {
CacheFileOrOnMemory::File(path) => {
let stream_body = match self.file_store.read(path.clone(), &cached_object.hash).await {
Ok(s) => s,
Err(e) => {
warn!("Failed to read from file cache: {e}");
let _evicted_entry = self.inner.evict(&cache_key);
self.file_store.evict(path).await;
return None;
}
};
debug!("Cache hit from file: {cache_key}");
ResponseBody::Streamed(stream_body)
}
CacheFileOrOnMemory::OnMemory(object) => {
debug!("Cache hit from on memory: {cache_key}");
let mut hasher = Sha256::new();
hasher.update(object.as_ref());
let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
if hash_bytes != cached_object.hash {
warn!("Hash mismatched. Cache object is corrupted");
let _evicted_entry = self.inner.evict(&cache_key);
return None;
}
ResponseBody::Boxed(BoxBody::new(full(object)))
}
};
Some(Response::from_parts(res_parts, response_body))
}
} }
/* ---------------------------------------------- */ /* ---------------------------------------------- */
@ -202,7 +267,7 @@ impl FileStore {
inner.cnt inner.cnt
} }
/// Create a temporary file cache /// Create a temporary file cache
async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.create(cache_object, body_bytes).await inner.create(cache_object, body_bytes).await
} }
@ -214,14 +279,18 @@ impl FileStore {
warn!("Eviction failed during file object removal: {:?}", e); warn!("Eviction failed during file object removal: {:?}", e);
}; };
} }
// /// Read a temporary file cache /// Read a temporary file cache
// async fn read(&self, path: impl AsRef<Path>) -> CacheResult<Bytes> { async fn read(
// let inner = self.inner.read().await; &self,
// inner.read(&path).await path: impl AsRef<Path> + Send + Sync + 'static,
// } hash: &Bytes,
) -> CacheResult<UnboundedStreamBody> {
let inner = self.inner.read().await;
inner.read(path, hash).await
}
} }
#[derive(Debug)] #[derive(Debug, Clone)]
/// Manager inner for cache on file system /// Manager inner for cache on file system
struct FileStoreInner { struct FileStoreInner {
/// Counter of current cached files /// Counter of current cached files
@ -264,26 +333,43 @@ impl FileStoreInner {
} }
/// Retrieve a stored temporary file cache /// Retrieve a stored temporary file cache
async fn read(&self, path: impl AsRef<Path>) -> CacheResult<UnboundedStreamBody> { async fn read(
&self,
path: impl AsRef<Path> + Send + Sync + 'static,
hash: &Bytes,
) -> CacheResult<UnboundedStreamBody> {
let Ok(mut file) = File::open(&path).await else { let Ok(mut file) = File::open(&path).await else {
warn!("Cache file object cannot be opened"); warn!("Cache file object cannot be opened");
return Err(CacheError::FailedToOpenCacheFile); return Err(CacheError::FailedToOpenCacheFile);
}; };
let hash_clone = hash.clone();
let mut self_clone = self.clone();
let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>(); let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
self.runtime_handle.spawn(async move { self.runtime_handle.spawn(async move {
// let mut sender = body_sender; let mut hasher = Sha256::new();
let mut buf = BytesMut::new(); let mut buf = BytesMut::new();
loop { loop {
match file.read_buf(&mut buf).await { match file.read_buf(&mut buf).await {
Ok(0) => break, Ok(0) => break,
Ok(_) => body_tx Ok(_) => {
.unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining())))) let bytes = buf.copy_to_bytes(buf.remaining());
.map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?, hasher.update(bytes.as_ref());
body_tx
.unbounded_send(Ok(Frame::data(bytes)))
.map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?
}
Err(_) => break, Err(_) => break,
}; };
} }
let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
if hash_bytes != hash_clone {
warn!("Hash mismatched. Cache object is corrupted. Force to remove the cache file.");
// only file can be evicted
let _evicted_entry = self_clone.remove(&path).await;
return Err(CacheError::HashMismatchedInCacheFile);
}
Ok(()) as CacheResult<()> Ok(()) as CacheResult<()>
}); });

View file

@ -47,12 +47,12 @@ where
{ {
let mut synth_req = None; let mut synth_req = None;
if self.cache.is_some() { if self.cache.is_some() {
// TODO: try reading from cache // try reading from cache
// if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
// // if found, return it as response. // if found, return it as response.
// info!("Cache hit - Return from cache"); info!("Cache hit - Return from cache");
// return Ok(cached_response); return Ok(cached_response);
// }; };
// Synthetic request copy used just for caching (cannot clone request object...) // Synthetic request copy used just for caching (cannot clone request object...)
synth_req = Some(build_synth_req_for_cache(&req)); synth_req = Some(build_synth_req_for_cache(&req));