diff --git a/Cargo.toml b/Cargo.toml index 0a32c32..29e2277 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = ["rpxy-bin", "rpxy-lib"] -exclude = ["submodules", "h3-quinn"] +exclude = ["submodules"] [profile.release] codegen-units = 1 diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 9cd6265..3b90659 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -76,6 +76,7 @@ s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optio # cache http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" } moka = { version = "0.11.3", features = ["future", "sync"] } +fs4 = { version = "0.6.6", features = ["tokio", "tokio-async"] } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 93b54bf..4abf403 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -33,3 +33,5 @@ pub mod H3 { pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; pub const MAX_CACHE_ENTRY: u64 = 10_000; +// TODO: max cache size per entry +// TODO: max cache size in total diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index 8bc14f8..5e09efa 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -1,6 +1,7 @@ use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource}; use base64::{engine::general_purpose, Engine as _}; use bytes::{Buf, Bytes, BytesMut}; +use fs4::tokio::AsyncFileExt; use http_cache_semantics::CachePolicy; use hyper::{ http::{Request, Response}, @@ -8,10 +9,16 @@ use hyper::{ }; use moka::future::Cache as MokaCache; use sha2::{Digest, Sha256}; -use std::{fmt::Debug, path::PathBuf, time::SystemTime}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, + sync::Arc, + time::SystemTime, +}; use tokio::{ fs::{self, File}, io::{AsyncReadExt, AsyncWriteExt}, + sync::RwLock, }; // #[async_trait] @@ -39,15 +46,94 @@ fn derive_moka_key_from_uri(uri: &hyper::Uri) -> String { } #[derive(Clone, Debug)] -pub struct CacheObject { +struct CacheObject { pub policy: CachePolicy, pub target: Option, } +#[derive(Debug)] +struct CacheFileManager { + cache_dir: PathBuf, + cnt: usize, + runtime_handle: tokio::runtime::Handle, +} + +impl CacheFileManager { + async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self { + // Create cache file dir + // Clean up the file dir before init + // TODO: Persistent cache is really difficult. maybe SQLite is needed. + if let Err(e) = fs::remove_dir_all(path).await { + warn!("Failed to clean up the cache dir: {e}"); + }; + fs::create_dir_all(path).await.unwrap(); + Self { + cache_dir: path.clone(), + cnt: 0, + runtime_handle: runtime_handle.clone(), + } + } + + async fn write(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result { + let cache_filepath = self.cache_dir.join(cache_filename); + let Ok(mut file) = File::create(&cache_filepath).await else { + return Err(RpxyError::Cache("Failed to create file")); + }; + // TODO: ここでちゃんと書けないパターンっぽい?あるいは書いた後消されるパターンが起きている模様。 + // evictしたときファイルは消えてentryが残ってるっぽい + let mut bytes_clone = body_bytes.clone(); + while bytes_clone.has_remaining() { + warn!("remaining {}", bytes_clone.remaining()); + if let Err(e) = file.write_buf(&mut bytes_clone).await { + error!("Failed to write file cache: {e}"); + return Err(RpxyError::Cache("Failed to write file cache: {e}")); + }; + } + self.cnt += 1; + Ok(CacheObject { + policy: policy.clone(), + target: Some(cache_filepath), + }) + } + + async fn read(&self, path: impl AsRef) -> Result { + let Ok(mut file) = File::open(&path).await else { + warn!("Cache file object cannot be opened"); + return Err(RpxyError::Cache("Cache file object cannot be opened")); + }; + let (body_sender, res_body) = Body::channel(); + self.runtime_handle.spawn(async move { + let mut sender = body_sender; + let mut buf = BytesMut::new(); + loop { + match file.read_buf(&mut buf).await { + Ok(0) => break, + Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Err(_) => break, + }; + } + Ok(()) as Result<()> + }); + + Ok(res_body) + } + + async fn remove(&mut self, path: impl AsRef) -> Result<()> { + fs::remove_file(path.as_ref()).await?; + self.cnt -= 1; + debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt); + + Ok(()) + } +} + #[derive(Clone, Debug)] pub struct RpxyCache { - cache_dir: PathBuf, + /// Managing cache file objects through RwLock's lock mechanism for file lock + cache_file_manager: Arc>, + /// Moka's cache storing http message caching policy inner: MokaCache, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + /// Async runtime runtime_handle: tokio::runtime::Handle, } @@ -57,41 +143,39 @@ impl RpxyCache { if !globals.proxy_config.cache_enabled { return None; } + + let path = globals.proxy_config.cache_dir.as_ref().unwrap(); + let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); + let mgr_clone = cache_file_manager.clone(); + let runtime_handle = globals.runtime_handle.clone(); - let runtime_handle_clone = globals.runtime_handle.clone(); let eviction_listener = move |k, v: CacheObject, cause| { debug!("Cache entry is being evicted : {k} {:?}", cause); runtime_handle.block_on(async { if let Some(filepath) = v.target { debug!("Evict file object: {k}"); - if let Err(e) = fs::remove_file(filepath).await { + // Acquire the write lock + let mut mgr = mgr_clone.write().await; + if let Err(e) = mgr.remove(filepath).await { warn!("Eviction failed during file object removal: {:?}", e); }; } }) }; - // Create cache file dir - // Clean up the file dir before init - // TODO: Persistent cache is really difficult. maybe SQLite is needed. - let path = globals.proxy_config.cache_dir.as_ref().unwrap(); - if let Err(e) = fs::remove_dir_all(path).await { - warn!("Failed to clean up the cache dir: {e}"); - }; - fs::create_dir_all(path).await.unwrap(); - Some(Self { - cache_dir: path.clone(), + cache_file_manager, inner: MokaCache::builder() .max_capacity(MAX_CACHE_ENTRY) .eviction_listener_with_queued_delivery_mode(eviction_listener) .build(), // TODO: make this configurable, and along with size - runtime_handle: runtime_handle_clone, + runtime_handle: globals.runtime_handle.clone(), }) } /// Get cached response pub async fn get(&self, req: &Request) -> Option> { + debug!("Current cache entries: {:?}", self.inner); let moka_key = req.uri().to_string(); // First check cache chance @@ -105,36 +189,24 @@ impl RpxyCache { return None; }; - let Ok(mut file) = File::open(&filepath.clone()).await else { - warn!("Cache file object doesn't exist. Remove cache entry."); - self.inner.invalidate(&moka_key).await; - // let my_cache = self.inner.clone(); - // self.runtime_handle.spawn(async move { - // my_cache.invalidate(&moka_key).await; - // }); - return None; - }; - let (body_sender, res_body) = Body::channel(); - self.runtime_handle.spawn(async move { - let mut sender = body_sender; - // let mut size = 0usize; - let mut buf = BytesMut::new(); - loop { - match file.read_buf(&mut buf).await { - Ok(0) => break, - Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, - Err(_) => break, - }; + let mgr = self.cache_file_manager.read().await; + let res_body = match mgr.read(&filepath).await { + Ok(res_body) => res_body, + Err(e) => { + warn!("Failed to read from cache: {e}"); + self.inner.invalidate(&moka_key).await; + return None; } - Ok(()) as Result<()> - }); - - let res = Response::from_parts(res_parts, res_body); + }; debug!("Cache hit: {moka_key}"); - Some(res) + + Some(Response::from_parts(res_parts, res_body)) } else { - // Evict stale cache entry here - debug!("Evict stale cache entry and file object: {moka_key}"); + // Evict stale cache entry. + // This might be okay to keep as is since it would be updated later. + // However, there is no guarantee that newly got objects will be still cacheable. + // So, we have to evict stale cache entries and cache file objects if found. + debug!("Stale cache entry and file object: {moka_key}"); self.inner.invalidate(&moka_key).await; // let my_cache = self.inner.clone(); // self.runtime_handle.spawn(async move { @@ -163,34 +235,29 @@ impl RpxyCache { } } - pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: CachePolicy) -> Result<()> { + pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { let my_cache = self.inner.clone(); let uri = uri.clone(); - let cache_dir = self.cache_dir.clone(); - let mut bytes_clone = body_bytes.clone(); + let bytes_clone = body_bytes.clone(); + let policy_clone = policy.clone(); + let mgr_clone = self.cache_file_manager.clone(); self.runtime_handle.spawn(async move { let moka_key = derive_moka_key_from_uri(&uri); let cache_filename = derive_filename_from_uri(&uri); - let cache_filepath = cache_dir.join(cache_filename); - let _x = my_cache - .get_with(moka_key, async { - let mut file = File::create(&cache_filepath).await.unwrap(); - while bytes_clone.has_remaining() { - if let Err(e) = file.write_buf(&mut bytes_clone).await { - error!("Failed to write file cache: {e}"); - return CacheObject { policy, target: None }; - }; - } - CacheObject { - policy, - target: Some(cache_filepath), - } + warn!("{:?} bytes to be written", bytes_clone.len()); + if let Err(e) = my_cache + .try_get_with(moka_key, async { + let mut mgr = mgr_clone.write().await; + mgr.write(&cache_filename, &bytes_clone, &policy_clone).await }) - .await; + .await + { + error!("Failed to put the body into the file object or cache entry: {e}"); + }; - debug!("Current cache entries: {}", my_cache.entry_count()); + debug!("Current cache entries: {:?}", my_cache); }); Ok(()) diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index 713ba06..28fc263 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -94,7 +94,7 @@ where .cache .as_ref() .unwrap() - .put(synth_req.unwrap().uri(), &aggregated, cache_policy) + .put(synth_req.unwrap().uri(), &aggregated, &cache_policy) .await { error!("{:?}", cache_err); diff --git a/rpxy-lib/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs index beea073..f2d3e43 100644 --- a/rpxy-lib/src/handler/mod.rs +++ b/rpxy-lib/src/handler/mod.rs @@ -7,7 +7,6 @@ mod utils_synth_response; #[cfg(feature = "sticky-cookie")] use crate::backend::LbContext; -pub use cache::CacheObject; pub use { forwarder::Forwarder, handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index d3f7bca..0596625 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -22,7 +22,6 @@ use std::sync::Arc; pub use crate::{ certs::{CertsAndKeys, CryptoSource}, globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}, - handler::CacheObject, }; pub mod reexports { pub use hyper::Uri;