chore: refactor

This commit is contained in:
Jun Kurihara 2025-05-20 22:50:54 +09:00
commit 886aa74d6c
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23
2 changed files with 38 additions and 27 deletions

View file

@@ -52,21 +52,30 @@ impl RpxyCache {
if !globals.proxy_config.cache_enabled { if !globals.proxy_config.cache_enabled {
return None; return None;
} }
let cache_dir = globals.proxy_config.cache_dir.as_ref().unwrap(); let cache_dir = match globals.proxy_config.cache_dir.as_ref() {
Some(dir) => dir,
None => {
warn!("Cache directory not set in proxy config");
return None;
}
};
let file_store = FileStore::new(&globals.runtime_handle).await; let file_store = FileStore::new(&globals.runtime_handle).await;
let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry); let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
let max_each_size = globals.proxy_config.cache_max_each_size; let max_each_size = globals.proxy_config.cache_max_each_size;
let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
if max_each_size < max_each_size_on_memory { if max_each_size < max_each_size_on_memory {
warn!("Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"); warn!("Maximum size of on-memory cache per entry must be smaller than or equal to the maximum of each file cache");
max_each_size_on_memory = max_each_size; max_each_size_on_memory = max_each_size;
} }
if let Err(e) = fs::remove_dir_all(cache_dir).await { if let Err(e) = fs::remove_dir_all(cache_dir).await {
warn!("Failed to clean up the cache dir: {e}"); warn!("Failed to clean up the cache dir: {e}");
}; }
fs::create_dir_all(&cache_dir).await.unwrap(); if let Err(e) = fs::create_dir_all(&cache_dir).await {
error!("Failed to create cache dir: {e}");
return None;
}
Some(Self { Some(Self {
file_store, file_store,
@@ -256,20 +265,19 @@ impl FileStore {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner.cnt inner.cnt
} }
/// Create a temporary file cache /// Create a temporary file cache, returns error if file cannot be created or written
async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> { async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
inner.create(cache_object, body_bytes).await inner.create(cache_object, body_bytes).await
} }
/// Evict a temporary file cache /// Evict a temporary file cache, logs warning if removal fails
async fn evict(&self, path: impl AsRef<Path>) { async fn evict(&self, path: impl AsRef<Path>) {
// Acquire the write lock
let mut inner = self.inner.write().await; let mut inner = self.inner.write().await;
if let Err(e) = inner.remove(path).await { if let Err(e) = inner.remove(path).await {
warn!("Eviction failed during file object removal: {:?}", e); warn!("Eviction failed during file object removal: {:?}", e);
}; }
} }
/// Read a temporary file cache /// Read a temporary file cache, returns error if file cannot be opened or hash mismatches
async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> { async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> {
let inner = self.inner.read().await; let inner = self.inner.read().await;
inner.read(path, hash).await inner.read(path, hash).await
@@ -305,15 +313,15 @@ impl FileStoreInner {
return Err(CacheError::InvalidCacheTarget); return Err(CacheError::InvalidCacheTarget);
} }
}; };
let Ok(mut file) = File::create(&cache_filepath).await else { let mut file = File::create(&cache_filepath)
return Err(CacheError::FailedToCreateFileCache); .await
}; .map_err(|_| CacheError::FailedToCreateFileCache)?;
let mut bytes_clone = body_bytes.clone(); let mut bytes_clone = body_bytes.clone();
while bytes_clone.has_remaining() { while bytes_clone.has_remaining() {
if let Err(e) = file.write_buf(&mut bytes_clone).await { file.write_buf(&mut bytes_clone).await.map_err(|e| {
error!("Failed to write file cache: {e}"); error!("Failed to write file cache: {e}");
return Err(CacheError::FailedToWriteFileCache); CacheError::FailedToWriteFileCache
}; })?;
} }
self.cnt += 1; self.cnt += 1;
Ok(()) Ok(())
@@ -435,11 +443,14 @@ impl LruCacheManager {
self.cnt.load(Ordering::Relaxed) self.cnt.load(Ordering::Relaxed)
} }
/// Evict an entry /// Evict an entry from the LRU cache, logs error if mutex cannot be acquired
fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> { fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> {
let Ok(mut lock) = self.inner.lock() else { let mut lock = match self.inner.lock() {
error!("Mutex can't be locked to evict a cache entry"); Ok(lock) => lock,
return None; Err(_) => {
error!("Mutex can't be locked to evict a cache entry");
return None;
}
}; };
let res = lock.pop_entry(cache_key); let res = lock.pop_entry(cache_key);
// This may be inconsistent with the actual number of entries // This may be inconsistent with the actual number of entries
@@ -447,7 +458,7 @@ impl LruCacheManager {
res res
} }
/// Push an entry /// Push an entry into the LRU cache, returns error if mutex cannot be acquired
fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult<Option<(String, CacheObject)>> { fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult<Option<(String, CacheObject)>> {
let mut lock = self.inner.lock().map_err(|_| { let mut lock = self.inner.lock().map_err(|_| {
error!("Failed to acquire mutex lock for writing cache entry"); error!("Failed to acquire mutex lock for writing cache entry");
@@ -459,7 +470,7 @@ impl LruCacheManager {
res res
} }
/// Get an entry /// Get an entry from the LRU cache, returns error if mutex cannot be acquired
fn get(&self, cache_key: &str) -> CacheResult<Option<CacheObject>> { fn get(&self, cache_key: &str) -> CacheResult<Option<CacheObject>> {
let mut lock = self.inner.lock().map_err(|_| { let mut lock = self.inner.lock().map_err(|_| {
error!("Mutex can't be locked for checking cache entry"); error!("Mutex can't be locked for checking cache entry");

View file

@@ -126,9 +126,9 @@ where
warn!( warn!(
" "
-------------------------------------------------------------------------------------------------- --------------------------------------------------------------------------------------------------
Request forwarder is working without TLS support!!! Request forwarder is working without TLS support!
We recommend to use this just for testing. This mode is intended for testing only.
Please enable native-tls-backend or rustls-backend feature to enable TLS support. Enable 'native-tls-backend' or 'rustls-backend' feature for TLS support.
--------------------------------------------------------------------------------------------------" --------------------------------------------------------------------------------------------------"
); );
let executor = LocalExecutor::new(_globals.runtime_handle.clone()); let executor = LocalExecutor::new(_globals.runtime_handle.clone());
@@ -159,7 +159,7 @@ where
/// Build forwarder /// Build forwarder
pub async fn try_new(_globals: &Arc<Globals>) -> RpxyResult<Self> { pub async fn try_new(_globals: &Arc<Globals>) -> RpxyResult<Self> {
// build hyper client with hyper-tls // build hyper client with hyper-tls
info!("Native TLS support is enabled for the connection to backend applications"); info!("Native TLS support enabled for backend connections (native-tls)");
let executor = LocalExecutor::new(_globals.runtime_handle.clone()); let executor = LocalExecutor::new(_globals.runtime_handle.clone());
let try_build_connector = |alpns: &[&str]| { let try_build_connector = |alpns: &[&str]| {
@@ -209,14 +209,14 @@ where
#[cfg(feature = "webpki-roots")] #[cfg(feature = "webpki-roots")]
let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots(); let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_webpki_roots();
#[cfg(feature = "webpki-roots")] #[cfg(feature = "webpki-roots")]
info!("Mozilla WebPKI root certs with rustls is used for the connection to backend applications"); info!("Rustls backend: Mozilla WebPKI root certs used for backend connections");
#[cfg(not(feature = "webpki-roots"))] #[cfg(not(feature = "webpki-roots"))]
let builder = hyper_rustls::HttpsConnectorBuilder::new().with_platform_verifier(); let builder = hyper_rustls::HttpsConnectorBuilder::new().with_platform_verifier();
#[cfg(not(feature = "webpki-roots"))] #[cfg(not(feature = "webpki-roots"))]
let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_platform_verifier(); let builder_h2 = hyper_rustls::HttpsConnectorBuilder::new().with_platform_verifier();
#[cfg(not(feature = "webpki-roots"))] #[cfg(not(feature = "webpki-roots"))]
info!("Platform verifier with rustls is used for the connection to backend applications"); info!("Rustls backend: Platform verifier used for backend connections");
let mut http = HttpConnector::new(); let mut http = HttpConnector::new();
http.enforce_http(false); http.enforce_http(false);