Merge pull request #127 from junkurihara/feat/cache-hyper-1.0

feat: cache hyper 1.0 with stream body directly attached to response stream

commit 66efa932f0
20 changed files with 894 additions and 121 deletions
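The mechanism in one picture: instead of buffering the whole upstream response before caching it, the body is forwarded to the client chunk by chunk while a background task accumulates a copy for the cache. A minimal, self-contained sketch of that tee pattern (the name `tee_to_cache` and the plain `Bytes` stream are illustrative, not the crate's actual API):

```rust
use bytes::{Bytes, BytesMut};
use futures::{channel::mpsc, StreamExt};

/// Hypothetical sketch: forward `chunks` to the returned receiver while
/// accumulating a copy for the cache in a spawned task.
fn tee_to_cache(
    mut chunks: impl futures::Stream<Item = Bytes> + Unpin + Send + 'static,
) -> mpsc::UnboundedReceiver<Bytes> {
    let (tx, rx) = mpsc::unbounded::<Bytes>();
    tokio::spawn(async move {
        let mut cached = BytesMut::new();
        while let Some(chunk) = chunks.next().await {
            cached.extend_from_slice(&chunk); // copy kept for the cache
            if tx.unbounded_send(chunk).is_err() {
                break; // downstream dropped; stop early
            }
        }
        let _cached = cached.freeze(); // the cache write would happen here
    });
    rx
}

#[tokio::main]
async fn main() {
    let chunks = futures::stream::iter(vec![Bytes::from_static(b"hello "), Bytes::from_static(b"world")]);
    let mut rx = tee_to_cache(chunks);
    while let Some(chunk) = rx.next().await {
        print!("{}", String::from_utf8_lossy(&chunk));
    }
}
```

In the diff itself the same idea is carried by `RpxyCache::put`, which forwards hyper `Frame`s over an unbounded channel and hashes the accumulated buffer before storing it.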
@@ -24,7 +24,7 @@ http3-s2n = [
 sticky-cookie = ["base64", "sha2", "chrono"]
 native-tls-backend = ["hyper-tls"]
 rustls-backend = []
-cache = [] #"http-cache-semantics", "lru"]
+cache = ["http-cache-semantics", "lru", "sha2", "base64"]
 native-roots = [] #"hyper-rustls/native-tokio"]

 [dependencies]
@@ -41,6 +41,7 @@ tokio = { version = "1.34.0", default-features = false, features = [
   "macros",
   "fs",
 ] }
+pin-project-lite = "0.2.13"
 async-trait = "0.1.74"

 # Error handling
@@ -66,7 +67,7 @@ hyper-tls = { version = "0.6.0", features = ["alpn"], optional = true }

 # tls and cert management for server
 hot_reload = "0.1.4"
-rustls = { version = "0.21.9", default-features = false }
+rustls = { version = "0.21.10", default-features = false }
 tokio-rustls = { version = "0.24.1", features = ["early-data"] }
 webpki = "0.22.4"
 x509-parser = "0.15.1"
@@ -87,9 +88,10 @@ s2n-quic-rustls = { version = "0.32.0", optional = true }
 # for UDP socket wit SO_REUSEADDR when h3 with quinn
 socket2 = { version = "0.5.5", features = ["all"], optional = true }

-# # cache
-# http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
-# lru = { version = "0.12.1", optional = true }
+# cache
+http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
+lru = { version = "0.12.1", optional = true }
+sha2 = { version = "0.10.8", default-features = false, optional = true }

 # cookie handling for sticky cookie
 chrono = { version = "0.4.31", default-features = false, features = [
@@ -98,7 +100,6 @@ chrono = { version = "0.4.31", default-features = false, features = [
   "clock",
 ], optional = true }
 base64 = { version = "0.21.5", optional = true }
-sha2 = { version = "0.10.8", default-features = false, optional = true }


 [dev-dependencies]
@@ -3,10 +3,11 @@ mod load_balance;
 mod upstream;
 mod upstream_opts;

-// #[cfg(feature = "sticky-cookie")]
-// pub use self::load_balance::{StickyCookie, StickyCookieValue};
+#[cfg(feature = "sticky-cookie")]
+pub(crate) use self::load_balance::{StickyCookie, StickyCookieValue};
+#[allow(unused)]
 pub(crate) use self::{
-  load_balance::{LoadBalance, LoadBalanceContext, StickyCookie, StickyCookieValue},
+  load_balance::{LoadBalance, LoadBalanceContext},
   upstream::{PathManager, Upstream, UpstreamCandidates},
   upstream_opts::UpstreamOption,
 };
@@ -4,8 +4,8 @@ pub const RESPONSE_HEADER_SERVER: &str = "rpxy";
 pub const TCP_LISTEN_BACKLOG: u32 = 1024;
 // pub const HTTP_LISTEN_PORT: u16 = 8080;
 // pub const HTTPS_LISTEN_PORT: u16 = 8443;
-pub const PROXY_TIMEOUT_SEC: u64 = 60;
-pub const UPSTREAM_TIMEOUT_SEC: u64 = 60;
+pub const PROXY_IDLE_TIMEOUT_SEC: u64 = 20;
+pub const UPSTREAM_IDLE_TIMEOUT_SEC: u64 = 20;
 pub const TLS_HANDSHAKE_TIMEOUT_SEC: u64 = 15; // default as with firefox browser
 pub const MAX_CLIENTS: usize = 512;
 pub const MAX_CONCURRENT_STREAMS: u32 = 64;
@@ -28,6 +28,8 @@ pub enum RpxyError {
   HyperIncomingLikeNewClosed,
   #[error("New body write aborted")]
   HyperNewBodyWriteAborted,
+  #[error("Hyper error in serving request or response body type: {0}")]
+  HyperBodyError(#[from] hyper::Error),

   // http/3 errors
   #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
@@ -88,6 +90,11 @@ pub enum RpxyError {
   #[error("Unsupported upstream option")]
   UnsupportedUpstreamOption,

+  // Cache error map
+  #[cfg(feature = "cache")]
+  #[error("Cache error: {0}")]
+  CacheError(#[from] crate::forwarder::CacheError),
+
   // Others
   #[error("Infallible")]
   Infallible(#[from] std::convert::Infallible),
rpxy-lib/src/forwarder/cache/cache_error.rs (vendored, new file, 50 lines)
@@ -0,0 +1,50 @@
+use thiserror::Error;
+
+pub type CacheResult<T> = std::result::Result<T, CacheError>;
+
+/// Describes things that can go wrong in the Rpxy
+#[derive(Debug, Error)]
+pub enum CacheError {
+  // Cache errors,
+  #[error("Invalid null request and/or response")]
+  NullRequestOrResponse,
+
+  #[error("Failed to write byte buffer")]
+  FailedToWriteByteBufferForCache,
+
+  #[error("Failed to acquire mutex lock for cache")]
+  FailedToAcquiredMutexLockForCache,
+
+  #[error("Failed to acquire mutex lock for check")]
+  FailedToAcquiredMutexLockForCheck,
+
+  #[error("Failed to create file cache")]
+  FailedToCreateFileCache,
+
+  #[error("Failed to write file cache")]
+  FailedToWriteFileCache,
+
+  #[error("Failed to open cache file")]
+  FailedToOpenCacheFile,
+
+  #[error("Too large to cache")]
+  TooLargeToCache,
+
+  #[error("Failed to cache bytes: {0}")]
+  FailedToCacheBytes(String),
+
+  #[error("Failed to send frame to cache {0}")]
+  FailedToSendFrameToCache(String),
+
+  #[error("Failed to send frame from file cache {0}")]
+  FailedToSendFrameFromCache(String),
+
+  #[error("Failed to remove cache file: {0}")]
+  FailedToRemoveCacheFile(String),
+
+  #[error("Invalid cache target")]
+  InvalidCacheTarget,
+
+  #[error("Hash mismatched in cache file")]
+  HashMismatchedInCacheFile,
+}
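This new error type plugs into the existing `RpxyError` through the `#[from]` conversion added in the `error.rs` hunk above, so cache internals can use `?` without manual mapping. A tiny self-contained illustration of that wiring (miniature stand-in types, not the crate's own):

```rust
use thiserror::Error;

// Miniature of the diff's error wiring: the cache error converts into the
// top-level error via #[from], enabling plain `?` propagation.
#[derive(Debug, Error)]
enum CacheError {
    #[error("Too large to cache")]
    TooLargeToCache,
}

#[derive(Debug, Error)]
enum RpxyError {
    #[error("Cache error: {0}")]
    Cache(#[from] CacheError),
}

fn cache_step() -> Result<(), CacheError> {
    Err(CacheError::TooLargeToCache)
}

fn handler() -> Result<(), RpxyError> {
    cache_step()?; // auto-converted by the #[from] impl
    Ok(())
}

fn main() {
    println!("{}", handler().unwrap_err()); // "Cache error: Too large to cache"
}
```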
rpxy-lib/src/forwarder/cache/cache_main.rs (vendored, new file, 523 lines)
@@ -0,0 +1,523 @@
+use super::cache_error::*;
+use crate::{
+  globals::Globals,
+  hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody},
+  log::*,
+};
+use base64::{engine::general_purpose, Engine as _};
+use bytes::{Buf, Bytes, BytesMut};
+use futures::channel::mpsc;
+use http::{Request, Response, Uri};
+use http_body_util::{BodyExt, StreamBody};
+use http_cache_semantics::CachePolicy;
+use hyper::body::{Frame, Incoming};
+use lru::LruCache;
+use sha2::{Digest, Sha256};
+use std::{
+  path::{Path, PathBuf},
+  sync::{
+    atomic::{AtomicUsize, Ordering},
+    Arc, Mutex,
+  },
+  time::SystemTime,
+};
+use tokio::{
+  fs::{self, File},
+  io::{AsyncReadExt, AsyncWriteExt},
+  sync::RwLock,
+};
+
+/* ---------------------------------------------- */
+#[derive(Clone, Debug)]
+/// Cache main manager
+pub struct RpxyCache {
+  /// Inner lru cache manager storing http message caching policy
+  inner: LruCacheManager,
+  /// Managing cache file objects through RwLock's lock mechanism for file lock
+  file_store: FileStore,
+  /// Async runtime
+  runtime_handle: tokio::runtime::Handle,
+  /// Maximum size of each cache file object
+  max_each_size: usize,
+  /// Maximum size of cache object on memory
+  max_each_size_on_memory: usize,
+  /// Cache directory path
+  cache_dir: PathBuf,
+}
+
+impl RpxyCache {
+  /// Generate cache storage
+  pub async fn new(globals: &Globals) -> Option<Self> {
+    if !globals.proxy_config.cache_enabled {
+      return None;
+    }
+    let cache_dir = globals.proxy_config.cache_dir.as_ref().unwrap();
+    let file_store = FileStore::new(&globals.runtime_handle).await;
+    let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry);
+
+    let max_each_size = globals.proxy_config.cache_max_each_size;
+    let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
+    if max_each_size < max_each_size_on_memory {
+      warn!(
+        "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"
+      );
+      max_each_size_on_memory = max_each_size;
+    }
+
+    if let Err(e) = fs::remove_dir_all(cache_dir).await {
+      warn!("Failed to clean up the cache dir: {e}");
+    };
+    fs::create_dir_all(&cache_dir).await.unwrap();
+
+    Some(Self {
+      file_store,
+      inner,
+      runtime_handle: globals.runtime_handle.clone(),
+      max_each_size,
+      max_each_size_on_memory,
+      cache_dir: cache_dir.clone(),
+    })
+  }
+
+  /// Count cache entries
+  pub async fn count(&self) -> (usize, usize, usize) {
+    let total = self.inner.count();
+    let file = self.file_store.count().await;
+    let on_memory = total - file;
+    (total, on_memory, file)
+  }
+
+  /// Put response into the cache
+  pub async fn put(
+    &self,
+    uri: &hyper::Uri,
+    mut body: Incoming,
+    policy: &CachePolicy,
+  ) -> CacheResult<UnboundedStreamBody> {
+    let cache_manager = self.inner.clone();
+    let mut file_store = self.file_store.clone();
+    let uri = uri.clone();
+    let policy_clone = policy.clone();
+    let max_each_size = self.max_each_size;
+    let max_each_size_on_memory = self.max_each_size_on_memory;
+    let cache_dir = self.cache_dir.clone();
+
+    let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
+
+    self.runtime_handle.spawn(async move {
+      let mut size = 0usize;
+      let mut buf = BytesMut::new();
+
+      loop {
+        let frame = match body.frame().await {
+          Some(frame) => frame,
+          None => {
+            debug!("Response body finished");
+            break;
+          }
+        };
+        let frame_size = frame.as_ref().map(|f| {
+          if f.is_data() {
+            f.data_ref().map(|bytes| bytes.remaining()).unwrap_or_default()
+          } else {
+            0
+          }
+        });
+        size += frame_size.unwrap_or_default();
+
+        // check size
+        if size > max_each_size {
+          warn!("Too large to cache");
+          return Err(CacheError::TooLargeToCache);
+        }
+        frame
+          .as_ref()
+          .map(|f| {
+            if f.is_data() {
+              let data_bytes = f.data_ref().unwrap().clone();
+              debug!("cache data bytes of {} bytes", data_bytes.len());
+              // We do not use stream-type buffering since it needs to lock file during operation.
+              buf.extend(data_bytes.as_ref());
+            }
+          })
+          .map_err(|e| CacheError::FailedToCacheBytes(e.to_string()))?;
+
+        // send data to use response downstream
+        body_tx
+          .unbounded_send(frame)
+          .map_err(|e| CacheError::FailedToSendFrameToCache(e.to_string()))?;
+      }
+
+      let buf = buf.freeze();
+      // Calculate hash of the cached data, after all data is received.
+      // In-operation calculation is possible but it blocks sending data.
+      let mut hasher = Sha256::new();
+      hasher.update(buf.as_ref());
+      let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
+      debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes);
+
+      // Create cache object
+      let cache_key = derive_cache_key_from_uri(&uri);
+      let cache_object = CacheObject {
+        policy: policy_clone,
+        target: CacheFileOrOnMemory::build(&cache_dir, &uri, &buf, max_each_size_on_memory),
+        hash: hash_bytes,
+      };
+
+      if let Some((k, v)) = cache_manager.push(&cache_key, &cache_object)? {
+        if k != cache_key {
+          info!("Over the cache capacity. Evict least recent used entry");
+          if let CacheFileOrOnMemory::File(path) = v.target {
+            file_store.evict(&path).await;
+          }
+        }
+      }
+      // store cache object to file
+      if let CacheFileOrOnMemory::File(_) = cache_object.target {
+        file_store.create(&cache_object, &buf).await?;
+      }
+
+      Ok(()) as CacheResult<()>
+    });
+
+    let stream_body = StreamBody::new(body_rx);
+
+    Ok(stream_body)
+  }
+
+  /// Get cached response
+  pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<ResponseBody>> {
+    debug!(
+      "Current cache status: (total, on-memory, file) = {:?}",
+      self.count().await
+    );
+    let cache_key = derive_cache_key_from_uri(req.uri());
+
+    // First check cache chance
+    let Ok(Some(cached_object)) = self.inner.get(&cache_key) else {
+      return None;
+    };
+
+    // Secondly check the cache freshness as an HTTP message
+    let now = SystemTime::now();
+    let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else {
+      // Evict stale cache entry.
+      // This might be okay to keep as is since it would be updated later.
+      // However, there is no guarantee that newly got objects will be still cacheable.
+      // So, we have to evict stale cache entries and cache file objects if found.
+      debug!("Stale cache entry: {cache_key}");
+      let _evicted_entry = self.inner.evict(&cache_key);
+      // For cache file
+      if let CacheFileOrOnMemory::File(path) = &cached_object.target {
+        self.file_store.evict(&path).await;
+      }
+      return None;
+    };
+
+    // Finally retrieve the file/on-memory object
+    let response_body = match cached_object.target {
+      CacheFileOrOnMemory::File(path) => {
+        let stream_body = match self.file_store.read(path.clone(), &cached_object.hash).await {
+          Ok(s) => s,
+          Err(e) => {
+            warn!("Failed to read from file cache: {e}");
+            let _evicted_entry = self.inner.evict(&cache_key);
+            self.file_store.evict(path).await;
+            return None;
+          }
+        };
+        debug!("Cache hit from file: {cache_key}");
+        ResponseBody::Streamed(stream_body)
+      }
+      CacheFileOrOnMemory::OnMemory(object) => {
+        debug!("Cache hit from on memory: {cache_key}");
+        let mut hasher = Sha256::new();
+        hasher.update(object.as_ref());
+        let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
+        if hash_bytes != cached_object.hash {
+          warn!("Hash mismatched. Cache object is corrupted");
+          let _evicted_entry = self.inner.evict(&cache_key);
+          return None;
+        }
+        ResponseBody::Boxed(BoxBody::new(full(object)))
+      }
+    };
+    Some(Response::from_parts(res_parts, response_body))
+  }
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// Cache file manager outer that is responsible to handle `RwLock`
+struct FileStore {
+  /// Inner file store main object
+  inner: Arc<RwLock<FileStoreInner>>,
+}
+impl FileStore {
+  /// Build manager
+  async fn new(runtime_handle: &tokio::runtime::Handle) -> Self {
+    Self {
+      inner: Arc::new(RwLock::new(FileStoreInner::new(runtime_handle).await)),
+    }
+  }
+
+  /// Count file cache entries
+  async fn count(&self) -> usize {
+    let inner = self.inner.read().await;
+    inner.cnt
+  }
+  /// Create a temporary file cache
+  async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
+    let mut inner = self.inner.write().await;
+    inner.create(cache_object, body_bytes).await
+  }
+  /// Evict a temporary file cache
+  async fn evict(&self, path: impl AsRef<Path>) {
+    // Acquire the write lock
+    let mut inner = self.inner.write().await;
+    if let Err(e) = inner.remove(path).await {
+      warn!("Eviction failed during file object removal: {:?}", e);
+    };
+  }
+  /// Read a temporary file cache
+  async fn read(
+    &self,
+    path: impl AsRef<Path> + Send + Sync + 'static,
+    hash: &Bytes,
+  ) -> CacheResult<UnboundedStreamBody> {
+    let inner = self.inner.read().await;
+    inner.read(path, hash).await
+  }
+}
+
+#[derive(Debug, Clone)]
+/// Manager inner for cache on file system
+struct FileStoreInner {
+  /// Counter of current cached files
+  cnt: usize,
+  /// Async runtime
+  runtime_handle: tokio::runtime::Handle,
+}
+
+impl FileStoreInner {
+  /// Build new cache file manager.
+  /// This first creates cache file dir if not exists, and cleans up the file inside the directory.
+  /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed.
+  async fn new(runtime_handle: &tokio::runtime::Handle) -> Self {
+    Self {
+      cnt: 0,
+      runtime_handle: runtime_handle.clone(),
+    }
+  }
+
+  /// Create a new temporary file cache
+  async fn create(&mut self, cache_object: &CacheObject, body_bytes: &Bytes) -> CacheResult<()> {
+    let cache_filepath = match cache_object.target {
+      CacheFileOrOnMemory::File(ref path) => path.clone(),
+      CacheFileOrOnMemory::OnMemory(_) => {
+        return Err(CacheError::InvalidCacheTarget);
+      }
+    };
+    let Ok(mut file) = File::create(&cache_filepath).await else {
+      return Err(CacheError::FailedToCreateFileCache);
+    };
+    let mut bytes_clone = body_bytes.clone();
+    while bytes_clone.has_remaining() {
+      if let Err(e) = file.write_buf(&mut bytes_clone).await {
+        error!("Failed to write file cache: {e}");
+        return Err(CacheError::FailedToWriteFileCache);
+      };
+    }
+    self.cnt += 1;
+    Ok(())
+  }
+
+  /// Retrieve a stored temporary file cache
+  async fn read(
+    &self,
+    path: impl AsRef<Path> + Send + Sync + 'static,
+    hash: &Bytes,
+  ) -> CacheResult<UnboundedStreamBody> {
+    let Ok(mut file) = File::open(&path).await else {
+      warn!("Cache file object cannot be opened");
+      return Err(CacheError::FailedToOpenCacheFile);
+    };
+    let hash_clone = hash.clone();
+    let mut self_clone = self.clone();
+
+    let (body_tx, body_rx) = mpsc::unbounded::<Result<Frame<Bytes>, hyper::Error>>();
+
+    self.runtime_handle.spawn(async move {
+      let mut hasher = Sha256::new();
+      let mut buf = BytesMut::new();
+      loop {
+        match file.read_buf(&mut buf).await {
+          Ok(0) => break,
+          Ok(_) => {
+            let bytes = buf.copy_to_bytes(buf.remaining());
+            hasher.update(bytes.as_ref());
+            body_tx
+              .unbounded_send(Ok(Frame::data(bytes)))
+              .map_err(|e| CacheError::FailedToSendFrameFromCache(e.to_string()))?
+          }
+          Err(_) => break,
+        };
+      }
+      let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
+      if hash_bytes != hash_clone {
+        warn!("Hash mismatched. Cache object is corrupted. Force to remove the cache file.");
+        // only file can be evicted
+        let _evicted_entry = self_clone.remove(&path).await;
+        return Err(CacheError::HashMismatchedInCacheFile);
+      }
+      Ok(()) as CacheResult<()>
+    });
+
+    let stream_body = StreamBody::new(body_rx);
+
+    Ok(stream_body)
+  }
+
+  /// Remove file
+  async fn remove(&mut self, path: impl AsRef<Path>) -> CacheResult<()> {
+    fs::remove_file(path.as_ref())
+      .await
+      .map_err(|e| CacheError::FailedToRemoveCacheFile(e.to_string()))?;
+    self.cnt -= 1;
+    debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt);

+    Ok(())
+  }
+}
+
+/* ---------------------------------------------- */
+
+#[derive(Clone, Debug)]
+/// Cache target in hybrid manner of on-memory and file system
+pub enum CacheFileOrOnMemory {
+  /// Pointer to the temporary cache file
+  File(PathBuf),
+  /// Cached body itself
+  OnMemory(Bytes),
+}
+
+impl CacheFileOrOnMemory {
+  /// Get cache object target
+  fn build(cache_dir: &Path, uri: &Uri, object: &Bytes, max_each_size_on_memory: usize) -> Self {
+    if object.len() > max_each_size_on_memory {
+      let cache_filename = derive_filename_from_uri(uri);
+      let cache_filepath = cache_dir.join(cache_filename);
+      CacheFileOrOnMemory::File(cache_filepath)
+    } else {
+      CacheFileOrOnMemory::OnMemory(object.clone())
+    }
+  }
+}
+
+#[derive(Clone, Debug)]
+/// Cache object definition
+struct CacheObject {
+  /// Cache policy to determine if the stored cache can be used as a response to a new incoming request
+  pub policy: CachePolicy,
+  /// Cache target: on-memory object or temporary file
+  pub target: CacheFileOrOnMemory,
+  /// SHA256 hash of target to strongly bind the cache metadata (this object) and file target
+  pub hash: Bytes,
+}
+
+/* ---------------------------------------------- */
+#[derive(Debug, Clone)]
+/// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache`
+struct LruCacheManager {
+  /// Inner lru cache manager main object
+  inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: Not sure a string URL is the right key; it looks like every request would have to be checked against it
+  /// Counter of current cached object (total)
+  cnt: Arc<AtomicUsize>,
+}
+
+impl LruCacheManager {
+  /// Build LruCache
+  fn new(cache_max_entry: usize) -> Self {
+    Self {
+      inner: Arc::new(Mutex::new(LruCache::new(
+        std::num::NonZeroUsize::new(cache_max_entry).unwrap(),
+      ))),
+      cnt: Default::default(),
+    }
+  }
+
+  /// Count entries
+  fn count(&self) -> usize {
+    self.cnt.load(Ordering::Relaxed)
+  }
+
+  /// Evict an entry
+  fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked to evict a cache entry");
+      return None;
+    };
+    let res = lock.pop_entry(cache_key);
+    // This may be inconsistent with the actual number of entries
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
+
+  /// Push an entry
+  fn push(&self, cache_key: &str, cache_object: &CacheObject) -> CacheResult<Option<(String, CacheObject)>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Failed to acquire mutex lock for writing cache entry");
+      return Err(CacheError::FailedToAcquiredMutexLockForCache);
+    };
+    let res = Ok(lock.push(cache_key.to_string(), cache_object.clone()));
+    // This may be inconsistent with the actual number of entries
+    self.cnt.store(lock.len(), Ordering::Relaxed);
+    res
+  }
+
+  /// Get an entry
+  fn get(&self, cache_key: &str) -> CacheResult<Option<CacheObject>> {
+    let Ok(mut lock) = self.inner.lock() else {
+      error!("Mutex can't be locked for checking cache entry");
+      return Err(CacheError::FailedToAcquiredMutexLockForCheck);
+    };
+    let Some(cached_object) = lock.get(cache_key) else {
+      return Ok(None);
+    };
+    Ok(Some(cached_object.clone()))
+  }
+}
+
+/* ---------------------------------------------- */
+/// Generate cache policy if the response is cacheable
+pub fn get_policy_if_cacheable<B1, B2>(
+  req: Option<&Request<B1>>,
+  res: Option<&Response<B2>>,
+) -> CacheResult<Option<CachePolicy>>
+// where
+//   B1: core::fmt::Debug,
+{
+  // deduce cache policy from req and res
+  let (Some(req), Some(res)) = (req, res) else {
+    return Err(CacheError::NullRequestOrResponse);
+  };
+
+  let new_policy = CachePolicy::new(req, res);
+  if new_policy.is_storable() {
+    // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers());
+    Ok(Some(new_policy))
+  } else {
+    Ok(None)
+  }
+}
+
+fn derive_filename_from_uri(uri: &hyper::Uri) -> String {
+  let mut hasher = Sha256::new();
+  hasher.update(uri.to_string());
+  let digest = hasher.finalize();
+  general_purpose::URL_SAFE_NO_PAD.encode(digest)
+}
+
+fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String {
+  uri.to_string()
+}
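A note on the two derivations at the bottom of this file: the cache key is the URI string itself, while on-disk filenames are the URL-safe, unpadded base64 of a SHA-256 digest, which keeps them fixed-length and free of path characters. A standalone sketch of the filename derivation (the `filename_for` helper is hypothetical, but it uses the same `sha2`/`base64` APIs as the diff):

```rust
use base64::{engine::general_purpose, Engine as _};
use sha2::{Digest, Sha256};

// URL-safe, padding-free base64 of the SHA-256 digest: 43 characters,
// safe on any filesystem regardless of the original URI.
fn filename_for(uri: &str) -> String {
    let digest = Sha256::digest(uri.as_bytes());
    general_purpose::URL_SAFE_NO_PAD.encode(digest)
}

fn main() {
    println!("{}", filename_for("https://example.com/index.html"));
}
```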
rpxy-lib/src/forwarder/cache/mod.rs (vendored, new file, 5 lines)
@@ -0,0 +1,5 @@
+mod cache_error;
+mod cache_main;
+
+pub use cache_error::CacheError;
+pub use cache_main::{get_policy_if_cacheable, CacheFileOrOnMemory, RpxyCache};
@@ -1,21 +1,21 @@
 use crate::{
   error::{RpxyError, RpxyResult},
   globals::Globals,
-  hyper_ext::{
-    body::{wrap_incoming_body_response, IncomingOr},
-    rt::LocalExecutor,
-  },
+  hyper_ext::{body::ResponseBody, rt::LocalExecutor},
   log::*,
 };
 use async_trait::async_trait;
 use http::{Request, Response, Version};
-use hyper::body::Body;
+use hyper::body::{Body, Incoming};
 use hyper_util::client::legacy::{
   connect::{Connect, HttpConnector},
   Client,
 };
 use std::sync::Arc;

+#[cfg(feature = "cache")]
+use super::cache::{get_policy_if_cacheable, RpxyCache};
+
 #[async_trait]
 /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers.
 pub trait ForwardRequest<B1, B2> {
@@ -25,27 +25,72 @@ pub trait ForwardRequest<B1, B2> {

 /// Forwarder http client struct responsible to cache handling
 pub struct Forwarder<C, B> {
-  // #[cfg(feature = "cache")]
-  // cache: Option<RpxyCache>,
+  #[cfg(feature = "cache")]
+  cache: Option<RpxyCache>,
   inner: Client<C, B>,
   inner_h2: Client<C, B>, // `h2c` or http/2-only client is defined separately
 }

 #[async_trait]
-impl<C, B1, B2> ForwardRequest<B1, IncomingOr<B2>> for Forwarder<C, B1>
+impl<C, B1> ForwardRequest<B1, ResponseBody> for Forwarder<C, B1>
 where
   C: Send + Sync + Connect + Clone + 'static,
   B1: Body + Send + Sync + Unpin + 'static,
   <B1 as Body>::Data: Send,
   <B1 as Body>::Error: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>,
-  B2: Body,
 {
   type Error = RpxyError;

-  async fn request(&self, req: Request<B1>) -> Result<Response<IncomingOr<B2>>, Self::Error> {
+  async fn request(&self, req: Request<B1>) -> Result<Response<ResponseBody>, Self::Error> {
     // TODO: cache handling
-    self.request_directly(req).await
+    #[cfg(feature = "cache")]
+    {
+      let mut synth_req = None;
+      if self.cache.is_some() {
+        // try reading from cache
+        if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
+          // if found, return it as response.
+          info!("Cache hit - Return from cache");
+          return Ok(cached_response);
+        };
+
+        // Synthetic request copy used just for caching (cannot clone request object...)
+        synth_req = Some(build_synth_req_for_cache(&req));
+      }
+      let res = self.request_directly(req).await;
+
+      if self.cache.is_none() {
+        return res.map(|inner| inner.map(ResponseBody::Incoming));
+      }
+
+      // check cacheability and store it if cacheable
+      let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else {
+        return res.map(|inner| inner.map(ResponseBody::Incoming));
+      };
+      let (parts, body) = res.unwrap().into_parts();
+
+      // Get streamed body without waiting for the arrival of the body,
+      // which is done simultaneously with caching.
+      let stream_body = self
+        .cache
+        .as_ref()
+        .unwrap()
+        .put(synth_req.unwrap().uri(), body, &cache_policy)
+        .await?;
+
+      // response with body being cached in background
+      let new_res = Response::from_parts(parts, ResponseBody::Streamed(stream_body));
+      Ok(new_res)
+    }
+
+    // No cache handling
+    #[cfg(not(feature = "cache"))]
+    {
+      self
+        .request_directly(req)
+        .await
+        .map(|inner| inner.map(ResponseBody::Incoming))
+    }
   }
 }
@@ -56,13 +101,15 @@ where
   <B1 as Body>::Data: Send,
   <B1 as Body>::Error: Into<Box<(dyn std::error::Error + Send + Sync + 'static)>>,
 {
-  async fn request_directly<B2: Body>(&self, req: Request<B1>) -> RpxyResult<Response<IncomingOr<B2>>> {
+  async fn request_directly(&self, req: Request<B1>) -> RpxyResult<Response<Incoming>> {
+    // TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient.
+    // Needs to be reconsidered. Currently, this is a kind of work around.
+    // This possibly relates to https://github.com/hyperium/hyper/issues/2417.
     match req.version() {
       Version::HTTP_2 => self.inner_h2.request(req).await, // handles `h2c` requests
       _ => self.inner.request(req).await,
     }
     .map_err(|e| RpxyError::FailedToFetchFromUpstream(e.to_string()))
-    .map(wrap_incoming_body_response::<B2>)
   }
 }
@@ -90,7 +137,9 @@ Please enable native-tls-backend or rustls-backend feature to enable TLS support

     Ok(Self {
       inner,
-      inner_h2: inner.clone(),
+      inner_h2,
+      #[cfg(feature = "cache")]
+      cache: RpxyCache::new(_globals).await,
     })
   }
 }
@@ -118,6 +167,7 @@ where
       let mut http = HttpConnector::new();
       http.enforce_http(false);
      http.set_reuse_address(true);
+      http.set_keepalive(Some(_globals.proxy_config.upstream_idle_timeout));
       hyper_tls::HttpsConnector::from((http, tls.into()))
     })
   };
@@ -130,13 +180,12 @@ where
       .http2_only(true)
       .build::<_, B1>(connector_h2);

-    // #[cfg(feature = "cache")]
-    // {
-    //   let cache = RpxyCache::new(_globals).await;
-    //   Self { inner, inner_h2, cache }
-    // }
-    // #[cfg(not(feature = "cache"))]
-    Ok(Self { inner, inner_h2 })
+    Ok(Self {
+      inner,
+      inner_h2,
+      #[cfg(feature = "cache")]
+      cache: RpxyCache::new(_globals).await,
+    })
   }
 }
@@ -172,3 +221,17 @@ where
     // let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2);
   }
 }
+
+#[cfg(feature = "cache")]
+/// Build synthetic request to cache
+fn build_synth_req_for_cache<T>(req: &Request<T>) -> Request<()> {
+  let mut builder = Request::builder()
+    .method(req.method())
+    .uri(req.uri())
+    .version(req.version());
+  // TODO: omit extensions. is this approach correct?
+  for (header_key, header_value) in req.headers() {
+    builder = builder.header(header_key, header_value);
+  }
+  builder.body(()).unwrap()
+}
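The cacheability decision above delegates entirely to `http-cache-semantics` (vendored here as a submodule): `CachePolicy::new` judges storability from request/response headers, and `before_request` later judges fresh versus stale. A sketch of that flow against the upstream crate's API, with throwaway request and response values:

```rust
use http::{Request, Response};
use http_cache_semantics::{BeforeRequest, CachePolicy};
use std::time::SystemTime;

fn main() {
    let req = Request::builder()
        .uri("https://example.com/")
        .body(())
        .unwrap();
    let res = Response::builder()
        .header("cache-control", "max-age=60")
        .body(())
        .unwrap();

    // Storable: the policy (not the body) is what the LRU keeps.
    let policy = CachePolicy::new(&req, &res);
    assert!(policy.is_storable());

    // On a later identical request, the policy decides fresh vs. stale.
    match policy.before_request(&req, SystemTime::now()) {
        BeforeRequest::Fresh(parts) => {
            // serve the cached body with these response parts
            let _ = parts.status;
        }
        BeforeRequest::Stale { .. } => {
            // evict and refetch, as RpxyCache::get does above
        }
    }
}
```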
@@ -1,6 +1,11 @@
+#[cfg(feature = "cache")]
+mod cache;
 mod client;

-use crate::hyper_ext::body::{IncomingLike, IncomingOr};
-pub type Forwarder<C> = client::Forwarder<C, IncomingOr<IncomingLike>>;
-pub use client::ForwardRequest;
+use crate::hyper_ext::body::RequestBody;
+pub(crate) type Forwarder<C> = client::Forwarder<C, RequestBody>;
+pub(crate) use client::ForwardRequest;
+
+#[cfg(feature = "cache")]
+pub(crate) use cache::CacheError;
@@ -33,8 +33,10 @@ pub struct ProxyConfig {
   /// tcp listen backlog
   pub tcp_listen_backlog: u32,

-  pub proxy_timeout: Duration,    // when serving requests at Proxy
-  pub upstream_timeout: Duration, // when serving requests at Handler
+  /// Idle timeout as an HTTP server, used as the keep alive interval and timeout for reading request header
+  pub proxy_idle_timeout: Duration,
+  /// Idle timeout as an HTTP client, used as the keep alive interval for upstream connections
+  pub upstream_idle_timeout: Duration,

   pub max_clients: usize,          // when serving requests
   pub max_concurrent_streams: u32, // when instantiate server
@@ -80,8 +82,8 @@ impl Default for ProxyConfig {
       tcp_listen_backlog: TCP_LISTEN_BACKLOG,

       // TODO: Reconsider each timeout values
-      proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC),
-      upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC),
+      proxy_idle_timeout: Duration::from_secs(PROXY_IDLE_TIMEOUT_SEC),
+      upstream_idle_timeout: Duration::from_secs(UPSTREAM_IDLE_TIMEOUT_SEC),

       max_clients: MAX_CLIENTS,
       max_concurrent_streams: MAX_CONCURRENT_STREAMS,
@@ -1,24 +1,11 @@
-use http::Response;
-use http_body_util::{combinators, BodyExt, Either, Empty, Full};
-use hyper::body::{Bytes, Incoming};
+use super::body::IncomingLike;
+use crate::error::RpxyError;
+use http_body_util::{combinators, BodyExt, Empty, Full};
+use hyper::body::{Body, Bytes, Incoming};
+use std::pin::Pin;

 /// Type for synthetic boxed body
 pub(crate) type BoxBody = combinators::BoxBody<Bytes, hyper::Error>;
-/// Type for either passthrough body or given body type, specifically synthetic boxed body
-pub(crate) type IncomingOr<B> = Either<Incoming, B>;
-
-/// helper function to build http response with passthrough body
-pub(crate) fn wrap_incoming_body_response<B>(response: Response<Incoming>) -> Response<IncomingOr<B>>
-where
-  B: hyper::body::Body,
-{
-  response.map(IncomingOr::Left)
-}
-
-/// helper function to build http response with synthetic body
-pub(crate) fn wrap_synthetic_body_response<B>(response: Response<B>) -> Response<IncomingOr<B>> {
-  response.map(IncomingOr::Right)
-}

 /// helper function to build a empty body
 pub(crate) fn empty() -> BoxBody {
@@ -29,3 +16,68 @@ pub(crate) fn empty() -> BoxBody {
 pub(crate) fn full(body: Bytes) -> BoxBody {
   Full::new(body).map_err(|never| match never {}).boxed()
 }
+
+/* ------------------------------------ */
+/// Request body used in this project
+/// - Incoming: just a type that only forwards the downstream request body to upstream.
+/// - IncomingLike: a Incoming-like type in which channel is used
+pub(crate) enum RequestBody {
+  Incoming(Incoming),
+  IncomingLike(IncomingLike),
+}
+
+impl Body for RequestBody {
+  type Data = bytes::Bytes;
+  type Error = RpxyError;
+
+  fn poll_frame(
+    self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+    match self.get_mut() {
+      RequestBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx).map_err(RpxyError::HyperBodyError),
+      RequestBody::IncomingLike(incoming_like) => Pin::new(incoming_like).poll_frame(cx),
+    }
+  }
+}
+
+/* ------------------------------------ */
+#[cfg(feature = "cache")]
+use futures::channel::mpsc::UnboundedReceiver;
+#[cfg(feature = "cache")]
+use http_body_util::StreamBody;
+#[cfg(feature = "cache")]
+use hyper::body::Frame;
+
+#[cfg(feature = "cache")]
+pub(crate) type UnboundedStreamBody = StreamBody<UnboundedReceiver<Result<Frame<bytes::Bytes>, hyper::Error>>>;
+
+/// Response body use in this project
+/// - Incoming: just a type that only forwards the upstream response body to downstream.
+/// - Boxed: a type that is generated from cache or synthetic response body, e.g.,, small byte object.
+/// - Streamed: another type that is generated from stream, e.g., large byte object.
+pub(crate) enum ResponseBody {
+  Incoming(Incoming),
+  Boxed(BoxBody),
+  #[cfg(feature = "cache")]
+  Streamed(UnboundedStreamBody),
+}
+
+impl Body for ResponseBody {
+  type Data = bytes::Bytes;
+  type Error = RpxyError;
+
+  fn poll_frame(
+    self: Pin<&mut Self>,
+    cx: &mut std::task::Context<'_>,
+  ) -> std::task::Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
+    match self.get_mut() {
+      ResponseBody::Incoming(incoming) => Pin::new(incoming).poll_frame(cx),
+      #[cfg(feature = "cache")]
+      ResponseBody::Boxed(boxed) => Pin::new(boxed).poll_frame(cx),
+      #[cfg(feature = "cache")]
+      ResponseBody::Streamed(streamed) => Pin::new(streamed).poll_frame(cx),
+    }
+    .map_err(RpxyError::HyperBodyError)
+  }
+}
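`UnboundedStreamBody` is the piece that lets a spawned task keep feeding a response after `request()` has already returned: the sender half lives in the cache task, the receiver half is the body handed to the client. A minimal sketch of constructing such a channel-backed body (the `channel_body` helper is illustrative; the type alias matches the diff's):

```rust
use bytes::Bytes;
use futures::channel::mpsc;
use http_body_util::StreamBody;
use hyper::body::Frame;

// Same shape as the diff's UnboundedStreamBody: an UnboundedReceiver of
// frames wrapped in StreamBody satisfies hyper 1.0's Body trait.
type ChannelBody = StreamBody<mpsc::UnboundedReceiver<Result<Frame<Bytes>, hyper::Error>>>;

fn channel_body() -> (
    mpsc::UnboundedSender<Result<Frame<Bytes>, hyper::Error>>,
    ChannelBody,
) {
    let (tx, rx) = mpsc::unbounded();
    (tx, StreamBody::new(rx))
}

fn main() {
    let (tx, _body) = channel_body();
    // Any task holding `tx` can now push frames; `_body` can serve as the
    // body of an http::Response while frames keep arriving.
    tx.unbounded_send(Ok(Frame::data(Bytes::from_static(b"chunk")))).unwrap();
}
```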
@@ -1,14 +1,16 @@
 mod body_incoming_like;
 mod body_type;
 mod executor;
+mod tokio_timer;
 mod watch;

+#[allow(unused)]
 pub(crate) mod rt {
   pub(crate) use super::executor::LocalExecutor;
+  pub(crate) use super::tokio_timer::{TokioSleep, TokioTimer};
 }
+#[allow(unused)]
 pub(crate) mod body {
   pub(crate) use super::body_incoming_like::IncomingLike;
-  pub(crate) use super::body_type::{
-    empty, full, wrap_incoming_body_response, wrap_synthetic_body_response, BoxBody, IncomingOr,
-  };
+  pub(crate) use super::body_type::{empty, full, BoxBody, RequestBody, ResponseBody, UnboundedStreamBody};
 }
rpxy-lib/src/hyper_ext/tokio_timer.rs (new file, 55 lines)
@@ -0,0 +1,55 @@
+use std::{
+  future::Future,
+  pin::Pin,
+  task::{Context, Poll},
+  time::{Duration, Instant},
+};
+
+use hyper::rt::{Sleep, Timer};
+use pin_project_lite::pin_project;
+
+#[derive(Clone, Debug)]
+pub struct TokioTimer;
+
+impl Timer for TokioTimer {
+  fn sleep(&self, duration: Duration) -> Pin<Box<dyn Sleep>> {
+    Box::pin(TokioSleep {
+      inner: tokio::time::sleep(duration),
+    })
+  }
+
+  fn sleep_until(&self, deadline: Instant) -> Pin<Box<dyn Sleep>> {
+    Box::pin(TokioSleep {
+      inner: tokio::time::sleep_until(deadline.into()),
+    })
+  }
+
+  fn reset(&self, sleep: &mut Pin<Box<dyn Sleep>>, new_deadline: Instant) {
+    if let Some(sleep) = sleep.as_mut().downcast_mut_pin::<TokioSleep>() {
+      sleep.reset(new_deadline)
+    }
+  }
+}
+
+pin_project! {
+  pub(crate) struct TokioSleep {
+    #[pin]
+    pub(crate) inner: tokio::time::Sleep,
+  }
+}
+
+impl Future for TokioSleep {
+  type Output = ();
+
+  fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
+    self.project().inner.poll(cx)
+  }
+}
+
+impl Sleep for TokioSleep {}
+
+impl TokioSleep {
+  pub fn reset(self: Pin<&mut Self>, deadline: Instant) {
+    self.project().inner.as_mut().reset(deadline.into());
+  }
+}
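hyper 1.0 no longer bundles a runtime, so duration-based server options such as the header read timeout and the HTTP/2 keep-alive interval (both set later in this diff) only take effect when a `hyper::rt::Timer` implementation is supplied; this adapter bridges to tokio's clock. A sketch of how it plugs into the connection builder, assuming the `TokioTimer` above is in scope; the executor and timeout values here are illustrative stand-ins for the crate's own globals:

```rust
use hyper_util::rt::TokioExecutor;
use hyper_util::server::conn::auto::Builder;
use std::time::Duration;

fn connection_builder_sketch() -> Builder<TokioExecutor> {
    let idle = Duration::from_secs(20); // cf. PROXY_IDLE_TIMEOUT_SEC
    let mut http_server = Builder::new(TokioExecutor::new());
    http_server
        .http1()
        .keep_alive(true)
        .header_read_timeout(idle) // needs a timer to actually fire
        .timer(TokioTimer)
        .pipeline_flush(true);
    http_server
        .http2()
        .keep_alive_interval(Some(idle)) // PING interval on idle connections
        .timer(TokioTimer)
        .max_concurrent_streams(64);
    http_server
}
```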
@@ -11,7 +11,7 @@ use crate::{
   error::*,
   forwarder::{ForwardRequest, Forwarder},
   globals::Globals,
-  hyper_ext::body::{BoxBody, IncomingLike, IncomingOr},
+  hyper_ext::body::{RequestBody, ResponseBody},
   log::*,
   name_exp::ServerName,
 };
@@ -19,7 +19,7 @@ use derive_builder::Builder;
 use http::{Request, Response, StatusCode};
 use hyper_util::{client::legacy::connect::Connect, rt::TokioIo};
 use std::{net::SocketAddr, sync::Arc};
-use tokio::{io::copy_bidirectional, time::timeout};
+use tokio::io::copy_bidirectional;

 #[allow(dead_code)]
 #[derive(Debug)]
@@ -53,12 +53,12 @@ where
   /// Responsible to passthrough responses from backend applications or generate synthetic error responses.
   pub async fn handle_request(
     &self,
-    req: Request<IncomingOr<IncomingLike>>,
+    req: Request<RequestBody>,
     client_addr: SocketAddr, // For access control
     listen_addr: SocketAddr,
     tls_enabled: bool,
     tls_server_name: Option<ServerName>,
-  ) -> RpxyResult<Response<IncomingOr<BoxBody>>> {
+  ) -> RpxyResult<Response<ResponseBody>> {
     // preparing log data
     let mut log_data = HttpMessageLog::from(&req);
     log_data.client_addr(&client_addr);
@@ -94,12 +94,12 @@ where
   async fn handle_request_inner(
     &self,
     log_data: &mut HttpMessageLog,
-    mut req: Request<IncomingOr<IncomingLike>>,
+    mut req: Request<RequestBody>,
     client_addr: SocketAddr, // For access control
     listen_addr: SocketAddr,
     tls_enabled: bool,
     tls_server_name: Option<ServerName>,
-  ) -> HttpResult<Response<IncomingOr<BoxBody>>> {
+  ) -> HttpResult<Response<ResponseBody>> {
     // Here we start to inspect and parse with server_name
     let server_name = req
       .inspect_parse_host()
@@ -172,15 +172,10 @@ where

     //////////////
     // Forward request to a chosen backend
-    let mut res_backend = {
-      let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else {
-        return Err(HttpError::TimeoutUpstreamRequest);
-      };
-      match result {
-        Ok(res) => res,
-        Err(e) => {
-          return Err(HttpError::FailedToGetResponseFromBackend(e.to_string()));
-        }
-      }
-    };
+    let mut res_backend = match self.forwarder.request(req).await {
+      Ok(v) => v,
+      Err(e) => {
+        return Err(HttpError::FailedToGetResponseFromBackend(e.to_string()));
+      }
+    };
     //////////////
@@ -22,8 +22,6 @@ pub enum HttpError {
   NoUpstreamCandidates,
   #[error("Failed to generate upstream request for backend application: {0}")]
   FailedToGenerateUpstreamRequest(String),
-  #[error("Timeout in upstream request")]
-  TimeoutUpstreamRequest,
   #[error("Failed to get response from backend: {0}")]
   FailedToGetResponseFromBackend(String),

@@ -53,7 +51,6 @@ impl From<HttpError> for StatusCode {
       HttpError::FailedToRedirect(_) => StatusCode::INTERNAL_SERVER_ERROR,
       HttpError::NoUpstreamCandidates => StatusCode::NOT_FOUND,
       HttpError::FailedToGenerateUpstreamRequest(_) => StatusCode::INTERNAL_SERVER_ERROR,
-      HttpError::TimeoutUpstreamRequest => StatusCode::GATEWAY_TIMEOUT,
       HttpError::FailedToAddSetCookeInResponse(_) => StatusCode::INTERNAL_SERVER_ERROR,
       HttpError::FailedToGenerateDownstreamResponse(_) => StatusCode::INTERNAL_SERVER_ERROR,
       HttpError::FailedToUpgrade(_) => StatusCode::INTERNAL_SERVER_ERROR,
@@ -1,16 +1,16 @@
 use super::http_result::{HttpError, HttpResult};
 use crate::{
   error::*,
-  hyper_ext::body::{empty, BoxBody, IncomingOr},
+  hyper_ext::body::{empty, ResponseBody},
   name_exp::ServerName,
 };
 use http::{Request, Response, StatusCode, Uri};

 /// build http response with status code of 4xx and 5xx
-pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult<Response<IncomingOr<BoxBody>>> {
+pub(crate) fn synthetic_error_response(status_code: StatusCode) -> RpxyResult<Response<ResponseBody>> {
   let res = Response::builder()
     .status(status_code)
-    .body(IncomingOr::Right(empty()))
+    .body(ResponseBody::Boxed(empty()))
     .unwrap();
   Ok(res)
 }
@@ -20,7 +20,7 @@ pub(super) fn secure_redirection_response<B>(
   server_name: &ServerName,
   tls_port: Option<u16>,
   req: &Request<B>,
-) -> HttpResult<Response<IncomingOr<BoxBody>>> {
+) -> HttpResult<Response<ResponseBody>> {
   let server_name: String = server_name.try_into().unwrap_or_default();
   let pq = match req.uri().path_and_query() {
     Some(x) => x.as_str(),
@@ -36,7 +36,7 @@ pub(super) fn secure_redirection_response<B>(
   let response = Response::builder()
     .status(StatusCode::MOVED_PERMANENTLY)
     .header("Location", dest_uri.to_string())
-    .body(IncomingOr::Right(empty()))
+    .body(ResponseBody::Boxed(empty()))
     .map_err(|e| HttpError::FailedToRedirect(e.to_string()))?;
   Ok(response)
 }
@ -6,7 +6,10 @@ mod proxy_quic_quinn;
|
||||||
mod proxy_quic_s2n;
|
mod proxy_quic_s2n;
|
||||||
mod socket;
|
mod socket;
|
||||||
|
|
||||||
use crate::{globals::Globals, hyper_ext::rt::LocalExecutor};
|
use crate::{
|
||||||
|
globals::Globals,
|
||||||
|
hyper_ext::rt::{LocalExecutor, TokioTimer},
|
||||||
|
};
|
||||||
use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder};
|
use hyper_util::server::{self, conn::auto::Builder as ConnectionBuilder};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
|
@@ -19,9 +22,13 @@ pub(crate) fn connection_builder(globals: &Arc<Globals>) -> Arc<ConnectionBuilde
   http_server
     .http1()
     .keep_alive(globals.proxy_config.keepalive)
+    .header_read_timeout(globals.proxy_config.proxy_idle_timeout)
+    .timer(TokioTimer)
     .pipeline_flush(true);
   http_server
     .http2()
+    .keep_alive_interval(Some(globals.proxy_config.proxy_idle_timeout))
+    .timer(TokioTimer)
     .max_concurrent_streams(globals.proxy_config.max_concurrent_streams);
   Arc::new(http_server)
 }
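The builder now wires hyper's timeout machinery explicitly: hyper 1.0 no longer assumes a runtime, so options like `header_read_timeout` and `keep_alive_interval` require a `Timer` implementation to be registered alongside them. A standalone sketch of the same configuration, assuming a recent hyper-util that ships stock `TokioExecutor`/`TokioTimer` in place of the crate's own wrappers (durations and stream limits here are illustrative):

```rust
use std::sync::Arc;
use std::time::Duration;

use hyper_util::rt::{TokioExecutor, TokioTimer};
use hyper_util::server::conn::auto::Builder as ConnectionBuilder;

fn connection_builder() -> Arc<ConnectionBuilder<TokioExecutor>> {
    let mut http_server = ConnectionBuilder::new(TokioExecutor::new());
    http_server
        .http1()
        .keep_alive(true)
        // Without a registered timer, hyper 1.0 cannot arm this deadline.
        .header_read_timeout(Duration::from_secs(30))
        .timer(TokioTimer::new())
        .pipeline_flush(true);
    http_server
        .http2()
        .keep_alive_interval(Some(Duration::from_secs(30)))
        .timer(TokioTimer::new())
        .max_concurrent_streams(100);
    Arc::new(http_server)
}
```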
@@ -2,7 +2,7 @@ use super::proxy_main::Proxy;
 use crate::{
   crypto::CryptoSource,
   error::*,
-  hyper_ext::body::{IncomingLike, IncomingOr},
+  hyper_ext::body::{IncomingLike, RequestBody},
   log::*,
   name_exp::ServerName,
 };
@@ -10,8 +10,7 @@ use bytes::{Buf, Bytes};
 use http::{Request, Response};
 use http_body_util::BodyExt;
 use hyper_util::client::legacy::connect::Connect;
-use std::{net::SocketAddr, time::Duration};
-use tokio::time::timeout;
+use std::net::SocketAddr;
 
 #[cfg(feature = "http3-quinn")]
 use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream};
@@ -71,13 +70,11 @@ where
     let self_inner = self.clone();
     let tls_server_name_inner = tls_server_name.clone();
     self.globals.runtime_handle.spawn(async move {
-      if let Err(e) = timeout(
-        self_inner.globals.proxy_config.proxy_timeout + Duration::from_secs(1), // timeout per stream are considered as same as one in http2
-        self_inner.h3_serve_stream(req, stream, client_addr, tls_server_name_inner),
-      )
-      .await
+      if let Err(e) = self_inner
+        .h3_serve_stream(req, stream, client_addr, tls_server_name_inner)
+        .await
       {
-        error!("HTTP/3 failed to process stream: {}", e);
+        warn!("HTTP/3 error on serve stream: {}", e);
       }
       request_count.decrement();
       debug!("Request processed: current # {}", request_count.current());
@@ -122,7 +119,7 @@ where
         size += body.remaining();
         if size > max_body_size {
           error!(
-            "Exceeds max request body size for HTTP/3: received {}, maximum_allowd {}",
+            "Exceeds max request body size for HTTP/3: received {}, maximum_allowed {}",
             size, max_body_size
           );
           return Err(RpxyError::H3TooLargeBody);
@@ -140,8 +137,7 @@ where
       Ok(()) as RpxyResult<()>
     });
 
-    let new_req: Request<IncomingOr<IncomingLike>> = Request::from_parts(req_parts, IncomingOr::Right(req_body));
-    // Response<IncomingOr<BoxBody>> wrapped by RpxyResult
+    let new_req: Request<RequestBody> = Request::from_parts(req_parts, RequestBody::IncomingLike(req_body));
     let res = self
       .message_handler
       .handle_request(
@@ -153,22 +149,33 @@ where
       )
       .await?;
 
-    let (new_res_parts, new_body) = res.into_parts();
+    let (new_res_parts, mut new_body) = res.into_parts();
     let new_res = Response::from_parts(new_res_parts, ());
 
     match send_stream.send_response(new_res).await {
       Ok(_) => {
         debug!("HTTP/3 response to connection successful");
-        // aggregate body without copying
-        let body_data = new_body
-          .collect()
-          .await
-          .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
-
-        // create stream body to save memory, shallow copy (increment of ref-count) to Bytes using copy_to_bytes inside to_bytes()
-        send_stream.send_data(body_data.to_bytes()).await?;
-
-        // TODO: needs handling trailer? should be included in body from handler.
+        // on-demand body streaming to downstream without expanding the object onto memory.
+        loop {
+          let frame = match new_body.frame().await {
+            Some(frame) => frame,
+            None => {
+              debug!("Response body finished");
+              break;
+            }
+          }
+          .map_err(|e| RpxyError::HyperBodyManipulationError(e.to_string()))?;
+
+          if frame.is_data() {
+            let data = frame.into_data().unwrap_or_default();
+            debug!("Write data to HTTP/3 stream");
+            send_stream.send_data(data).await?;
+          } else if frame.is_trailers() {
+            let trailers = frame.into_trailers().unwrap_or_default();
+            debug!("Write trailer to HTTP/3 stream");
+            send_stream.send_trailers(trailers).await?;
+          }
+        }
       }
       Err(err) => {
         error!("Unable to send response to connection peer: {:?}", err);
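This hunk is the heart of the change: instead of `collect()`-ing the whole upstream body into memory before responding, the handler forwards each `http_body::Frame` to the HTTP/3 send stream as it arrives, so memory stays bounded by frame size and trailers are relayed as well. A distilled sketch of the loop against the http-body 1.0 API (the `send_data`/`send_trailers` sinks are placeholders for the h3 send stream):

```rust
use bytes::Bytes;
use http::HeaderMap;
use http_body_util::BodyExt;

// Placeholder sinks standing in for the send half of an h3 RequestStream.
async fn send_data(_data: Bytes) -> std::io::Result<()> { Ok(()) }
async fn send_trailers(_trailers: HeaderMap) -> std::io::Result<()> { Ok(()) }

async fn forward_body<B>(mut body: B) -> Result<(), Box<dyn std::error::Error>>
where
    B: http_body::Body<Data = Bytes> + Unpin,
    B::Error: std::error::Error + 'static,
{
    // Pull one frame at a time; None means the body is finished.
    while let Some(frame) = body.frame().await {
        let frame = frame?;
        if frame.is_data() {
            // into_data() only fails for non-data frames, guarded here.
            send_data(frame.into_data().unwrap_or_default()).await?;
        } else if frame.is_trailers() {
            send_trailers(frame.into_trailers().unwrap_or_default()).await?;
        }
    }
    Ok(())
}
```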
@@ -5,7 +5,7 @@ use crate::{
   error::*,
   globals::Globals,
   hyper_ext::{
-    body::{BoxBody, IncomingOr},
+    body::{RequestBody, ResponseBody},
     rt::LocalExecutor,
   },
   log::*,
@@ -32,14 +32,14 @@ async fn serve_request<U, T>(
   listen_addr: SocketAddr,
   tls_enabled: bool,
   tls_server_name: Option<ServerName>,
-) -> RpxyResult<Response<IncomingOr<BoxBody>>>
+) -> RpxyResult<Response<ResponseBody>>
 where
   T: Send + Sync + Connect + Clone,
   U: CryptoSource + Clone,
 {
   handler
     .handle_request(
-      req.map(IncomingOr::Left),
+      req.map(RequestBody::Incoming),
       client_addr,
       listen_addr,
       tls_enabled,
@@ -86,13 +86,11 @@ where
 
     let server_clone = self.connection_builder.clone();
     let message_handler_clone = self.message_handler.clone();
-    let timeout_sec = self.globals.proxy_config.proxy_timeout;
     let tls_enabled = self.tls_enabled;
     let listening_on = self.listening_on;
     self.globals.runtime_handle.clone().spawn(async move {
-      timeout(
-        timeout_sec + Duration::from_secs(1),
-        server_clone.serve_connection_with_upgrades(
+      server_clone
+        .serve_connection_with_upgrades(
           stream,
           service_fn(move |req: Request<Incoming>| {
             serve_request(
@@ -104,10 +102,9 @@ where
               tls_server_name.clone(),
             )
           }),
-        ),
-      )
-      .await
-      .ok();
+        )
+        .await
+        .ok();
 
       request_count.decrement();
       debug!("Request processed: current # {}", request_count.current());
@@ -201,8 +198,7 @@ where
         return Err(RpxyError::FailedToTlsHandshake(e.to_string()));
       }
     };
-    self_inner.serve_connection(stream, client_addr, server_name);
-    Ok(()) as RpxyResult<()>
+    Ok((stream, client_addr, server_name))
   };
 
   self.globals.runtime_handle.spawn( async move {
|
|
@ -214,8 +210,13 @@ where
|
||||||
error!("Timeout to handshake TLS");
|
error!("Timeout to handshake TLS");
|
||||||
return;
|
return;
|
||||||
};
|
};
|
||||||
if let Err(e) = v {
|
match v {
|
||||||
error!("{}", e);
|
Ok((stream, client_addr, server_name)) => {
|
||||||
|
self_inner.serve_connection(stream, client_addr, server_name);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!("{}", e);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
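The restructuring above narrows what runs under the handshake deadline: the fallible future now returns the established `(stream, client_addr, server_name)` tuple, and `serve_connection` is invoked only after a successful handshake, so long-lived connections are no longer subject to the handshake timeout. A reduced sketch of the pattern (the handshake routine, stream type, and deadline are placeholders):

```rust
use std::time::Duration;
use tokio::time::timeout;

// Placeholder types and routines; the real code deals in TLS streams,
// socket addresses, and SNI server names.
async fn do_handshake() -> Result<((), String), String> {
    Ok(((), "example.com".into()))
}
fn serve_connection(_stream: (), _server_name: String) {}

async fn accept_one() {
    // Only the handshake itself is bounded by the deadline...
    let Ok(v) = timeout(Duration::from_secs(10), do_handshake()).await else {
        eprintln!("Timeout to handshake TLS");
        return;
    };
    // ...serving the accepted connection afterwards runs unbounded.
    match v {
        Ok((stream, server_name)) => serve_connection(stream, server_name),
        Err(e) => eprintln!("{e}"),
    }
}
```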
@@ -1 +1 @@
-Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd
+Subproject commit 88d23c2f5a3ac36295dff4a804968c43932ba46b