feat: update implementation of cache. still unstable

This commit is contained in:
Jun Kurihara 2023-08-16 19:12:39 +09:00
commit cc6b78feb3
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23
7 changed files with 134 additions and 66 deletions

View file

@ -1,7 +1,7 @@
[workspace] [workspace]
members = ["rpxy-bin", "rpxy-lib"] members = ["rpxy-bin", "rpxy-lib"]
exclude = ["submodules", "h3-quinn"] exclude = ["submodules"]
[profile.release] [profile.release]
codegen-units = 1 codegen-units = 1

View file

@ -76,6 +76,7 @@ s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optio
# cache # cache
http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" } http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" }
moka = { version = "0.11.3", features = ["future", "sync"] } moka = { version = "0.11.3", features = ["future", "sync"] }
fs4 = { version = "0.6.6", features = ["tokio", "tokio-async"] }
# cookie handling for sticky cookie # cookie handling for sticky cookie
chrono = { version = "0.4.26", default-features = false, features = [ chrono = { version = "0.4.26", default-features = false, features = [

View file

@ -33,3 +33,5 @@ pub mod H3 {
pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id";
pub const MAX_CACHE_ENTRY: u64 = 10_000; pub const MAX_CACHE_ENTRY: u64 = 10_000;
// TODO: max cache size per entry
// TODO: max cache size in total

View file

@ -1,6 +1,7 @@
use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource}; use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource};
use base64::{engine::general_purpose, Engine as _}; use base64::{engine::general_purpose, Engine as _};
use bytes::{Buf, Bytes, BytesMut}; use bytes::{Buf, Bytes, BytesMut};
use fs4::tokio::AsyncFileExt;
use http_cache_semantics::CachePolicy; use http_cache_semantics::CachePolicy;
use hyper::{ use hyper::{
http::{Request, Response}, http::{Request, Response},
@ -8,10 +9,16 @@ use hyper::{
}; };
use moka::future::Cache as MokaCache; use moka::future::Cache as MokaCache;
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::{fmt::Debug, path::PathBuf, time::SystemTime}; use std::{
fmt::Debug,
path::{Path, PathBuf},
sync::Arc,
time::SystemTime,
};
use tokio::{ use tokio::{
fs::{self, File}, fs::{self, File},
io::{AsyncReadExt, AsyncWriteExt}, io::{AsyncReadExt, AsyncWriteExt},
sync::RwLock,
}; };
// #[async_trait] // #[async_trait]
@ -39,15 +46,94 @@ fn derive_moka_key_from_uri(uri: &hyper::Uri) -> String {
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct CacheObject { struct CacheObject {
pub policy: CachePolicy, pub policy: CachePolicy,
pub target: Option<PathBuf>, pub target: Option<PathBuf>,
} }
#[derive(Debug)]
struct CacheFileManager {
cache_dir: PathBuf,
cnt: usize,
runtime_handle: tokio::runtime::Handle,
}
impl CacheFileManager {
async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self {
// Create cache file dir
// Clean up the file dir before init
// TODO: Persistent cache is really difficult. maybe SQLite is needed.
if let Err(e) = fs::remove_dir_all(path).await {
warn!("Failed to clean up the cache dir: {e}");
};
fs::create_dir_all(path).await.unwrap();
Self {
cache_dir: path.clone(),
cnt: 0,
runtime_handle: runtime_handle.clone(),
}
}
async fn write(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result<CacheObject> {
let cache_filepath = self.cache_dir.join(cache_filename);
let Ok(mut file) = File::create(&cache_filepath).await else {
return Err(RpxyError::Cache("Failed to create file"));
};
// TODO: ここでちゃんと書けないパターンっぽい?あるいは書いた後消されるパターンが起きている模様。
// evictしたときファイルは消えてentryが残ってるっぽい
let mut bytes_clone = body_bytes.clone();
while bytes_clone.has_remaining() {
warn!("remaining {}", bytes_clone.remaining());
if let Err(e) = file.write_buf(&mut bytes_clone).await {
error!("Failed to write file cache: {e}");
return Err(RpxyError::Cache("Failed to write file cache: {e}"));
};
}
self.cnt += 1;
Ok(CacheObject {
policy: policy.clone(),
target: Some(cache_filepath),
})
}
async fn read(&self, path: impl AsRef<Path>) -> Result<Body> {
let Ok(mut file) = File::open(&path).await else {
warn!("Cache file object cannot be opened");
return Err(RpxyError::Cache("Cache file object cannot be opened"));
};
let (body_sender, res_body) = Body::channel();
self.runtime_handle.spawn(async move {
let mut sender = body_sender;
let mut buf = BytesMut::new();
loop {
match file.read_buf(&mut buf).await {
Ok(0) => break,
Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
Err(_) => break,
};
}
Ok(()) as Result<()>
});
Ok(res_body)
}
async fn remove(&mut self, path: impl AsRef<Path>) -> Result<()> {
fs::remove_file(path.as_ref()).await?;
self.cnt -= 1;
debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt);
Ok(())
}
}
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
pub struct RpxyCache { pub struct RpxyCache {
cache_dir: PathBuf, /// Managing cache file objects through RwLock's lock mechanism for file lock
cache_file_manager: Arc<RwLock<CacheFileManager>>,
/// Moka's cache storing http message caching policy
inner: MokaCache<String, CacheObject>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう inner: MokaCache<String, CacheObject>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう
/// Async runtime
runtime_handle: tokio::runtime::Handle, runtime_handle: tokio::runtime::Handle,
} }
@ -57,41 +143,39 @@ impl RpxyCache {
if !globals.proxy_config.cache_enabled { if !globals.proxy_config.cache_enabled {
return None; return None;
} }
let path = globals.proxy_config.cache_dir.as_ref().unwrap();
let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await));
let mgr_clone = cache_file_manager.clone();
let runtime_handle = globals.runtime_handle.clone(); let runtime_handle = globals.runtime_handle.clone();
let runtime_handle_clone = globals.runtime_handle.clone();
let eviction_listener = move |k, v: CacheObject, cause| { let eviction_listener = move |k, v: CacheObject, cause| {
debug!("Cache entry is being evicted : {k} {:?}", cause); debug!("Cache entry is being evicted : {k} {:?}", cause);
runtime_handle.block_on(async { runtime_handle.block_on(async {
if let Some(filepath) = v.target { if let Some(filepath) = v.target {
debug!("Evict file object: {k}"); debug!("Evict file object: {k}");
if let Err(e) = fs::remove_file(filepath).await { // Acquire the write lock
let mut mgr = mgr_clone.write().await;
if let Err(e) = mgr.remove(filepath).await {
warn!("Eviction failed during file object removal: {:?}", e); warn!("Eviction failed during file object removal: {:?}", e);
}; };
} }
}) })
}; };
// Create cache file dir
// Clean up the file dir before init
// TODO: Persistent cache is really difficult. maybe SQLite is needed.
let path = globals.proxy_config.cache_dir.as_ref().unwrap();
if let Err(e) = fs::remove_dir_all(path).await {
warn!("Failed to clean up the cache dir: {e}");
};
fs::create_dir_all(path).await.unwrap();
Some(Self { Some(Self {
cache_dir: path.clone(), cache_file_manager,
inner: MokaCache::builder() inner: MokaCache::builder()
.max_capacity(MAX_CACHE_ENTRY) .max_capacity(MAX_CACHE_ENTRY)
.eviction_listener_with_queued_delivery_mode(eviction_listener) .eviction_listener_with_queued_delivery_mode(eviction_listener)
.build(), // TODO: make this configurable, and along with size .build(), // TODO: make this configurable, and along with size
runtime_handle: runtime_handle_clone, runtime_handle: globals.runtime_handle.clone(),
}) })
} }
/// Get cached response /// Get cached response
pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> { pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
debug!("Current cache entries: {:?}", self.inner);
let moka_key = req.uri().to_string(); let moka_key = req.uri().to_string();
// First check cache chance // First check cache chance
@ -105,36 +189,24 @@ impl RpxyCache {
return None; return None;
}; };
let Ok(mut file) = File::open(&filepath.clone()).await else { let mgr = self.cache_file_manager.read().await;
warn!("Cache file object doesn't exist. Remove cache entry."); let res_body = match mgr.read(&filepath).await {
Ok(res_body) => res_body,
Err(e) => {
warn!("Failed to read from cache: {e}");
self.inner.invalidate(&moka_key).await; self.inner.invalidate(&moka_key).await;
// let my_cache = self.inner.clone();
// self.runtime_handle.spawn(async move {
// my_cache.invalidate(&moka_key).await;
// });
return None; return None;
};
let (body_sender, res_body) = Body::channel();
self.runtime_handle.spawn(async move {
let mut sender = body_sender;
// let mut size = 0usize;
let mut buf = BytesMut::new();
loop {
match file.read_buf(&mut buf).await {
Ok(0) => break,
Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
Err(_) => break,
};
} }
Ok(()) as Result<()> };
});
let res = Response::from_parts(res_parts, res_body);
debug!("Cache hit: {moka_key}"); debug!("Cache hit: {moka_key}");
Some(res)
Some(Response::from_parts(res_parts, res_body))
} else { } else {
// Evict stale cache entry here // Evict stale cache entry.
debug!("Evict stale cache entry and file object: {moka_key}"); // This might be okay to keep as is since it would be updated later.
// However, there is no guarantee that newly got objects will be still cacheable.
// So, we have to evict stale cache entries and cache file objects if found.
debug!("Stale cache entry and file object: {moka_key}");
self.inner.invalidate(&moka_key).await; self.inner.invalidate(&moka_key).await;
// let my_cache = self.inner.clone(); // let my_cache = self.inner.clone();
// self.runtime_handle.spawn(async move { // self.runtime_handle.spawn(async move {
@ -163,34 +235,29 @@ impl RpxyCache {
} }
} }
pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: CachePolicy) -> Result<()> { pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
let my_cache = self.inner.clone(); let my_cache = self.inner.clone();
let uri = uri.clone(); let uri = uri.clone();
let cache_dir = self.cache_dir.clone(); let bytes_clone = body_bytes.clone();
let mut bytes_clone = body_bytes.clone(); let policy_clone = policy.clone();
let mgr_clone = self.cache_file_manager.clone();
self.runtime_handle.spawn(async move { self.runtime_handle.spawn(async move {
let moka_key = derive_moka_key_from_uri(&uri); let moka_key = derive_moka_key_from_uri(&uri);
let cache_filename = derive_filename_from_uri(&uri); let cache_filename = derive_filename_from_uri(&uri);
let cache_filepath = cache_dir.join(cache_filename);
let _x = my_cache warn!("{:?} bytes to be written", bytes_clone.len());
.get_with(moka_key, async { if let Err(e) = my_cache
let mut file = File::create(&cache_filepath).await.unwrap(); .try_get_with(moka_key, async {
while bytes_clone.has_remaining() { let mut mgr = mgr_clone.write().await;
if let Err(e) = file.write_buf(&mut bytes_clone).await { mgr.write(&cache_filename, &bytes_clone, &policy_clone).await
error!("Failed to write file cache: {e}");
return CacheObject { policy, target: None };
};
}
CacheObject {
policy,
target: Some(cache_filepath),
}
}) })
.await; .await
{
error!("Failed to put the body into the file object or cache entry: {e}");
};
debug!("Current cache entries: {}", my_cache.entry_count()); debug!("Current cache entries: {:?}", my_cache);
}); });
Ok(()) Ok(())

View file

@ -94,7 +94,7 @@ where
.cache .cache
.as_ref() .as_ref()
.unwrap() .unwrap()
.put(synth_req.unwrap().uri(), &aggregated, cache_policy) .put(synth_req.unwrap().uri(), &aggregated, &cache_policy)
.await .await
{ {
error!("{:?}", cache_err); error!("{:?}", cache_err);

View file

@ -7,7 +7,6 @@ mod utils_synth_response;
#[cfg(feature = "sticky-cookie")] #[cfg(feature = "sticky-cookie")]
use crate::backend::LbContext; use crate::backend::LbContext;
pub use cache::CacheObject;
pub use { pub use {
forwarder::Forwarder, forwarder::Forwarder,
handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}, handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError},

View file

@ -22,7 +22,6 @@ use std::sync::Arc;
pub use crate::{ pub use crate::{
certs::{CertsAndKeys, CryptoSource}, certs::{CertsAndKeys, CryptoSource},
globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}, globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri},
handler::CacheObject,
}; };
pub mod reexports { pub mod reexports {
pub use hyper::Uri; pub use hyper::Uri;