diff --git a/config-example.toml b/config-example.toml index 694ff7c..424417a 100644 --- a/config-example.toml +++ b/config-example.toml @@ -110,6 +110,7 @@ max_idle_timeout = 10 # secs. 0 represents an infinite timeout. # If this specified, file cache feature is enabled [experimental.cache] -cache_dir = './cache' # optional. default is "./cache" relative to the current working directory -max_cache_entry = 1000 # optional. default is 1k -max_cache_each_size = 65535 # optional. default is 64k +cache_dir = './cache' # optional. default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k +max_cache_each_size_onmemory = 4096 # optional. default is 4k diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 2d03895..e678012 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -38,6 +38,7 @@ pub struct CacheOption { pub cache_dir: Option, pub max_cache_entry: Option, pub max_cache_each_size: Option, + pub max_cache_each_size_on_memory: Option, } #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] @@ -184,6 +185,9 @@ impl TryInto for &ConfigToml { if let Some(num) = cache_option.max_cache_each_size { proxy_config.cache_max_each_size = num; } + if let Some(num) = cache_option.max_cache_each_size_on_memory { + proxy_config.cache_max_each_size_on_memory = num; + } } } diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 510836a..ebec1fc 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -38,5 +38,8 @@ pub const MAX_CACHE_ENTRY: usize = 1_000; #[cfg(feature = "cache")] // max size for each file in bytes pub const MAX_CACHE_EACH_SIZE: usize = 65_535; +#[cfg(feature = "cache")] +// on memory cache if less than or equel to +pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; // TODO: max cache size in total diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 6f2efb0..d1c0130 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -61,6 +61,8 @@ pub struct ProxyConfig { pub cache_max_entry: usize, #[cfg(feature = "cache")] pub cache_max_each_size: usize, + #[cfg(feature = "cache")] + pub cache_max_each_size_on_memory: usize, // All need to make packet acceptor #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] @@ -105,6 +107,8 @@ impl Default for ProxyConfig { cache_max_entry: MAX_CACHE_ENTRY, #[cfg(feature = "cache")] cache_max_each_size: MAX_CACHE_EACH_SIZE, + #[cfg(feature = "cache")] + cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY, #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] http3: false, diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs index ce8aac4..22ed51b 100644 --- a/rpxy-lib/src/handler/cache.rs +++ b/rpxy-lib/src/handler/cache.rs @@ -20,10 +20,16 @@ use tokio::{ sync::RwLock, }; +#[derive(Clone, Debug)] +pub enum CacheFileOrOnMemory { + File(PathBuf), + OnMemory(Vec), +} + #[derive(Clone, Debug)] struct CacheObject { pub policy: CachePolicy, - pub target: PathBuf, + pub target: CacheFileOrOnMemory, } #[derive(Debug)] @@ -64,7 +70,7 @@ impl CacheFileManager { self.cnt += 1; Ok(CacheObject { policy: policy.clone(), - target: cache_filepath, + target: CacheFileOrOnMemory::File(cache_filepath), }) } @@ -109,6 +115,8 @@ pub struct RpxyCache { runtime_handle: tokio::runtime::Handle, /// Maximum size of each cache file object max_each_size: usize, + /// Maximum size of cache object on memory + max_each_size_on_memory: usize, } impl RpxyCache { @@ -124,11 +132,21 @@ impl RpxyCache { std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), ))); + let max_each_size = globals.proxy_config.cache_max_each_size; + let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; + if max_each_size < max_each_size_on_memory { + warn!( + "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" + ); + max_each_size_on_memory = max_each_size; + } + Some(Self { cache_file_manager, inner, runtime_handle: globals.runtime_handle.clone(), - max_each_size: globals.proxy_config.cache_max_each_size, + max_each_size, + max_each_size_on_memory, }) } @@ -174,24 +192,35 @@ impl RpxyCache { // So, we have to evict stale cache entries and cache file objects if found. debug!("Stale cache entry and file object: {cache_key}"); let _evicted_entry = self.evict_cache_entry(&cache_key); - self.evict_cache_file(&cached_object.target).await; + // For cache file + if let CacheFileOrOnMemory::File(path) = cached_object.target { + self.evict_cache_file(&path).await; + } return None; }; - // Finally retrieve the file object - let mgr = self.cache_file_manager.read().await; - let res_body = match mgr.read(&cached_object.target).await { - Ok(res_body) => res_body, - Err(e) => { - warn!("Failed to read from file cache: {e}"); - let _evicted_entry = self.evict_cache_entry(&cache_key); - self.evict_cache_file(&cached_object.target).await; - return None; - } - }; + // Finally retrieve the file/on-memory object + match cached_object.target { + CacheFileOrOnMemory::File(path) => { + let mgr = self.cache_file_manager.read().await; + let res_body = match mgr.read(&path).await { + Ok(res_body) => res_body, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.evict_cache_entry(&cache_key); + self.evict_cache_file(&path).await; + return None; + } + }; - debug!("Cache hit: {cache_key}"); - Some(Response::from_parts(res_parts, res_body)) + debug!("Cache hit from file: {cache_key}"); + Some(Response::from_parts(res_parts, res_body)) + } + CacheFileOrOnMemory::OnMemory(object) => { + debug!("Cache hit from on memory: {cache_key}"); + Some(Response::from_parts(res_parts, Body::from(object))) + } + } } pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { @@ -201,6 +230,7 @@ impl RpxyCache { let bytes_clone = body_bytes.clone(); let policy_clone = policy.clone(); let max_each_size = self.max_each_size; + let max_each_size_on_memory = self.max_each_size_on_memory; self.runtime_handle.spawn(async move { if bytes_clone.len() > max_each_size { @@ -212,10 +242,20 @@ impl RpxyCache { debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); - let mut mgr = mgr.write().await; - let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { - error!("Failed to put the body into the file object or cache entry"); - return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); + let cache_object = if bytes_clone.len() > max_each_size_on_memory { + let mut mgr = mgr.write().await; + let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { + error!("Failed to put the body into the file object or cache entry"); + return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); + }; + debug!("Cached a new file: {} - {}", cache_key, cache_filename); + cache_object + } else { + debug!("Cached a new object on memory: {}", cache_key); + CacheObject { + policy: policy_clone, + target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()), + } }; let push_opt = { let Ok(mut lock) = my_cache.lock() else { @@ -227,13 +267,14 @@ impl RpxyCache { if let Some((k, v)) = push_opt { if k != cache_key { info!("Over the cache capacity. Evict least recent used entry"); - if let Err(e) = mgr.remove(&v.target).await { - warn!("Eviction failed during file object removal over the capacity: {:?}", e); - }; + if let CacheFileOrOnMemory::File(path) = v.target { + let mut mgr = mgr.write().await; + if let Err(e) = mgr.remove(&path).await { + warn!("Eviction failed during file object removal over the capacity: {:?}", e); + }; + } } } - - debug!("Cached a new file: {} - {}", cache_key, cache_filename); Ok(()) });