wip: feat: refactored cache implementation for put

This commit is contained in:
Jun Kurihara 2023-12-12 22:15:34 +09:00
commit 8dd6af6bc5
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
3 changed files with 142 additions and 60 deletions

View file

@ -15,6 +15,9 @@ pub enum CacheError {
#[error("Failed to acquire mutex lock for cache")]
FailedToAcquiredMutexLockForCache,
#[error("Failed to acquire mutex lock for check")]
FailedToAcquiredMutexLockForCheck,
#[error("Failed to create file cache")]
FailedToCreateFileCache,
@ -32,4 +35,13 @@ pub enum CacheError {
#[error("Failed to send frame to cache {0}")]
FailedToSendFrameToCache(String),
#[error("Failed to send frame from file cache {0}")]
FailedToSendFrameFromCache(String),
#[error("Failed to remove cache file: {0}")]
FailedToRemoveCacheFile(String),
#[error("Invalid cache target")]
InvalidCacheTarget,
}

View file

@ -1,14 +1,15 @@
use super::cache_error::*;
use crate::{globals::Globals, hyper_ext::body::UnboundedStreamBody, log::*};
use base64::{engine::general_purpose, Engine as _};
use bytes::{Buf, Bytes, BytesMut};
use futures::channel::mpsc;
use http::{Request, Response};
use http::{Request, Response, Uri};
use http_body_util::{BodyExt, StreamBody};
use http_cache_semantics::CachePolicy;
use hyper::body::{Body, Frame, Incoming};
use lru::LruCache;
use sha2::{Digest, Sha256};
use std::{
convert::Infallible,
path::{Path, PathBuf},
sync::{
atomic::{AtomicUsize, Ordering},
@ -35,6 +36,8 @@ pub struct RpxyCache {
max_each_size: usize,
/// Maximum size of cache object on memory
max_each_size_on_memory: usize,
/// Cache directory path
cache_dir: PathBuf,
}
impl RpxyCache {
@ -43,8 +46,8 @@ impl RpxyCache {
if !globals.proxy_config.cache_enabled {
return None;
}
let path = globals.proxy_config.cache_dir.as_ref().unwrap();
let file_store = FileStore::new(path, &globals.runtime_handle).await;
let cache_dir = globals.proxy_config.cache_dir.as_ref().unwrap();
let file_store = FileStore::new(&globals.runtime_handle).await;
let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
let max_each_size = globals.proxy_config.cache_max_each_size;
@ -56,12 +59,18 @@ impl RpxyCache {
max_each_size_on_memory = max_each_size;
}
if let Err(e) = fs::remove_dir_all(cache_dir).await {
warn!("Failed to clean up the cache dir: {e}");
};
fs::create_dir_all(&cache_dir).await.unwrap();
Some(Self {
file_store,
inner,
runtime_handle: globals.runtime_handle.clone(),
max_each_size,
max_each_size_on_memory,
cache_dir: cache_dir.clone(),
})
}
@ -80,17 +89,20 @@ impl RpxyCache {
mut body: Incoming,
policy: &CachePolicy,
) -> CacheResult<UnboundedStreamBody> {
let my_cache = self.inner.clone();
let cache_manager = self.inner.clone();
let mut file_store = self.file_store.clone();
let uri = uri.clone();
let policy_clone = policy.clone();
let max_each_size = self.max_each_size;
let max_each_size_on_memory = self.max_each_size_on_memory;
let cache_dir = self.cache_dir.clone();
let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
self.runtime_handle.spawn(async move {
let mut size = 0usize;
let mut buf = BytesMut::new();
loop {
let frame = match body.frame().await {
Some(frame) => frame,
@ -118,10 +130,9 @@ impl RpxyCache {
.map(|f| {
if f.is_data() {
let data_bytes = f.data_ref().unwrap().clone();
debug!("cache data bytes of {} bytes", data_bytes.len())
// TODO: cache data bytes as file or on memory
// fileにするかmemoryにするかの判断はある程度までバッファしてやってという手を使うことになる。途中までキャッシュしたやつはどうするかとかいう判断も必要。
// ファイルとObjectのbindをどうやってするか
debug!("cache data bytes of {} bytes", data_bytes.len());
// We do not use stream-type buffering since it needs to lock file during operation.
buf.extend(data_bytes.as_ref());
}
})
.map_err(|e| CacheError::FailedToCacheBytes(e.to_string()))?;
@ -132,6 +143,35 @@ impl RpxyCache {
.map_err(|e| CacheError::FailedToSendFrameToCache(e.to_string()))?;
}
let buf = buf.freeze();
// Calculate hash of the cached data, after all data is received.
// In-operation calculation is possible but it blocks sending data.
let mut hasher = Sha256::new();
hasher.update(buf.as_ref());
let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes);
// Create cache object
let cache_key = derive_cache_key_from_uri(&uri);
let cache_object = CacheObject {
policy: policy_clone,
target: CacheFileOrOnMemory::build(&cache_dir, &uri, &buf, max_each_size_on_memory),
hash: hash_bytes,
};
if let Some((k, v)) = cache_manager.push(&cache_key, &cache_object)? {
if k != cache_key {
info!("Over the cache capacity. Evict least recent used entry");
if let CacheFileOrOnMemory::File(path) = v.target {
file_store.evict(&path).await;
}
}
}
// store cache object to file
if let CacheFileOrOnMemory::File(_) = cache_object.target {
file_store.create(&cache_object, &buf).await?;
}
Ok(()) as CacheResult<()>
});
@ -145,36 +185,35 @@ impl RpxyCache {
#[derive(Debug, Clone)]
/// Cache file manager outer that is responsible to handle `RwLock`
struct FileStore {
/// Inner file store main object
inner: Arc<RwLock<FileStoreInner>>,
}
impl FileStore {
/// Build manager
async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
async fn new(runtime_handle: &tokio::runtime::Handle) -> Self {
Self {
inner: Arc::new(RwLock::new(FileStoreInner::new(path, runtime_handle).await)),
}
inner: Arc::new(RwLock::new(FileStoreInner::new(runtime_handle).await)),
}
}
impl FileStore {
/// Count file cache entries
async fn count(&self) -> usize {
let inner = self.inner.read().await;
inner.cnt
}
/// Create a temporary file cache
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult<CacheFileOrOnMemory> {
async fn create(&mut self, ref cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
let mut inner = self.inner.write().await;
inner.create(cache_filename, body_bytes).await
inner.create(cache_object, body_bytes).await
}
/// Evict a temporary file cache
async fn evict(&self, path: impl AsRef<Path>) {
// Acquire the write lock
let mut inner = self.inner.write().await;
if let Err(e) = inner.remove(path).await {
warn!("Eviction failed during file object removal: {:?}", e);
};
}
// /// Evict a temporary file cache
// async fn evict(&self, path: impl AsRef<Path>) {
// // Acquire the write lock
// let mut inner = self.inner.write().await;
// if let Err(e) = inner.remove(path).await {
// warn!("Eviction failed during file object removal: {:?}", e);
// };
// }
// /// Read a temporary file cache
// async fn read(&self, path: impl AsRef<Path>) -> CacheResult<Bytes> {
// let inner = self.inner.read().await;
@ -185,8 +224,6 @@ impl FileStore {
#[derive(Debug)]
/// Manager inner for cache on file system
struct FileStoreInner {
/// Directory of temporary files
cache_dir: PathBuf,
/// Counter of current cached files
cnt: usize,
/// Async runtime
@ -197,22 +234,21 @@ impl FileStoreInner {
/// Build new cache file manager.
/// This first creates cache file dir if not exists, and cleans up the file inside the directory.
/// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed.
async fn new(path: impl AsRef<Path>, runtime_handle: &tokio::runtime::Handle) -> Self {
let path_buf = path.as_ref().to_path_buf();
if let Err(e) = fs::remove_dir_all(path).await {
warn!("Failed to clean up the cache dir: {e}");
};
fs::create_dir_all(&path_buf).await.unwrap();
async fn new(runtime_handle: &tokio::runtime::Handle) -> Self {
Self {
cache_dir: path_buf.clone(),
cnt: 0,
runtime_handle: runtime_handle.clone(),
}
}
/// Create a new temporary file cache
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> CacheResult<CacheFileOrOnMemory> {
let cache_filepath = self.cache_dir.join(cache_filename);
async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
let cache_filepath = match cache_object.target {
CacheFileOrOnMemory::File(ref path) => path.clone(),
CacheFileOrOnMemory::OnMemory(_) => {
return Err(CacheError::InvalidCacheTarget);
}
};
let Ok(mut file) = File::create(&cache_filepath).await else {
return Err(CacheError::FailedToCreateFileCache);
};
@ -224,50 +260,47 @@ impl FileStoreInner {
};
}
self.cnt += 1;
Ok(CacheFileOrOnMemory::File(cache_filepath))
Ok(())
}
/// Retrieve a stored temporary file cache
async fn read(&self, path: impl AsRef<Path>) -> CacheResult<()> {
async fn read(&self, path: impl AsRef<Path>) -> CacheResult<UnboundedStreamBody> {
let Ok(mut file) = File::open(&path).await else {
warn!("Cache file object cannot be opened");
return Err(CacheError::FailedToOpenCacheFile);
};
/* ----------------------------- */
// PoC for streaming body
let (tx, rx) = mpsc::unbounded::<Result<hyper::body::Frame<bytes::Bytes>, Infallible>>();
let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
// let (body_sender, res_body) = Body::channel();
self.runtime_handle.spawn(async move {
// let mut sender = body_sender;
let mut buf = BytesMut::new();
loop {
match file.read_buf(&mut buf).await {
Ok(0) => break,
Ok(_) => tx
.unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining()))))
.map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?,
//sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
Ok(_) => body_tx
.unbounded_send(Ok(Frame::data(buf.copy_to_bytes(buf.remaining()))))
.map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?,
Err(_) => break,
};
}
Ok(()) as anyhow::Result<()>
Ok(()) as CacheResult<()>
});
let mut rx = http_body_util::StreamBody::new(rx);
// TODO: 結局incominglikeなbodystreamを定義することになる。これだったらh3と合わせて自分で定義した方が良さそう。
// typeが長すぎるのでwrapperを作った方がいい。
// let response = Response::builder()
// .status(200)
// .header("content-type", "application/octet-stream")
// .body(rx)
// .unwrap();
let stream_body = StreamBody::new(body_rx);
todo!()
/* ----------------------------- */
Ok(stream_body)
}
// Ok(res_body)
/// Remove file
async fn remove(&mut self, path: impl AsRef<Path>) -> CacheResult<()> {
fs::remove_file(path.as_ref())
.await
.map_err(|e| CacheError::FailedToRemoveCacheFile(e.to_string()))?;
self.cnt -= 1;
debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt);
Ok(())
}
}
@ -279,7 +312,20 @@ pub enum CacheFileOrOnMemory {
/// Pointer to the temporary cache file
File(PathBuf),
/// Cached body itself
OnMemory(Vec<u8>),
OnMemory(Bytes),
}
impl CacheFileOrOnMemory {
/// Get cache object target
fn build(cache_dir: &Path, uri: &Uri, object: &Bytes, max_each_size_on_memory: usize) -> Self {
if object.len() > max_each_size_on_memory {
let cache_filename = derive_filename_from_uri(uri);
let cache_filepath = cache_dir.join(cache_filename);
CacheFileOrOnMemory::File(cache_filepath)
} else {
CacheFileOrOnMemory::OnMemory(object.clone())
}
}
}
#[derive(Clone, Debug)]
@ -290,7 +336,7 @@ struct CacheObject {
/// Cache target: on-memory object or temporary file
pub target: CacheFileOrOnMemory,
/// SHA256 hash of target to strongly bind the cache metadata (this object) and file target
pub hash: Vec<u8>,
pub hash: Bytes,
}
/* ---------------------------------------------- */
@ -332,16 +378,28 @@ impl LruCacheManager {
}
/// Push an entry
fn push(&self, cache_key: &str, cache_object: CacheObject) -> CacheResult<Option<(String, CacheObject)>> {
fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult<Option<(String, CacheObject)>> {
let Ok(mut lock) = self.inner.lock() else {
error!("Failed to acquire mutex lock for writing cache entry");
return Err(CacheError::FailedToAcquiredMutexLockForCache);
};
let res = Ok(lock.push(cache_key.to_string(), cache_object));
let res = Ok(lock.push(cache_key.to_string(), cache_object.clone()));
// This may be inconsistent with the actual number of entries
self.cnt.store(lock.len(), Ordering::Relaxed);
res
}
/// Get an entry
fn get(&self, cache_key: &str) -> CacheResult<Option<CacheObject>> {
let Ok(mut lock) = self.inner.lock() else {
error!("Mutex can't be locked for checking cache entry");
return Err(CacheError::FailedToAcquiredMutexLockForCheck);
};
let Some(cached_object) = lock.get(cache_key) else {
return Ok(None);
};
Ok(Some(cached_object.clone()))
}
}
/* ---------------------------------------------- */
@ -366,3 +424,14 @@ pub fn get_policy_if_cacheable<B1, B2>(
Ok(None)
}
}
fn derive_filename_from_uri(uri: &hyper::Uri) -> String {
let mut hasher = Sha256::new();
hasher.update(uri.to_string());
let digest = hasher.finalize();
general_purpose::URL_SAFE_NO_PAD.encode(digest)
}
fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String {
uri.to_string()
}

View file

@ -47,6 +47,7 @@ where
{
let mut synth_req = None;
if self.cache.is_some() {
// TODO: try reading from cache
// if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
// // if found, return it as response.
// info!("Cache hit - Return from cache");