wip: implement on-memory cache as is

Jun Kurihara 2023-12-09 12:14:59 +09:00
commit ed33c5d4f1
3 changed files with 46 additions and 6 deletions

View file

@@ -24,7 +24,7 @@ http3-s2n = [
 sticky-cookie = ["base64", "sha2", "chrono"]
 native-tls-backend = ["hyper-tls"]
 rustls-backend = []
-cache = ["http-cache-semantics", "lru"]
+cache = ["http-cache-semantics", "lru", "sha2", "base64"]
 native-roots = [] #"hyper-rustls/native-tokio"]

 [dependencies]
@@ -87,9 +87,10 @@ s2n-quic-rustls = { version = "0.32.0", optional = true }
 # for UDP socket wit SO_REUSEADDR when h3 with quinn
 socket2 = { version = "0.5.5", features = ["all"], optional = true }

-# # cache
+# cache
 http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
 lru = { version = "0.12.1", optional = true }
+sha2 = { version = "0.10.8", default-features = false, optional = true }

 # cookie handling for sticky cookie
 chrono = { version = "0.4.31", default-features = false, features = [
@@ -98,7 +99,6 @@ chrono = { version = "0.4.31", default-features = false, features = [
   "clock",
 ], optional = true }
 base64 = { version = "0.21.5", optional = true }
-sha2 = { version = "0.10.8", default-features = false, optional = true }

 [dev-dependencies]
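With this change the `cache` feature also pulls in `sha2` and `base64`. A minimal sketch of one plausible use of that pair in a cache, namely hashing a cache key and encoding the digest into a filesystem-safe name; the helper below is an illustration only and is not part of this commit:

use base64::{engine::general_purpose::URL_SAFE_NO_PAD, Engine as _};
use sha2::{Digest, Sha256};

// Hypothetical helper (not in this commit): derive a stable, filesystem-safe
// file name for a cached object from its cache key.
fn derive_cache_file_name(cache_key: &str) -> String {
  let digest = Sha256::digest(cache_key.as_bytes());
  URL_SAFE_NO_PAD.encode(digest)
}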

View file

@@ -93,6 +93,10 @@ pub enum RpxyError {
   #[error("Failed to write byte buffer")]
   FailedToWriteByteBufferForCache,
+  #[cfg(feature = "cache")]
+  #[error("Failed to acquire mutex lock for cache")]
+  FailedToAcquiredMutexLockForCache,

   // Upstream connection setting errors
   #[error("Unsupported upstream option")]
   UnsupportedUpstreamOption,
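The new `FailedToAcquiredMutexLockForCache` variant surfaces a failed lock on the std `Mutex` that guards the in-memory LRU cache (see `LruCacheManager::push` later in this commit). A minimal, hypothetical sketch of that mapping, assuming the crate's `RpxyResult` alias; the helper name is illustrative, not from the commit:

use std::sync::Mutex;

// Hypothetical helper: run a closure under the cache mutex and report a
// poisoned/unacquirable lock as the new error variant instead of panicking.
fn with_cache_lock<T, R>(m: &Mutex<T>, f: impl FnOnce(&mut T) -> R) -> RpxyResult<R> {
  let Ok(mut guard) = m.lock() else {
    return Err(RpxyError::FailedToAcquiredMutexLockForCache);
  };
  Ok(f(&mut guard))
}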

View file

@@ -4,14 +4,18 @@ use http_cache_semantics::CachePolicy;
 use lru::LruCache;
 use std::{
   path::{Path, PathBuf},
-  sync::{atomic::AtomicUsize, Arc, Mutex},
+  sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc, Mutex,
+  },
 };
 use tokio::{fs, sync::RwLock};

 /* ---------------------------------------------- */
 #[derive(Clone, Debug)]
+/// Cache main manager
 pub struct RpxyCache {
-  /// Lru cache storing http message caching policy
+  /// Inner lru cache manager storing http message caching policy
   inner: LruCacheManager,
   /// Managing cache file objects through RwLock's lock mechanism for file lock
   file_store: FileStore,
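`RpxyCache` pairs the in-memory `LruCacheManager` with a `FileStore` whose file objects are accessed through a tokio `RwLock`. A generic, self-contained sketch of that file-lock idea; the struct below and its layout are assumptions, not the commit's `FileStore`:

use std::{path::PathBuf, sync::Arc};
use tokio::{fs, sync::RwLock};

// Stand-in for the FileStore idea above: cache files are only touched while
// holding an async RwLock, so reads can proceed concurrently but a write is exclusive.
#[derive(Clone, Debug)]
struct SimpleFileStore {
  lock: Arc<RwLock<()>>,
  cache_dir: PathBuf,
}

impl SimpleFileStore {
  async fn write(&self, name: &str, body: &[u8]) -> std::io::Result<()> {
    let _guard = self.lock.write().await; // exclusive access while writing
    fs::write(self.cache_dir.join(name), body).await
  }

  async fn read(&self, name: &str) -> std::io::Result<Vec<u8>> {
    let _guard = self.lock.read().await; // shared access while reading
    fs::read(self.cache_dir.join(name)).await
  }
}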
@@ -122,7 +126,9 @@ struct CacheObject {
 #[derive(Debug, Clone)]
 /// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache`
 struct LruCacheManager {
+  /// Inner lru cache manager main object
   inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a plain string URL is the right key; it looks like every request would have to be checked against it.
+  /// Counter of current cached object (total)
   cnt: Arc<AtomicUsize>,
 }
@@ -133,12 +139,42 @@ impl LruCacheManager {
       inner: Arc::new(Mutex::new(LruCache::new(
         std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
       ))),
-      cnt: Arc::new(AtomicUsize::default()),
+      cnt: Default::default(),
     }
   }
+
+  /// Count entries
+  fn count(&self) -> usize {
+    self.cnt.load(Ordering::Relaxed)
+  }
+
+  /// Evict an entry
+  fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked to evict a cache entry");
+      return None;
+    };
+    let res = lock.pop_entry(cache_key);
+    // This may be inconsistent with the actual number of entries
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
+
+  /// Push an entry
+  fn push(&self, cache_key: &str, cache_object: CacheObject) -> RpxyResult<Option<(String, CacheObject)>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Failed to acquire mutex lock for writing cache entry");
+      return Err(RpxyError::FailedToAcquiredMutexLockForCache);
+    };
+    let res = Ok(lock.push(cache_key.to_string(), cache_object));
+    // This may be inconsistent with the actual number of entries
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
 }

 /* ---------------------------------------------- */
+/// Generate cache policy if the response is cacheable
 pub fn get_policy_if_cacheable<B1, B2>(
   req: Option<&Request<B1>>,
   res: Option<&Response<B2>>,
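A self-contained sketch of the pattern the new helpers implement: an `LruCache` behind a std `Mutex`, with a relaxed `AtomicUsize` shadowing its length so occupancy can be read without taking the lock (which is also why the commit notes the count "may be inconsistent"). Types and names below are generic stand-ins, not the rpxy ones:

use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::{
  atomic::{AtomicUsize, Ordering},
  Arc, Mutex,
};

// Generic stand-in for LruCacheManager: the Mutex guards the LRU map,
// while the atomic counter mirrors its length after each mutation.
struct CountedLru<V> {
  inner: Arc<Mutex<LruCache<String, V>>>,
  cnt: Arc<AtomicUsize>,
}

impl<V> CountedLru<V> {
  fn new(cap: usize) -> Self {
    Self {
      inner: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(cap).unwrap()))),
      cnt: Arc::new(AtomicUsize::new(0)),
    }
  }

  /// Insert an entry; returns the evicted (or replaced) entry, or None if the lock failed.
  fn push(&self, key: &str, value: V) -> Option<(String, V)> {
    let mut lock = self.inner.lock().ok()?;
    let evicted = lock.push(key.to_string(), value);
    self.cnt.store(lock.len(), Ordering::Relaxed);
    evicted
  }

  /// Lock-free view of the entry count, updated only after mutations.
  fn count(&self) -> usize {
    self.cnt.load(Ordering::Relaxed)
  }
}

fn main() {
  let cache = CountedLru::new(2);
  cache.push("a", 1);
  cache.push("b", 2);
  cache.push("c", 3); // capacity is 2, so the least recently used entry ("a") is evicted
  assert_eq!(cache.count(), 2);
}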