From ed33c5d4f119b26fb75b2ea64ccff22ed3bf0913 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 9 Dec 2023 12:14:59 +0900 Subject: [PATCH] wip: implement on-memory cache as is --- rpxy-lib/Cargo.toml | 6 ++--- rpxy-lib/src/error.rs | 4 ++++ rpxy-lib/src/forwarder/cache.rs | 42 ++++++++++++++++++++++++++++++--- 3 files changed, 46 insertions(+), 6 deletions(-) diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index fe715e0..65c7c58 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -24,7 +24,7 @@ http3-s2n = [ sticky-cookie = ["base64", "sha2", "chrono"] native-tls-backend = ["hyper-tls"] rustls-backend = [] -cache = ["http-cache-semantics", "lru"] +cache = ["http-cache-semantics", "lru", "sha2", "base64"] native-roots = [] #"hyper-rustls/native-tokio"] [dependencies] @@ -87,9 +87,10 @@ s2n-quic-rustls = { version = "0.32.0", optional = true } # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.5", features = ["all"], optional = true } -# # cache +# cache http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } lru = { version = "0.12.1", optional = true } +sha2 = { version = "0.10.8", default-features = false, optional = true } # cookie handling for sticky cookie chrono = { version = "0.4.31", default-features = false, features = [ @@ -98,7 +99,6 @@ chrono = { version = "0.4.31", default-features = false, features = [ "clock", ], optional = true } base64 = { version = "0.21.5", optional = true } -sha2 = { version = "0.10.8", default-features = false, optional = true } [dev-dependencies] diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 4cbc463..2763d1e 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -93,6 +93,10 @@ pub enum RpxyError { #[error("Failed to write byte buffer")] FailedToWriteByteBufferForCache, + #[cfg(feature = "cache")] + #[error("Failed to acquire mutex lock for cache")] + FailedToAcquiredMutexLockForCache, + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs index 73bed7b..ea29a41 100644 --- a/rpxy-lib/src/forwarder/cache.rs +++ b/rpxy-lib/src/forwarder/cache.rs @@ -4,14 +4,18 @@ use http_cache_semantics::CachePolicy; use lru::LruCache; use std::{ path::{Path, PathBuf}, - sync::{atomic::AtomicUsize, Arc, Mutex}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, }; use tokio::{fs, sync::RwLock}; /* ---------------------------------------------- */ #[derive(Clone, Debug)] +/// Cache main manager pub struct RpxyCache { - /// Lru cache storing http message caching policy + /// Inner lru cache manager storing http message caching policy inner: LruCacheManager, /// Managing cache file objects through RwLock's lock mechanism for file lock file_store: FileStore, @@ -122,7 +126,9 @@ struct CacheObject { #[derive(Debug, Clone)] /// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache` struct LruCacheManager { + /// Inner lru cache manager main object inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + /// Counter of current cached object (total) cnt: Arc, } @@ -133,12 +139,42 @@ impl LruCacheManager { inner: Arc::new(Mutex::new(LruCache::new( std::num::NonZeroUsize::new(cache_max_entry).unwrap(), ))), - cnt: Arc::new(AtomicUsize::default()), + cnt: Default::default(), } } + + /// Count entries + fn count(&self) -> usize { + self.cnt.load(Ordering::Relaxed) + } + + /// Evict an entry + fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + let res = lock.pop_entry(cache_key); + // This may be inconsistent with the actual number of entries + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } + + /// Push an entry + fn push(&self, cache_key: &str, cache_object: CacheObject) -> RpxyResult> { + let Ok(mut lock) = self.inner.lock() else { + error!("Failed to acquire mutex lock for writing cache entry"); + return Err(RpxyError::FailedToAcquiredMutexLockForCache); + }; + let res = Ok(lock.push(cache_key.to_string(), cache_object)); + // This may be inconsistent with the actual number of entries + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } } /* ---------------------------------------------- */ +/// Generate cache policy if the response is cacheable pub fn get_policy_if_cacheable( req: Option<&Request>, res: Option<&Response>,