diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 2763d1e..343cf04 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -97,6 +97,18 @@ pub enum RpxyError { #[error("Failed to acquire mutex lock for cache")] FailedToAcquiredMutexLockForCache, + #[cfg(feature = "cache")] + #[error("Failed to create file cache")] + FailedToCreateFileCache, + + #[cfg(feature = "cache")] + #[error("Failed to write file cache")] + FailedToWriteFileCache, + + #[cfg(feature = "cache")] + #[error("Failed to open cache file")] + FailedToOpenCacheFile, + // Upstream connection setting errors #[error("Unsupported upstream option")] UnsupportedUpstreamOption, diff --git a/rpxy-lib/src/forwarder/cache.rs b/rpxy-lib/src/forwarder/cache.rs index ea29a41..03755e6 100644 --- a/rpxy-lib/src/forwarder/cache.rs +++ b/rpxy-lib/src/forwarder/cache.rs @@ -1,15 +1,22 @@ use crate::{error::*, globals::Globals, log::*}; +use bytes::{Buf, Bytes, BytesMut}; use http::{Request, Response}; +use http_body_util::StreamBody; use http_cache_semantics::CachePolicy; use lru::LruCache; use std::{ + convert::Infallible, path::{Path, PathBuf}, sync::{ atomic::{AtomicUsize, Ordering}, Arc, Mutex, }, }; -use tokio::{fs, sync::RwLock}; +use tokio::{ + fs::{self, File}, + io::{AsyncReadExt, AsyncWriteExt}, + sync::RwLock, +}; /* ---------------------------------------------- */ #[derive(Clone, Debug)] @@ -54,6 +61,14 @@ impl RpxyCache { max_each_size_on_memory, }) } + + /// Count cache entries + pub async fn count(&self) -> (usize, usize, usize) { + let total = self.inner.count(); + let file = self.file_store.count().await; + let on_memory = total - file; + (total, on_memory, file) + } } /* ---------------------------------------------- */ @@ -71,6 +86,32 @@ impl FileStore { } } +impl FileStore { + /// Count file cache entries + async fn count(&self) -> usize { + let inner = self.inner.read().await; + inner.cnt + } + /// Create a temporary file cache + async fn create(&mut self, cache_filename: 
&str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> { + let mut inner = self.inner.write().await; + inner.create(cache_filename, body_bytes).await + } + // /// Evict a temporary file cache + // async fn evict(&self, path: impl AsRef<Path>) { + // // Acquire the write lock + // let mut inner = self.inner.write().await; + // if let Err(e) = inner.remove(path).await { + // warn!("Eviction failed during file object removal: {:?}", e); + // }; + // } + // /// Read a temporary file cache + // async fn read(&self, path: impl AsRef<Path>) -> RpxyResult<()> { + // let inner = self.inner.read().await; + // inner.read(&path).await + // } +} + #[derive(Debug)] /// Manager inner for cache on file system struct FileStoreInner { @@ -98,6 +139,67 @@ impl FileStoreInner { runtime_handle: runtime_handle.clone(), } } + + /// Create a new temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> { + let cache_filepath = self.cache_dir.join(cache_filename); + let Ok(mut file) = File::create(&cache_filepath).await else { + return Err(RpxyError::FailedToCreateFileCache); + }; + let mut bytes_clone = body_bytes.clone(); + while bytes_clone.has_remaining() { + if let Err(e) = file.write_buf(&mut bytes_clone).await { + error!("Failed to write file cache: {e}"); + return Err(RpxyError::FailedToWriteFileCache); + }; + } + self.cnt += 1; + Ok(CacheFileOrOnMemory::File(cache_filepath)) + } + + /// Retrieve a stored temporary file cache + async fn read(&self, path: impl AsRef<Path>) -> RpxyResult<()> { + let Ok(mut file) = File::open(&path).await else { + warn!("Cache file object cannot be opened"); + return Err(RpxyError::FailedToOpenCacheFile); + }; + + /* ----------------------------- */ + // PoC for streaming body + use futures::channel::mpsc; + let (tx, rx) = mpsc::unbounded::<Result<hyper::body::Frame<Bytes>, Infallible>>(); + + // let (body_sender, res_body) = Body::channel(); + self.runtime_handle.spawn(async move { + // let mut sender = body_sender; + let mut buf = BytesMut::new(); + loop { + match 
file.read_buf(&mut buf).await { + Ok(0) => break, + Ok(_) => tx + .unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining())))) + .map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?, + //sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Err(_) => break, + }; + } + Ok(()) as anyhow::Result<()> + }); + + let mut rx = http_body_util::StreamBody::new(rx); + // TODO: 結局incominglikeなbodystreamを定義することになる。これだったらh3と合わせて自分で定義した方が良さそう。 + // typeが長すぎるのでwrapperを作った方がいい。 + // let response = Response::builder() + // .status(200) + // .header("content-type", "application/octet-stream") + // .body(rx) + // .unwrap(); + + todo!() + /* ----------------------------- */ + + // Ok(res_body) + } } /* ---------------------------------------------- */ diff --git a/rpxy-lib/src/forwarder/client.rs b/rpxy-lib/src/forwarder/client.rs index 5718f2e..c6c6218 100644 --- a/rpxy-lib/src/forwarder/client.rs +++ b/rpxy-lib/src/forwarder/client.rs @@ -75,9 +75,12 @@ where }; let (parts, body) = res.unwrap().into_parts(); + // TODO: This is inefficient since current strategy needs to copy the whole body onto memory to cache it. + // This should be handled by copying buffer simultaneously while forwarding response to downstream. let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else { return Err(RpxyError::FailedToWriteByteBufferForCache); }; + let bytes_clone = bytes.clone(); // TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache. // We may need to use the same logic as h3. 
diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index d194b1f..1846d67 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -153,20 +153,40 @@ where ) .await?; - let (new_res_parts, new_body) = res.into_parts(); + let (new_res_parts, mut new_body) = res.into_parts(); let new_res = Response::from_parts(new_res_parts, ()); match send_stream.send_response(new_res).await { Ok(_) => { debug!("HTTP/3 response to connection successful"); - // aggregate body without copying - let body_data = new_body - .collect() - .await + loop { + let frame = match new_body.frame().await { + Some(frame) => frame, + None => { + debug!("Response body finished"); + break; + } + } .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; - // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() - send_stream.send_data(body_data.to_bytes()).await?; + if frame.is_data() { + let data = frame.into_data().unwrap_or_default(); + debug!("Write data to HTTP/3 stream"); + send_stream.send_data(data).await?; + } else if frame.is_trailers() { + let trailers = frame.into_trailers().unwrap_or_default(); + debug!("Write trailer to HTTP/3 stream"); + send_stream.send_trailers(trailers).await?; + } + } + // // aggregate body without copying + // let body_data = new_body + // .collect() + // .await + // .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?; + + // // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes() + // send_stream.send_data(body_data.to_bytes()).await?; // TODO: needs handling trailer? should be included in body from handler. }