feat: totally updated cache structure using the lru crate instead of moka (i.e., a simpler crate)

parent 2477c6bf1b
commit 07d3accb91

10 changed files with 157 additions and 132 deletions
@@ -5,6 +5,7 @@
 ### Improvement

 - Feat: Enabled `h2c` (HTTP/2 cleartext) requests to upstream app servers (in previous versions, only HTTP/1.1 was allowed for cleartext requests)
+- Feat: Initial implementation of the caching feature using file + in-memory cache. (Caveats: no persistence of the cache; once the config is updated, the cache is entirely cleared.)
 - Refactor: lots of minor improvements

 ### Bugfix
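For orientation before the code hunks: the new cache is two tiers, a bounded in-memory LRU map from request URI to cache metadata (an `http-cache-semantics` policy plus the path of the file holding the body), with the response bodies themselves on disk. A minimal sketch of that shape using the `lru` crate, as this commit does (names mirror the diff below; the policy field and the file I/O are elided):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;
use std::path::PathBuf;
use std::sync::{Arc, Mutex};

/// Metadata kept in memory; the response body itself lives at `target` on disk.
#[derive(Clone)]
struct CacheObject {
  target: PathBuf, // file holding the cached response body
}

/// Bounded, thread-shared LRU: request URI -> cache metadata.
type CacheMap = Arc<Mutex<LruCache<String, CacheObject>>>;

fn new_cache(max_entries: usize) -> CacheMap {
  Arc::new(Mutex::new(LruCache::new(
    NonZeroUsize::new(max_entries).expect("cache capacity must be non-zero"),
  )))
}

fn lookup(cache: &CacheMap, uri: &str) -> Option<CacheObject> {
  // get() marks the entry most-recently-used; clone out to keep the lock short.
  cache.lock().ok()?.get(uri).cloned()
}
```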
TODO.md (5 changes)

@@ -1,7 +1,10 @@
 # TODO List

 - [Done in 0.6.0] But we need more sophistication on the `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update the forwarder definition. Also, maybe the forwarder would have a cache corresponding to the following task.~~
-- [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**
+- [Initial implementation in v0.6.0] ~~**Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**~~ Using the `lru` crate might be inefficient in terms of speed. Also, this cache feature should be a separate `feature` (but it is probably okay to include it in `default`).
+  - Consider a more sophisticated architecture for the cache
+  - Persistent cache (if possible)
+  - etc.
 - Improvement of path matcher
 - More flexible option for rewriting path
 - Refactoring
@@ -111,3 +111,5 @@ max_idle_timeout = 10 # secs. 0 represents an infinite timeout.
 # If this is specified, the file cache feature is enabled
 [experimental.cache]
 cache_dir = './cache' # optional. default is "./cache" relative to the current working directory
+max_cache_entry = 1000 # optional. default is 1k
+max_cache_each_size = 65535 # optional. default is 64k
@@ -36,6 +36,8 @@ pub struct Http3Option {
 #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)]
 pub struct CacheOption {
   pub cache_dir: Option<String>,
+  pub max_cache_entry: Option<usize>,
+  pub max_cache_each_size: Option<usize>,
 }

 #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)]
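Since `CacheOption` is deserialized straight from the TOML above via serde, the two new keys are optional and independent. A small sketch of the round-trip (this assumes the `toml` crate as the deserializer, for illustration only; it is not how rpxy wires this up):

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)]
pub struct CacheOption {
  pub cache_dir: Option<String>,
  pub max_cache_entry: Option<usize>,
  pub max_cache_each_size: Option<usize>,
}

fn main() {
  let opt: CacheOption =
    toml::from_str("cache_dir = './cache'\nmax_cache_entry = 1000").unwrap();
  assert_eq!(opt.max_cache_entry, Some(1000));
  // Missing keys deserialize to None and fall back to the defaults below.
  assert_eq!(opt.max_cache_each_size, None);
}
```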
@@ -173,7 +175,15 @@ impl TryInto<ProxyConfig> for &ConfigToml {
       proxy_config.cache_dir = match &cache_option.cache_dir {
         Some(cache_dir) => Some(PathBuf::from(cache_dir)),
         None => Some(PathBuf::from(CACHE_DIR)),
-      }
+      };
+      proxy_config.cache_max_entry = match &cache_option.max_cache_entry {
+        Some(num) => Some(*num),
+        None => Some(MAX_CACHE_ENTRY),
+      };
+      proxy_config.cache_max_each_size = match &cache_option.max_cache_each_size {
+        Some(num) => Some(*num),
+        None => Some(MAX_CACHE_EACH_SIZE),
+      };
     }
   }
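Each of these `match` blocks is the same "configured value or compile-time default" pattern; for reference, a condensed equivalent (a hypothetical helper, not part of this commit):

```rust
/// Hypothetical helper: pick the configured value, else the built-in default.
fn or_default(configured: Option<usize>, default: usize) -> Option<usize> {
  Some(configured.unwrap_or(default))
}

fn main() {
  const MAX_CACHE_ENTRY: usize = 1_000;
  assert_eq!(or_default(Some(42), MAX_CACHE_ENTRY), Some(42));
  assert_eq!(or_default(None, MAX_CACHE_ENTRY), Some(1_000));
}
```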
@@ -1,4 +1,12 @@
 pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
 pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
 pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;

+// Cache directory
 pub const CACHE_DIR: &str = "./cache";
+// # of entries in cache
+pub const MAX_CACHE_ENTRY: usize = 1_000;
+// max size for each file in bytes
+pub const MAX_CACHE_EACH_SIZE: usize = 65_535;
+
+// TODO: max cache size in total
@@ -75,8 +75,7 @@ s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optio

 # cache
 http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" }
-moka = { version = "0.11.3", features = ["future", "sync"] }
-fs4 = { version = "0.6.6", features = ["tokio", "tokio-async"] }
+lru = { version = "0.11.0" }

 # cookie handling for sticky cookie
 chrono = { version = "0.4.26", default-features = false, features = [
@@ -31,7 +31,3 @@ pub mod H3 {
 #[cfg(feature = "sticky-cookie")]
 /// For load-balancing with sticky cookie
 pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id";
-
-pub const MAX_CACHE_ENTRY: u64 = 10_000;
-// TODO: max cache size per entry
-// TODO: max cache size in total
@@ -55,6 +55,8 @@ pub struct ProxyConfig {

   pub cache_enabled: bool,
   pub cache_dir: Option<PathBuf>,
+  pub cache_max_entry: Option<usize>,
+  pub cache_max_each_size: Option<usize>,

   // All need to make packet acceptor
   #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
@@ -93,6 +95,8 @@ impl Default for ProxyConfig {

       cache_enabled: false,
       cache_dir: None,
+      cache_max_entry: None,
+      cache_max_each_size: None,

       #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
       http3: false,
@@ -1,18 +1,17 @@
-use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource};
+use crate::{error::*, globals::Globals, log::*, CryptoSource};
 use base64::{engine::general_purpose, Engine as _};
 use bytes::{Buf, Bytes, BytesMut};
-use fs4::tokio::AsyncFileExt;
 use http_cache_semantics::CachePolicy;
 use hyper::{
   http::{Request, Response},
   Body,
 };
-use moka::future::Cache as MokaCache;
+use lru::LruCache;
 use sha2::{Digest, Sha256};
 use std::{
   fmt::Debug,
   path::{Path, PathBuf},
-  sync::Arc,
+  sync::{Arc, Mutex},
   time::SystemTime,
 };
 use tokio::{
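One consequence of the moka-to-lru swap shows up right in this import list: `std::sync::Mutex` joins `Arc`. `LruCache` is not a concurrent structure, and even `get` takes `&mut self` (a hit moves the entry to most-recently-used), so a read/write split via `RwLock` would not help; every access needs exclusive locking. A minimal sketch (assuming lru ~0.11):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;
use std::sync::{Arc, Mutex};

fn main() {
  let cache: Arc<Mutex<LruCache<String, u32>>> =
    Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(8).unwrap())));

  let mut lock = cache.lock().unwrap();
  lock.put("key".to_string(), 1);
  // Even a lookup needs &mut self: it reorders the internal recency list.
  assert_eq!(lock.get("key"), Some(&1));
}
```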
@@ -21,34 +20,10 @@ use tokio::{
   sync::RwLock,
 };

-// #[async_trait]
-// pub trait CacheTarget {
-//   type TargetInput;
-//   type TargetOutput;
-//   type Error;
-//   /// Get target object from somewhere
-//   async fn get(&self) -> Self::TargetOutput;
-//   /// Write target object into somewhere
-//   async fn put(&self, taget: Self::TargetOutput) -> Result<(), Self::Error>;
-//   /// Remove target object from somewhere (when evicted self)
-//   async fn remove(&self) -> Result<(), Self::Error>;
-// }
-
-fn derive_filename_from_uri(uri: &hyper::Uri) -> String {
-  let mut hasher = Sha256::new();
-  hasher.update(uri.to_string());
-  let digest = hasher.finalize();
-  general_purpose::URL_SAFE_NO_PAD.encode(digest)
-}
-
-fn derive_moka_key_from_uri(uri: &hyper::Uri) -> String {
-  uri.to_string()
-}
-
 #[derive(Clone, Debug)]
 struct CacheObject {
   pub policy: CachePolicy,
-  pub target: Option<PathBuf>,
+  pub target: PathBuf,
 }

 #[derive(Debug)]
@@ -74,16 +49,13 @@ impl CacheFileManager {
     }
   }

-  async fn write(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result<CacheObject> {
+  async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result<CacheObject> {
     let cache_filepath = self.cache_dir.join(cache_filename);
     let Ok(mut file) = File::create(&cache_filepath).await else {
       return Err(RpxyError::Cache("Failed to create file"));
     };
-    // TODO: There seems to be a case where the file isn't written properly here, or where it gets removed after being written.
-    // On eviction, the file seems to be removed while the entry remains.
     let mut bytes_clone = body_bytes.clone();
     while bytes_clone.has_remaining() {
-      warn!("remaining {}", bytes_clone.remaining());
       if let Err(e) = file.write_buf(&mut bytes_clone).await {
         error!("Failed to write file cache: {e}");
         return Err(RpxyError::Cache("Failed to write file cache: {e}"));
@@ -92,7 +64,7 @@ impl CacheFileManager {
     self.cnt += 1;
     Ok(CacheObject {
       policy: policy.clone(),
-      target: Some(cache_filepath),
+      target: cache_filepath,
     })
   }
@@ -131,10 +103,12 @@ impl CacheFileManager {
 pub struct RpxyCache {
   /// Managing cache file objects through RwLock's lock mechanism for file lock
   cache_file_manager: Arc<RwLock<CacheFileManager>>,
-  /// Moka's cache storing http message caching policy
-  inner: MokaCache<String, CacheObject>, // TODO: Not sure a string URL is the right key; it will likely be checked against every request.
+  /// Lru cache storing http message caching policy
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a string URL is the right key; it will likely be checked against every request.
   /// Async runtime
   runtime_handle: tokio::runtime::Handle,
+  /// Maximum size of each cache file object
+  max_each_size: usize,
 }

 impl RpxyCache {
@@ -146,78 +120,139 @@ impl RpxyCache {
     let path = globals.proxy_config.cache_dir.as_ref().unwrap();
     let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await));
-    let mgr_clone = cache_file_manager.clone();
-    let runtime_handle = globals.runtime_handle.clone();
-    let eviction_listener = move |k, v: CacheObject, cause| {
-      debug!("Cache entry is being evicted : {k} {:?}", cause);
-      runtime_handle.block_on(async {
-        if let Some(filepath) = v.target {
-          debug!("Evict file object: {k}");
-          // Acquire the write lock
-          let mut mgr = mgr_clone.write().await;
-          if let Err(e) = mgr.remove(filepath).await {
-            warn!("Eviction failed during file object removal: {:?}", e);
-          };
-        }
-      })
-    };
+    let inner = Arc::new(Mutex::new(LruCache::new(
+      std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry.unwrap()).unwrap(),
+    )));

     Some(Self {
       cache_file_manager,
-      inner: MokaCache::builder()
-        .max_capacity(MAX_CACHE_ENTRY)
-        .eviction_listener_with_queued_delivery_mode(eviction_listener)
-        .build(), // TODO: make this configurable, and along with size
+      inner,
       runtime_handle: globals.runtime_handle.clone(),
+      max_each_size: globals.proxy_config.cache_max_each_size.unwrap(),
     })
   }

+  fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked to evict a cache entry");
+      return None;
+    };
+    lock.pop_entry(cache_key)
+  }
+
+  async fn evict_cache_file(&self, filepath: impl AsRef<Path>) {
+    // Acquire the write lock
+    let mut mgr = self.cache_file_manager.write().await;
+    if let Err(e) = mgr.remove(filepath).await {
+      warn!("Eviction failed during file object removal: {:?}", e);
+    };
+  }
+
   /// Get cached response
   pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
     debug!("Current cache entries: {:?}", self.inner);
-    let moka_key = req.uri().to_string();
+    let cache_key = req.uri().to_string();

     // First check cache chance
-    let Some(cached_object) = self.inner.get(&moka_key) else {
-      return None;
-    };
+    let cached_object = {
+      let Ok(mut lock) = self.inner.lock() else {
+        error!("Mutex can't be locked for checking cache entry");
+        return None;
+      };
+      let Some(cached_object) = lock.get(&cache_key) else {
+        return None;
+      };
+      cached_object.clone()
+    };

+    // Secondly check the cache freshness as an HTTP message
     let now = SystemTime::now();
-    if let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) {
-      let Some(filepath) = cached_object.target else {
-        return None;
-      };
-
-      let mgr = self.cache_file_manager.read().await;
-      let res_body = match mgr.read(&filepath).await {
-        Ok(res_body) => res_body,
-        Err(e) => {
-          warn!("Failed to read from cache: {e}");
-          self.inner.invalidate(&moka_key).await;
-          return None;
-        }
-      };
-      debug!("Cache hit: {moka_key}");
-
-      Some(Response::from_parts(res_parts, res_body))
-    } else {
+    let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else {
       // Evict stale cache entry.
       // This might be okay to keep as is since it would be updated later.
       // However, there is no guarantee that newly got objects will be still cacheable.
       // So, we have to evict stale cache entries and cache file objects if found.
-      debug!("Stale cache entry and file object: {moka_key}");
-      self.inner.invalidate(&moka_key).await;
-      // let my_cache = self.inner.clone();
-      // self.runtime_handle.spawn(async move {
-      //   eviction listener will be activated during invalidation.
-      //   my_cache.invalidate(&moka_key).await;
-      // });
-      None
+      debug!("Stale cache entry and file object: {cache_key}");
+      let _evicted_entry = self.evict_cache_entry(&cache_key);
+      self.evict_cache_file(&cached_object.target).await;
+      return None;
+    };

+    // Finally retrieve the file object
+    let mgr = self.cache_file_manager.read().await;
+    let res_body = match mgr.read(&cached_object.target).await {
+      Ok(res_body) => res_body,
+      Err(e) => {
+        warn!("Failed to read from file cache: {e}");
+        let _evicted_entry = self.evict_cache_entry(&cache_key);
+        self.evict_cache_file(&cached_object.target).await;
+        return None;
+      }
+    };
+
+    debug!("Cache hit: {cache_key}");
+    Some(Response::from_parts(res_parts, res_body))
+  }
+
+  pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
+    let my_cache = self.inner.clone();
+    let mgr = self.cache_file_manager.clone();
+    let uri = uri.clone();
+    let bytes_clone = body_bytes.clone();
+    let policy_clone = policy.clone();
+    let max_each_size = self.max_each_size;
+
+    self.runtime_handle.spawn(async move {
+      if bytes_clone.len() > max_each_size {
+        warn!("Too large to cache");
+        return Err(RpxyError::Cache("Too large to cache"));
+      }
+      let cache_key = derive_cache_key_from_uri(&uri);
+      let cache_filename = derive_filename_from_uri(&uri);
+
+      debug!("Cache file of {:?} bytes to be written", bytes_clone.len());
+
+      let mut mgr = mgr.write().await;
+      let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else {
+        error!("Failed to put the body into the file object or cache entry");
+        return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry"));
+      };
+      let push_opt = {
+        let Ok(mut lock) = my_cache.lock() else {
+          error!("Failed to acquire mutex lock for writing cache entry");
+          return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry"));
+        };
+        lock.push(cache_key.clone(), cache_object)
+      };
+      if let Some((k, v)) = push_opt {
+        if k != cache_key {
+          info!("Over the cache capacity. Evict least recently used entry");
+          if let Err(e) = mgr.remove(&v.target).await {
+            warn!("Eviction failed during file object removal over the capacity: {:?}", e);
+          };
         }
       }
-  pub fn is_cacheable<R>(&self, req: Option<&Request<R>>, res: Option<&Response<Body>>) -> Result<Option<CachePolicy>>
+      debug!("Cached a new file: {} - {}", cache_key, cache_filename);
+      Ok(())
+    });
+
+    Ok(())
+  }
+}
+
+fn derive_filename_from_uri(uri: &hyper::Uri) -> String {
+  let mut hasher = Sha256::new();
+  hasher.update(uri.to_string());
+  let digest = hasher.finalize();
+  general_purpose::URL_SAFE_NO_PAD.encode(digest)
+}
+
+fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String {
+  uri.to_string()
+}
+
+pub fn get_policy_if_cacheable<R>(req: Option<&Request<R>>, res: Option<&Response<Body>>) -> Result<Option<CachePolicy>>
 where
   R: Debug,
 {
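The `put()` above leans on `LruCache::push` returning the displaced pair: `None` while under capacity, the old pair on a same-key update, and the evicted least-recently-used pair when full. Only the last case requires deleting a file from disk, which is why the code compares the returned key against `cache_key`. A standalone sketch of those semantics (assuming lru ~0.11; not code from this commit):

```rust
use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
  let mut cache = LruCache::new(NonZeroUsize::new(2).unwrap());

  // Nothing is displaced while under capacity.
  assert_eq!(cache.push("a", 1), None);
  assert_eq!(cache.push("b", 2), None);

  // Same key: the old (key, value) pair comes back, no eviction.
  assert_eq!(cache.push("a", 10), Some(("a", 1)));

  // Over capacity: the least recently used pair ("b") is evicted and returned.
  // This is the case where the new put() must also delete the file on disk.
  assert_eq!(cache.push("c", 3), Some(("b", 2)));
}
```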
@@ -234,32 +269,3 @@ impl RpxyCache {
     Ok(None)
   }
 }
-
-  pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
-    let my_cache = self.inner.clone();
-    let uri = uri.clone();
-    let bytes_clone = body_bytes.clone();
-    let policy_clone = policy.clone();
-    let mgr_clone = self.cache_file_manager.clone();
-
-    self.runtime_handle.spawn(async move {
-      let moka_key = derive_moka_key_from_uri(&uri);
-      let cache_filename = derive_filename_from_uri(&uri);
-
-      warn!("{:?} bytes to be written", bytes_clone.len());
-      if let Err(e) = my_cache
-        .try_get_with(moka_key, async {
-          let mut mgr = mgr_clone.write().await;
-          mgr.write(&cache_filename, &bytes_clone, &policy_clone).await
-        })
-        .await
-      {
-        error!("Failed to put the body into the file object or cache entry: {e}");
-      };
-
-      debug!("Current cache entries: {:?}", my_cache);
-    });
-
-    Ok(())
-  }
-}
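Both the freshness check in `get()` and `get_policy_if_cacheable` build on `http-cache-semantics` (here the patched rusty-http-cache-semantics submodule). A minimal sketch of the upstream-style API as used above; the function and argument names are illustrative, and the fork's API may differ slightly:

```rust
use http::Request;
use http_cache_semantics::{BeforeRequest, CachePolicy};
use std::time::SystemTime;

/// Can a stored response be served for `new_req` without contacting upstream?
/// `policy` was built at store time, e.g. via CachePolicy::new(&orig_req, &orig_res).
fn can_serve_cached<B>(policy: &CachePolicy, new_req: &Request<B>) -> bool {
  match policy.before_request(new_req, SystemTime::now()) {
    // Fresh: serve the stored body together with the updated header parts.
    BeforeRequest::Fresh(_updated_parts) => true,
    // Stale: revalidate or evict, as RpxyCache::get() does above.
    BeforeRequest::Stale { .. } => false,
  }
}
```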
@@ -1,4 +1,4 @@
-use super::cache::RpxyCache;
+use super::cache::{get_policy_if_cacheable, RpxyCache};
 use crate::{error::RpxyError, globals::Globals, log::*, CryptoSource};
 use async_trait::async_trait;
 use bytes::Buf;
@@ -55,7 +55,7 @@ where
     if self.cache.is_some() {
       if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
         // if found, return it as response.
-        debug!("Cache hit - Return from cache");
+        info!("Cache hit - Return from cache");
         return Ok(cached_response);
       };
@@ -76,11 +76,7 @@ where
     }

     // check cacheability and store it if cacheable
-    let Ok(Some(cache_policy)) = self
-      .cache
-      .as_ref()
-      .unwrap()
-      .is_cacheable(synth_req.as_ref(), res.as_ref().ok()) else {
+    let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else {
       return res;
     };
     let (parts, body) = res.unwrap().into_parts();