feat: on memory cache if less than 4k or specified cache size.
This commit is contained in:
		
					parent
					
						
							
								32b173966c
							
						
					
				
			
			
				commit
				
					
						5f6758ff9e
					
				
			
		
					 5 changed files with 82 additions and 29 deletions
				
			
		|  | @ -113,3 +113,4 @@ max_idle_timeout = 10              # secs. 0 represents an infinite timeout. | |||
| cache_dir = './cache'               # optional. default is "./cache" relative to the current working directory | ||||
| max_cache_entry = 1000              # optional. default is 1k | ||||
| max_cache_each_size = 65535         # optional. default is 64k | ||||
| max_cache_each_size_onmemory = 4096 # optional. default is 4k | ||||
|  |  | |||
|  | @ -38,6 +38,7 @@ pub struct CacheOption { | |||
|   pub cache_dir: Option<String>, | ||||
|   pub max_cache_entry: Option<usize>, | ||||
|   pub max_cache_each_size: Option<usize>, | ||||
|   pub max_cache_each_size_on_memory: Option<usize>, | ||||
| } | ||||
| 
 | ||||
| #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] | ||||
|  | @ -184,6 +185,9 @@ impl TryInto<ProxyConfig> for &ConfigToml { | |||
|         if let Some(num) = cache_option.max_cache_each_size { | ||||
|           proxy_config.cache_max_each_size = num; | ||||
|         } | ||||
|         if let Some(num) = cache_option.max_cache_each_size_on_memory { | ||||
|           proxy_config.cache_max_each_size_on_memory = num; | ||||
|         } | ||||
|       } | ||||
|     } | ||||
| 
 | ||||
|  |  | |||
|  | @ -38,5 +38,8 @@ pub const MAX_CACHE_ENTRY: usize = 1_000; | |||
| #[cfg(feature = "cache")] | ||||
| // max size for each file in bytes
 | ||||
| pub const MAX_CACHE_EACH_SIZE: usize = 65_535; | ||||
| #[cfg(feature = "cache")] | ||||
| // on memory cache if less than or equel to
 | ||||
| pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; | ||||
| 
 | ||||
| // TODO: max cache size in total
 | ||||
|  |  | |||
|  | @ -61,6 +61,8 @@ pub struct ProxyConfig { | |||
|   pub cache_max_entry: usize, | ||||
|   #[cfg(feature = "cache")] | ||||
|   pub cache_max_each_size: usize, | ||||
|   #[cfg(feature = "cache")] | ||||
|   pub cache_max_each_size_on_memory: usize, | ||||
| 
 | ||||
|   // All need to make packet acceptor
 | ||||
|   #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] | ||||
|  | @ -105,6 +107,8 @@ impl Default for ProxyConfig { | |||
|       cache_max_entry: MAX_CACHE_ENTRY, | ||||
|       #[cfg(feature = "cache")] | ||||
|       cache_max_each_size: MAX_CACHE_EACH_SIZE, | ||||
|       #[cfg(feature = "cache")] | ||||
|       cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY, | ||||
| 
 | ||||
|       #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] | ||||
|       http3: false, | ||||
|  |  | |||
|  | @ -20,10 +20,16 @@ use tokio::{ | |||
|   sync::RwLock, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| pub enum CacheFileOrOnMemory { | ||||
|   File(PathBuf), | ||||
|   OnMemory(Vec<u8>), | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| struct CacheObject { | ||||
|   pub policy: CachePolicy, | ||||
|   pub target: PathBuf, | ||||
|   pub target: CacheFileOrOnMemory, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
|  | @ -64,7 +70,7 @@ impl CacheFileManager { | |||
|     self.cnt += 1; | ||||
|     Ok(CacheObject { | ||||
|       policy: policy.clone(), | ||||
|       target: cache_filepath, | ||||
|       target: CacheFileOrOnMemory::File(cache_filepath), | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|  | @ -109,6 +115,8 @@ pub struct RpxyCache { | |||
|   runtime_handle: tokio::runtime::Handle, | ||||
|   /// Maximum size of each cache file object
 | ||||
|   max_each_size: usize, | ||||
|   /// Maximum size of cache object on memory
 | ||||
|   max_each_size_on_memory: usize, | ||||
| } | ||||
| 
 | ||||
| impl RpxyCache { | ||||
|  | @ -124,11 +132,21 @@ impl RpxyCache { | |||
|       std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), | ||||
|     ))); | ||||
| 
 | ||||
|     let max_each_size = globals.proxy_config.cache_max_each_size; | ||||
|     let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; | ||||
|     if max_each_size < max_each_size_on_memory { | ||||
|       warn!( | ||||
|         "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" | ||||
|       ); | ||||
|       max_each_size_on_memory = max_each_size; | ||||
|     } | ||||
| 
 | ||||
|     Some(Self { | ||||
|       cache_file_manager, | ||||
|       inner, | ||||
|       runtime_handle: globals.runtime_handle.clone(), | ||||
|       max_each_size: globals.proxy_config.cache_max_each_size, | ||||
|       max_each_size, | ||||
|       max_each_size_on_memory, | ||||
|     }) | ||||
|   } | ||||
| 
 | ||||
|  | @ -174,25 +192,36 @@ impl RpxyCache { | |||
|       // So, we have to evict stale cache entries and cache file objects if found.
 | ||||
|       debug!("Stale cache entry and file object: {cache_key}"); | ||||
|       let _evicted_entry = self.evict_cache_entry(&cache_key); | ||||
|       self.evict_cache_file(&cached_object.target).await; | ||||
|       // For cache file
 | ||||
|       if let CacheFileOrOnMemory::File(path) = cached_object.target { | ||||
|         self.evict_cache_file(&path).await; | ||||
|       } | ||||
|       return None; | ||||
|     }; | ||||
| 
 | ||||
|     // Finally retrieve the file object
 | ||||
|     // Finally retrieve the file/on-memory object
 | ||||
|     match cached_object.target { | ||||
|       CacheFileOrOnMemory::File(path) => { | ||||
|         let mgr = self.cache_file_manager.read().await; | ||||
|     let res_body = match mgr.read(&cached_object.target).await { | ||||
|         let res_body = match mgr.read(&path).await { | ||||
|           Ok(res_body) => res_body, | ||||
|           Err(e) => { | ||||
|             warn!("Failed to read from file cache: {e}"); | ||||
|             let _evicted_entry = self.evict_cache_entry(&cache_key); | ||||
|         self.evict_cache_file(&cached_object.target).await; | ||||
|             self.evict_cache_file(&path).await; | ||||
|             return None; | ||||
|           } | ||||
|         }; | ||||
| 
 | ||||
|     debug!("Cache hit: {cache_key}"); | ||||
|         debug!("Cache hit from file: {cache_key}"); | ||||
|         Some(Response::from_parts(res_parts, res_body)) | ||||
|       } | ||||
|       CacheFileOrOnMemory::OnMemory(object) => { | ||||
|         debug!("Cache hit from on memory: {cache_key}"); | ||||
|         Some(Response::from_parts(res_parts, Body::from(object))) | ||||
|       } | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|   pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { | ||||
|     let my_cache = self.inner.clone(); | ||||
|  | @ -201,6 +230,7 @@ impl RpxyCache { | |||
|     let bytes_clone = body_bytes.clone(); | ||||
|     let policy_clone = policy.clone(); | ||||
|     let max_each_size = self.max_each_size; | ||||
|     let max_each_size_on_memory = self.max_each_size_on_memory; | ||||
| 
 | ||||
|     self.runtime_handle.spawn(async move { | ||||
|       if bytes_clone.len() > max_each_size { | ||||
|  | @ -212,11 +242,21 @@ impl RpxyCache { | |||
| 
 | ||||
|       debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); | ||||
| 
 | ||||
|       let cache_object = if bytes_clone.len() > max_each_size_on_memory { | ||||
|         let mut mgr = mgr.write().await; | ||||
|         let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { | ||||
|           error!("Failed to put the body into the file object or cache entry"); | ||||
|           return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); | ||||
|         }; | ||||
|         debug!("Cached a new file: {} - {}", cache_key, cache_filename); | ||||
|         cache_object | ||||
|       } else { | ||||
|         debug!("Cached a new object on memory: {}", cache_key); | ||||
|         CacheObject { | ||||
|           policy: policy_clone, | ||||
|           target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()), | ||||
|         } | ||||
|       }; | ||||
|       let push_opt = { | ||||
|         let Ok(mut lock) = my_cache.lock() else { | ||||
|           error!("Failed to acquire mutex lock for writing cache entry"); | ||||
|  | @ -227,13 +267,14 @@ impl RpxyCache { | |||
|       if let Some((k, v)) = push_opt { | ||||
|         if k != cache_key { | ||||
|           info!("Over the cache capacity. Evict least recent used entry"); | ||||
|           if let Err(e) = mgr.remove(&v.target).await { | ||||
|           if let CacheFileOrOnMemory::File(path) = v.target { | ||||
|             let mut mgr = mgr.write().await; | ||||
|             if let Err(e) = mgr.remove(&path).await { | ||||
|               warn!("Eviction failed during file object removal over the capacity: {:?}", e); | ||||
|             }; | ||||
|           } | ||||
|         } | ||||
| 
 | ||||
|       debug!("Cached a new file: {} - {}", cache_key, cache_filename); | ||||
|       } | ||||
|       Ok(()) | ||||
|     }); | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Kurihara
				Jun Kurihara