wip: feat: update h3 response reader to use async stream
This commit is contained in:
parent
ed33c5d4f1
commit
cc48394e73
4 changed files with 145 additions and 8 deletions
|
|
@ -97,6 +97,18 @@ pub enum RpxyError {
|
||||||
#[error("Failed to acquire mutex lock for cache")]
|
#[error("Failed to acquire mutex lock for cache")]
|
||||||
FailedToAcquiredMutexLockForCache,
|
FailedToAcquiredMutexLockForCache,
|
||||||
|
|
||||||
|
#[cfg(feature = "cache")]
|
||||||
|
#[error("Failed to create file cache")]
|
||||||
|
FailedToCreateFileCache,
|
||||||
|
|
||||||
|
#[cfg(feature = "cache")]
|
||||||
|
#[error("Failed to write file cache")]
|
||||||
|
FailedToWriteFileCache,
|
||||||
|
|
||||||
|
#[cfg(feature = "cache")]
|
||||||
|
#[error("Failed to open cache file")]
|
||||||
|
FailedToOpenCacheFile,
|
||||||
|
|
||||||
// Upstream connection setting errors
|
// Upstream connection setting errors
|
||||||
#[error("Unsupported upstream option")]
|
#[error("Unsupported upstream option")]
|
||||||
UnsupportedUpstreamOption,
|
UnsupportedUpstreamOption,
|
||||||
|
|
|
||||||
|
|
@ -1,15 +1,22 @@
|
||||||
use crate::{error::*, globals::Globals, log::*};
|
use crate::{error::*, globals::Globals, log::*};
|
||||||
|
use bytes::{Buf, Bytes, BytesMut};
|
||||||
use http::{Request, Response};
|
use http::{Request, Response};
|
||||||
|
use http_body_util::StreamBody;
|
||||||
use http_cache_semantics::CachePolicy;
|
use http_cache_semantics::CachePolicy;
|
||||||
use lru::LruCache;
|
use lru::LruCache;
|
||||||
use std::{
|
use std::{
|
||||||
|
convert::Infallible,
|
||||||
path::{Path, PathBuf},
|
path::{Path, PathBuf},
|
||||||
sync::{
|
sync::{
|
||||||
atomic::{AtomicUsize, Ordering},
|
atomic::{AtomicUsize, Ordering},
|
||||||
Arc, Mutex,
|
Arc, Mutex,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
use tokio::{fs, sync::RwLock};
|
use tokio::{
|
||||||
|
fs::{self, File},
|
||||||
|
io::{AsyncReadExt, AsyncWriteExt},
|
||||||
|
sync::RwLock,
|
||||||
|
};
|
||||||
|
|
||||||
/* ---------------------------------------------- */
|
/* ---------------------------------------------- */
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug)]
|
||||||
|
|
@ -54,6 +61,14 @@ impl RpxyCache {
|
||||||
max_each_size_on_memory,
|
max_each_size_on_memory,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Count cache entries
|
||||||
|
pub async fn count(&self) -> (usize, usize, usize) {
|
||||||
|
let total = self.inner.count();
|
||||||
|
let file = self.file_store.count().await;
|
||||||
|
let on_memory = total - file;
|
||||||
|
(total, on_memory, file)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ---------------------------------------------- */
|
/* ---------------------------------------------- */
|
||||||
|
|
@ -71,6 +86,32 @@ impl FileStore {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl FileStore {
|
||||||
|
/// Count file cache entries
|
||||||
|
async fn count(&self) -> usize {
|
||||||
|
let inner = self.inner.read().await;
|
||||||
|
inner.cnt
|
||||||
|
}
|
||||||
|
/// Create a temporary file cache
|
||||||
|
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> {
|
||||||
|
let mut inner = self.inner.write().await;
|
||||||
|
inner.create(cache_filename, body_bytes).await
|
||||||
|
}
|
||||||
|
// /// Evict a temporary file cache
|
||||||
|
// async fn evict(&self, path: impl AsRef<Path>) {
|
||||||
|
// // Acquire the write lock
|
||||||
|
// let mut inner = self.inner.write().await;
|
||||||
|
// if let Err(e) = inner.remove(path).await {
|
||||||
|
// warn!("Eviction failed during file object removal: {:?}", e);
|
||||||
|
// };
|
||||||
|
// }
|
||||||
|
// /// Read a temporary file cache
|
||||||
|
// async fn read(&self, path: impl AsRef<Path>) -> RpxyResult<Bytes> {
|
||||||
|
// let inner = self.inner.read().await;
|
||||||
|
// inner.read(&path).await
|
||||||
|
// }
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
/// Manager inner for cache on file system
|
/// Manager inner for cache on file system
|
||||||
struct FileStoreInner {
|
struct FileStoreInner {
|
||||||
|
|
@ -98,6 +139,67 @@ impl FileStoreInner {
|
||||||
runtime_handle: runtime_handle.clone(),
|
runtime_handle: runtime_handle.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Create a new temporary file cache
|
||||||
|
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> RpxyResult<CacheFileOrOnMemory> {
|
||||||
|
let cache_filepath = self.cache_dir.join(cache_filename);
|
||||||
|
let Ok(mut file) = File::create(&cache_filepath).await else {
|
||||||
|
return Err(RpxyError::FailedToCreateFileCache);
|
||||||
|
};
|
||||||
|
let mut bytes_clone = body_bytes.clone();
|
||||||
|
while bytes_clone.has_remaining() {
|
||||||
|
if let Err(e) = file.write_buf(&mut bytes_clone).await {
|
||||||
|
error!("Failed to write file cache: {e}");
|
||||||
|
return Err(RpxyError::FailedToWriteFileCache);
|
||||||
|
};
|
||||||
|
}
|
||||||
|
self.cnt += 1;
|
||||||
|
Ok(CacheFileOrOnMemory::File(cache_filepath))
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Retrieve a stored temporary file cache
|
||||||
|
async fn read(&self, path: impl AsRef<Path>) -> RpxyResult<()> {
|
||||||
|
let Ok(mut file) = File::open(&path).await else {
|
||||||
|
warn!("Cache file object cannot be opened");
|
||||||
|
return Err(RpxyError::FailedToOpenCacheFile);
|
||||||
|
};
|
||||||
|
|
||||||
|
/* ----------------------------- */
|
||||||
|
// PoC for streaming body
|
||||||
|
use futures::channel::mpsc;
|
||||||
|
let (tx, rx) = mpsc::unbounded::<Result<hyper::body::Frame<bytes::Bytes>, Infallible>>();
|
||||||
|
|
||||||
|
// let (body_sender, res_body) = Body::channel();
|
||||||
|
self.runtime_handle.spawn(async move {
|
||||||
|
// let mut sender = body_sender;
|
||||||
|
let mut buf = BytesMut::new();
|
||||||
|
loop {
|
||||||
|
match file.read_buf(&mut buf).await {
|
||||||
|
Ok(0) => break,
|
||||||
|
Ok(_) => tx
|
||||||
|
.unbounded_send(Ok(hyper::body::Frame::data(buf.copy_to_bytes(buf.remaining()))))
|
||||||
|
.map_err(|e| anyhow::anyhow!("Failed to read cache file: {e}"))?,
|
||||||
|
//sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
|
||||||
|
Err(_) => break,
|
||||||
|
};
|
||||||
|
}
|
||||||
|
Ok(()) as anyhow::Result<()>
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut rx = http_body_util::StreamBody::new(rx);
|
||||||
|
// TODO: 結局incominglikeなbodystreamを定義することになる。これだったらh3と合わせて自分で定義した方が良さそう。
|
||||||
|
// typeが長すぎるのでwrapperを作った方がいい。
|
||||||
|
// let response = Response::builder()
|
||||||
|
// .status(200)
|
||||||
|
// .header("content-type", "application/octet-stream")
|
||||||
|
// .body(rx)
|
||||||
|
// .unwrap();
|
||||||
|
|
||||||
|
todo!()
|
||||||
|
/* ----------------------------- */
|
||||||
|
|
||||||
|
// Ok(res_body)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/* ---------------------------------------------- */
|
/* ---------------------------------------------- */
|
||||||
|
|
|
||||||
|
|
@ -75,9 +75,12 @@ where
|
||||||
};
|
};
|
||||||
let (parts, body) = res.unwrap().into_parts();
|
let (parts, body) = res.unwrap().into_parts();
|
||||||
|
|
||||||
|
// TODO: This is inefficient since current strategy needs to copy the whole body onto memory to cache it.
|
||||||
|
// This should be handled by copying buffer simultaneously while forwarding response to downstream.
|
||||||
let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else {
|
let Ok(bytes) = body.collect().await.map(|v| v.to_bytes()) else {
|
||||||
return Err(RpxyError::FailedToWriteByteBufferForCache);
|
return Err(RpxyError::FailedToWriteByteBufferForCache);
|
||||||
};
|
};
|
||||||
|
let bytes_clone = bytes.clone();
|
||||||
|
|
||||||
// TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache.
|
// TODO: this is inefficient. needs to be reconsidered to avoid unnecessary copy and should spawn async task to store cache.
|
||||||
// We may need to use the same logic as h3.
|
// We may need to use the same logic as h3.
|
||||||
|
|
|
||||||
|
|
@ -153,20 +153,40 @@ where
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let (new_res_parts, new_body) = res.into_parts();
|
let (new_res_parts, mut new_body) = res.into_parts();
|
||||||
let new_res = Response::from_parts(new_res_parts, ());
|
let new_res = Response::from_parts(new_res_parts, ());
|
||||||
|
|
||||||
match send_stream.send_response(new_res).await {
|
match send_stream.send_response(new_res).await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
debug!("HTTP/3 response to connection successful");
|
debug!("HTTP/3 response to connection successful");
|
||||||
// aggregate body without copying
|
loop {
|
||||||
let body_data = new_body
|
let frame = match new_body.frame().await {
|
||||||
.collect()
|
Some(frame) => frame,
|
||||||
.await
|
None => {
|
||||||
|
debug!("Response body finished");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
.map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
|
.map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
|
||||||
|
|
||||||
// create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
|
if frame.is_data() {
|
||||||
send_stream.send_data(body_data.to_bytes()).await?;
|
let data = frame.into_data().unwrap_or_default();
|
||||||
|
debug!("Write data to HTTP/3 stream");
|
||||||
|
send_stream.send_data(data).await?;
|
||||||
|
} else if frame.is_trailers() {
|
||||||
|
let trailers = frame.into_trailers().unwrap_or_default();
|
||||||
|
debug!("Write trailer to HTTP/3 stream");
|
||||||
|
send_stream.send_trailers(trailers).await?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// // aggregate body without copying
|
||||||
|
// let body_data = new_body
|
||||||
|
// .collect()
|
||||||
|
// .await
|
||||||
|
// .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
|
||||||
|
|
||||||
|
// // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
|
||||||
|
// send_stream.send_data(body_data.to_bytes()).await?;
|
||||||
|
|
||||||
// TODO: needs handling trailer? should be included in body from handler.
|
// TODO: needs handling trailer? should be included in body from handler.
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue