Merge pull request #79 from junkurihara/feat/cache

feat: initial implementation of caching feature
This commit is contained in:
Jun Kurihara 2023-08-18 19:24:04 +09:00 committed by GitHub
commit 4593f95251
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
27 changed files with 543 additions and 44 deletions

View file

@ -42,7 +42,7 @@ jobs:
- target: "s2n"
dockerfile: ./docker/Dockerfile
build-args: |
"CARGO_FEATURES=--no-default-features --features http3-s2n"
"CARGO_FEATURES=--no-default-features --features=http3-s2n,cache"
"ADDITIONAL_DEPS=pkg-config libssl-dev cmake libclang1 gcc g++"
platforms: linux/amd64,linux/arm64
tags-suffix: "-s2n"

15
.gitmodules vendored
View file

@ -1,9 +1,12 @@
[submodule "h3"]
path = h3
[submodule "submodules/h3"]
path = submodules/h3
url = git@github.com:junkurihara/h3.git
[submodule "quinn"]
path = quinn
[submodule "submodules/quinn"]
path = submodules/quinn
url = git@github.com:junkurihara/quinn.git
[submodule "s2n-quic"]
path = s2n-quic
[submodule "submodules/s2n-quic"]
path = submodules/s2n-quic
url = git@github.com:junkurihara/s2n-quic.git
[submodule "submodules/rusty-http-cache-semantics"]
path = submodules/rusty-http-cache-semantics
url = git@github.com:junkurihara/rusty-http-cache-semantics.git

View file

@ -5,6 +5,7 @@
### Improvement
- Feat: Enabled `h2c` (HTTP/2 cleartext) requests to upstream app servers (in the previous versions, only HTTP/1.1 is allowed for cleartext requests)
- Feat: Initial implementation of caching feature using file + on memory cache. (Caveats: No persistance of the cache. Once config is updated, the cache is totally eliminated.)
- Refactor: logs of minor improvements
### Bugfix

View file

@ -1,7 +1,7 @@
[workspace]
members = ["rpxy-bin", "rpxy-lib"]
exclude = ["quinn", "h3-quinn", "h3", "s2n-quic"]
exclude = ["submodules"]
[profile.release]
codegen-units = 1

View file

@ -257,7 +257,7 @@ Other than them, all you need is to mount your `config.toml` as `/etc/rpxy.toml`
### HTTP/3
`rpxy` can serves HTTP/3 requests thanks to `quinn` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` like follows. Any values in the entry like `alt_svc_max_age` are optional.
`rpxy` can serves HTTP/3 requests thanks to `quinn`, `s2n-quic` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` like follows. Any values in the entry like `alt_svc_max_age` are optional.
```toml
[experimental.h3]
@ -281,6 +281,19 @@ tls = { https_redirection = true, tls_cert_path = './server.crt', tls_cert_key_p
However, currently we have a limitation on HTTP/3 support for applications that enables client authentication. If an application is set with client authentication, HTTP/3 doesn't work for the application.
### Hybrid Caching Feature Using File and On-Memory
If `[experimental.cache]` is specified, you can leverage the local caching feature using temporary files and on-memory objects. Note that `max_cache_each_size` must be larger or equal to `max_cache_each_size_on_memory`.
```toml
# If this specified, file cache feature is enabled
[experimental.cache]
cache_dir = './cache' # optional. default is "./cache" relative to the current working directory
max_cache_entry = 1000 # optional. default is 1k
max_cache_each_size = 65535 # optional. default is 64k
max_cache_each_size_on_memory = 4096 # optional. default is 4k if 0, it is always file cache.
```
## TIPS
### Using Private Key Issued by Let's Encrypt

View file

@ -1,7 +1,10 @@
# TODO List
- [Done in 0.6.0] But we need more sophistication on `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~
- [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**
- [Initial implementation in v0.6.0] ~~**Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**~~ Using `lru` crate might be inefficient in terms of the speed.
- Consider more sophisticated architecture for cache
- Persistent cache (if possible).
- etc etc
- Improvement of path matcher
- More flexible option for rewriting path
- Refactoring

View file

@ -107,3 +107,10 @@ max_concurrent_bidistream = 100
max_concurrent_unistream = 100
max_idle_timeout = 10 # secs. 0 represents an infinite timeout.
# WARNING: If a peer or its network path malfunctions or acts maliciously, an infinite idle timeout can result in permanently hung futures!
# If this specified, file cache feature is enabled
[experimental.cache]
cache_dir = './cache' # optional. default is "./cache" relative to the current working directory
max_cache_entry = 1000 # optional. default is 1k
max_cache_each_size = 65535 # optional. default is 64k
max_cache_each_size_on_memory = 4096 # optional. default is 4k if 0, it is always file cache.

1
quinn

@ -1 +0,0 @@
Subproject commit 8076ffe94d38813ce0220af9d3438e7bfb5e8429

View file

@ -12,31 +12,33 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["http3-quinn"]
default = ["http3-quinn", "cache"]
http3-quinn = ["rpxy-lib/http3-quinn"]
http3-s2n = ["rpxy-lib/http3-s2n"]
cache = ["rpxy-lib/cache"]
[dependencies]
rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [
"sticky-cookie",
] }
anyhow = "1.0.72"
anyhow = "1.0.75"
rustc-hash = "1.1.0"
serde = { version = "1.0.183", default-features = false, features = ["derive"] }
derive_builder = "0.12.0"
tokio = { version = "1.29.1", default-features = false, features = [
tokio = { version = "1.32.0", default-features = false, features = [
"net",
"rt-multi-thread",
"time",
"sync",
"macros",
] }
async-trait = "0.1.72"
async-trait = "0.1.73"
rustls-pemfile = "1.0.3"
mimalloc = { version = "*", default-features = false }
# config
clap = { version = "4.3.21", features = ["std", "cargo", "wrap_help"] }
clap = { version = "4.3.22", features = ["std", "cargo", "wrap_help"] }
toml = { version = "0.7.6", default-features = false, features = ["parse"] }
hot_reload = "0.1.4"
@ -45,8 +47,4 @@ tracing = { version = "0.1.37" }
tracing-subscriber = { version = "0.3.17", features = ["env-filter"] }
[target.'cfg(not(target_env = "msvc"))'.dependencies]
tikv-jemallocator = "0.5.4"
[dev-dependencies]

View file

@ -32,10 +32,21 @@ pub struct Http3Option {
pub max_idle_timeout: Option<u64>,
}
#[cfg(feature = "cache")]
#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)]
pub struct CacheOption {
pub cache_dir: Option<String>,
pub max_cache_entry: Option<usize>,
pub max_cache_each_size: Option<usize>,
pub max_cache_each_size_on_memory: Option<usize>,
}
#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)]
pub struct Experimental {
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
pub h3: Option<Http3Option>,
#[cfg(feature = "cache")]
pub cache: Option<CacheOption>,
pub ignore_sni_consistency: Option<bool>,
}
@ -160,6 +171,24 @@ impl TryInto<ProxyConfig> for &ConfigToml {
if let Some(ignore) = exp.ignore_sni_consistency {
proxy_config.sni_consistency = !ignore;
}
#[cfg(feature = "cache")]
if let Some(cache_option) = &exp.cache {
proxy_config.cache_enabled = true;
proxy_config.cache_dir = match &cache_option.cache_dir {
Some(cache_dir) => Some(std::path::PathBuf::from(cache_dir)),
None => Some(std::path::PathBuf::from(CACHE_DIR)),
};
if let Some(num) = cache_option.max_cache_entry {
proxy_config.cache_max_entry = num;
}
if let Some(num) = cache_option.max_cache_each_size {
proxy_config.cache_max_each_size = num;
}
if let Some(num) = cache_option.max_cache_each_size_on_memory {
proxy_config.cache_max_each_size_on_memory = num;
}
}
}
Ok(proxy_config)

View file

@ -1,3 +1,7 @@
pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"];
pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"];
pub const CONFIG_WATCH_DELAY_SECS: u32 = 20;
#[cfg(feature = "cache")]
// Cache directory
pub const CACHE_DIR: &str = "./cache";

View file

@ -1,9 +1,5 @@
#[cfg(not(target_env = "msvc"))]
use tikv_jemallocator::Jemalloc;
#[cfg(not(target_env = "msvc"))]
#[global_allocator]
static GLOBAL: Jemalloc = Jemalloc;
static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc;
mod cert_file_reader;
mod config;

View file

@ -12,10 +12,11 @@ publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features]
default = ["http3-quinn", "sticky-cookie"]
default = ["http3-quinn", "sticky-cookie", "cache"]
http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"]
http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"]
sticky-cookie = ["base64", "sha2", "chrono"]
cache = ["http-cache-semantics", "lru"]
[dependencies]
rand = "0.8.5"
@ -23,19 +24,20 @@ rustc-hash = "1.1.0"
bytes = "1.4.0"
derive_builder = "0.12.0"
futures = { version = "0.3.28", features = ["alloc", "async-await"] }
tokio = { version = "1.29.1", default-features = false, features = [
tokio = { version = "1.32.0", default-features = false, features = [
"net",
"rt-multi-thread",
"time",
"sync",
"macros",
"fs",
] }
async-trait = "0.1.72"
async-trait = "0.1.73"
hot_reload = "0.1.4" # reloading certs
# Error handling
anyhow = "1.0.72"
thiserror = "1.0.44"
anyhow = "1.0.75"
thiserror = "1.0.47"
# http and tls
hyper = { version = "0.14.27", default-features = false, features = [
@ -60,17 +62,21 @@ tracing = { version = "0.1.37" }
# http/3
# quinn = { version = "0.9.3", optional = true }
quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rustls-0.21
h3 = { path = "../h3/h3/", optional = true }
quinn = { path = "../submodules/quinn/quinn", optional = true } # Tentative to support rustls-0.21
h3 = { path = "../submodules/h3/h3/", optional = true }
# h3-quinn = { path = "./h3/h3-quinn/", optional = true }
h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21
h3-quinn = { path = "../submodules/h3-quinn/", optional = true } # Tentative to support rustls-0.21
# for UDP socket wit SO_REUSEADDR when h3 with quinn
socket2 = { version = "0.5.3", features = ["all"], optional = true }
s2n-quic = { path = "../s2n-quic/quic/s2n-quic/", default-features = false, features = [
s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [
"provider-tls-rustls",
], optional = true }
s2n-quic-h3 = { path = "../s2n-quic/quic/s2n-quic-h3/", optional = true }
s2n-quic-rustls = { path = "../s2n-quic/quic/s2n-quic-rustls/", optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", optional = true }
s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optional = true }
# cache
http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true }
lru = { version = "0.11.0", optional = true }
# cookie handling for sticky cookie
chrono = { version = "0.4.26", default-features = false, features = [

View file

@ -31,3 +31,15 @@ pub mod H3 {
#[cfg(feature = "sticky-cookie")]
/// For load-balancing with sticky cookie
pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id";
#[cfg(feature = "cache")]
// # of entries in cache
pub const MAX_CACHE_ENTRY: usize = 1_000;
#[cfg(feature = "cache")]
// max size for each file in bytes
pub const MAX_CACHE_EACH_SIZE: usize = 65_535;
#[cfg(feature = "cache")]
// on memory cache if less than or equel to
pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096;
// TODO: max cache size in total

View file

@ -22,6 +22,9 @@ pub enum RpxyError {
#[error("Http Message Handler Error: {0}")]
Handler(&'static str),
#[error("Cache Error: {0}")]
Cache(&'static str),
#[error("Http Request Message Error: {0}")]
Request(&'static str),

View file

@ -52,6 +52,18 @@ pub struct ProxyConfig {
// experimentals
pub sni_consistency: bool, // Handler
#[cfg(feature = "cache")]
pub cache_enabled: bool,
#[cfg(feature = "cache")]
pub cache_dir: Option<std::path::PathBuf>,
#[cfg(feature = "cache")]
pub cache_max_entry: usize,
#[cfg(feature = "cache")]
pub cache_max_each_size: usize,
#[cfg(feature = "cache")]
pub cache_max_each_size_on_memory: usize,
// All need to make packet acceptor
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
pub http3: bool,
@ -87,6 +99,17 @@ impl Default for ProxyConfig {
sni_consistency: true,
#[cfg(feature = "cache")]
cache_enabled: false,
#[cfg(feature = "cache")]
cache_dir: None,
#[cfg(feature = "cache")]
cache_max_entry: MAX_CACHE_ENTRY,
#[cfg(feature = "cache")]
cache_max_each_size: MAX_CACHE_EACH_SIZE,
#[cfg(feature = "cache")]
cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY,
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
http3: false,
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]

View file

@ -0,0 +1,312 @@
use crate::{error::*, globals::Globals, log::*, CryptoSource};
use base64::{engine::general_purpose, Engine as _};
use bytes::{Buf, Bytes, BytesMut};
use http_cache_semantics::CachePolicy;
use hyper::{
http::{Request, Response},
Body,
};
use lru::LruCache;
use sha2::{Digest, Sha256};
use std::{
fmt::Debug,
path::{Path, PathBuf},
sync::{Arc, Mutex},
time::SystemTime,
};
use tokio::{
fs::{self, File},
io::{AsyncReadExt, AsyncWriteExt},
sync::RwLock,
};
#[derive(Clone, Debug)]
pub enum CacheFileOrOnMemory {
File(PathBuf),
OnMemory(Vec<u8>),
}
#[derive(Clone, Debug)]
struct CacheObject {
pub policy: CachePolicy,
pub target: CacheFileOrOnMemory,
}
#[derive(Debug)]
struct CacheFileManager {
cache_dir: PathBuf,
cnt: usize,
runtime_handle: tokio::runtime::Handle,
}
impl CacheFileManager {
async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self {
// Create cache file dir
// Clean up the file dir before init
// TODO: Persistent cache is really difficult. maybe SQLite is needed.
if let Err(e) = fs::remove_dir_all(path).await {
warn!("Failed to clean up the cache dir: {e}");
};
fs::create_dir_all(path).await.unwrap();
Self {
cache_dir: path.clone(),
cnt: 0,
runtime_handle: runtime_handle.clone(),
}
}
async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result<CacheObject> {
let cache_filepath = self.cache_dir.join(cache_filename);
let Ok(mut file) = File::create(&cache_filepath).await else {
return Err(RpxyError::Cache("Failed to create file"));
};
let mut bytes_clone = body_bytes.clone();
while bytes_clone.has_remaining() {
if let Err(e) = file.write_buf(&mut bytes_clone).await {
error!("Failed to write file cache: {e}");
return Err(RpxyError::Cache("Failed to write file cache: {e}"));
};
}
self.cnt += 1;
Ok(CacheObject {
policy: policy.clone(),
target: CacheFileOrOnMemory::File(cache_filepath),
})
}
async fn read(&self, path: impl AsRef<Path>) -> Result<Body> {
let Ok(mut file) = File::open(&path).await else {
warn!("Cache file object cannot be opened");
return Err(RpxyError::Cache("Cache file object cannot be opened"));
};
let (body_sender, res_body) = Body::channel();
self.runtime_handle.spawn(async move {
let mut sender = body_sender;
let mut buf = BytesMut::new();
loop {
match file.read_buf(&mut buf).await {
Ok(0) => break,
Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?,
Err(_) => break,
};
}
Ok(()) as Result<()>
});
Ok(res_body)
}
async fn remove(&mut self, path: impl AsRef<Path>) -> Result<()> {
fs::remove_file(path.as_ref()).await?;
self.cnt -= 1;
debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt);
Ok(())
}
}
#[derive(Clone, Debug)]
pub struct RpxyCache {
/// Managing cache file objects through RwLock's lock mechanism for file lock
cache_file_manager: Arc<RwLock<CacheFileManager>>,
/// Lru cache storing http message caching policy
inner: Arc<Mutex<LruCache<String, CacheObject>>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう
/// Async runtime
runtime_handle: tokio::runtime::Handle,
/// Maximum size of each cache file object
max_each_size: usize,
/// Maximum size of cache object on memory
max_each_size_on_memory: usize,
}
impl RpxyCache {
/// Generate cache storage
pub async fn new<T: CryptoSource>(globals: &Globals<T>) -> Option<Self> {
if !globals.proxy_config.cache_enabled {
return None;
}
let path = globals.proxy_config.cache_dir.as_ref().unwrap();
let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await));
let inner = Arc::new(Mutex::new(LruCache::new(
std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(),
)));
let max_each_size = globals.proxy_config.cache_max_each_size;
let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
if max_each_size < max_each_size_on_memory {
warn!(
"Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"
);
max_each_size_on_memory = max_each_size;
}
Some(Self {
cache_file_manager,
inner,
runtime_handle: globals.runtime_handle.clone(),
max_each_size,
max_each_size_on_memory,
})
}
fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> {
let Ok(mut lock) = self.inner.lock() else {
error!("Mutex can't be locked to evict a cache entry");
return None;
};
lock.pop_entry(cache_key)
}
async fn evict_cache_file(&self, filepath: impl AsRef<Path>) {
// Acquire the write lock
let mut mgr = self.cache_file_manager.write().await;
if let Err(e) = mgr.remove(filepath).await {
warn!("Eviction failed during file object removal: {:?}", e);
};
}
/// Get cached response
pub async fn get<R>(&self, req: &Request<R>) -> Option<Response<Body>> {
debug!("Current cache entries: {:?}", self.inner);
let cache_key = req.uri().to_string();
// First check cache chance
let cached_object = {
let Ok(mut lock) = self.inner.lock() else {
error!("Mutex can't be locked for checking cache entry");
return None;
};
let Some(cached_object) = lock.get(&cache_key) else {
return None;
};
cached_object.clone()
};
// Secondly check the cache freshness as an HTTP message
let now = SystemTime::now();
let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else {
// Evict stale cache entry.
// This might be okay to keep as is since it would be updated later.
// However, there is no guarantee that newly got objects will be still cacheable.
// So, we have to evict stale cache entries and cache file objects if found.
debug!("Stale cache entry and file object: {cache_key}");
let _evicted_entry = self.evict_cache_entry(&cache_key);
// For cache file
if let CacheFileOrOnMemory::File(path) = cached_object.target {
self.evict_cache_file(&path).await;
}
return None;
};
// Finally retrieve the file/on-memory object
match cached_object.target {
CacheFileOrOnMemory::File(path) => {
let mgr = self.cache_file_manager.read().await;
let res_body = match mgr.read(&path).await {
Ok(res_body) => res_body,
Err(e) => {
warn!("Failed to read from file cache: {e}");
let _evicted_entry = self.evict_cache_entry(&cache_key);
self.evict_cache_file(&path).await;
return None;
}
};
debug!("Cache hit from file: {cache_key}");
Some(Response::from_parts(res_parts, res_body))
}
CacheFileOrOnMemory::OnMemory(object) => {
debug!("Cache hit from on memory: {cache_key}");
Some(Response::from_parts(res_parts, Body::from(object)))
}
}
}
pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> {
let my_cache = self.inner.clone();
let mgr = self.cache_file_manager.clone();
let uri = uri.clone();
let bytes_clone = body_bytes.clone();
let policy_clone = policy.clone();
let max_each_size = self.max_each_size;
let max_each_size_on_memory = self.max_each_size_on_memory;
self.runtime_handle.spawn(async move {
if bytes_clone.len() > max_each_size {
warn!("Too large to cache");
return Err(RpxyError::Cache("Too large to cache"));
}
let cache_key = derive_cache_key_from_uri(&uri);
let cache_filename = derive_filename_from_uri(&uri);
debug!("Cache file of {:?} bytes to be written", bytes_clone.len());
let cache_object = if bytes_clone.len() > max_each_size_on_memory {
let mut mgr = mgr.write().await;
let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else {
error!("Failed to put the body into the file object or cache entry");
return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry"));
};
debug!("Cached a new file: {} - {}", cache_key, cache_filename);
cache_object
} else {
debug!("Cached a new object on memory: {}", cache_key);
CacheObject {
policy: policy_clone,
target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()),
}
};
let push_opt = {
let Ok(mut lock) = my_cache.lock() else {
error!("Failed to acquire mutex lock for writing cache entry");
return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry"));
};
lock.push(cache_key.clone(), cache_object)
};
if let Some((k, v)) = push_opt {
if k != cache_key {
info!("Over the cache capacity. Evict least recent used entry");
if let CacheFileOrOnMemory::File(path) = v.target {
let mut mgr = mgr.write().await;
if let Err(e) = mgr.remove(&path).await {
warn!("Eviction failed during file object removal over the capacity: {:?}", e);
};
}
}
}
Ok(())
});
Ok(())
}
}
fn derive_filename_from_uri(uri: &hyper::Uri) -> String {
let mut hasher = Sha256::new();
hasher.update(uri.to_string());
let digest = hasher.finalize();
general_purpose::URL_SAFE_NO_PAD.encode(digest)
}
fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String {
uri.to_string()
}
pub fn get_policy_if_cacheable<R>(req: Option<&Request<R>>, res: Option<&Response<Body>>) -> Result<Option<CachePolicy>>
where
R: Debug,
{
// deduce cache policy from req and res
let (Some(req), Some(res)) = (req, res) else {
return Err(RpxyError::Cache("Invalid null request and/or response"));
};
let new_policy = CachePolicy::new(req, res);
if new_policy.is_storable() {
debug!("Response is cacheable: {:?}\n{:?}", req, res.headers());
Ok(Some(new_policy))
} else {
Ok(None)
}
}

View file

@ -1,6 +1,11 @@
use crate::error::RpxyError;
#[cfg(feature = "cache")]
use super::cache::{get_policy_if_cacheable, RpxyCache};
#[cfg(feature = "cache")]
use crate::log::*;
use crate::{error::RpxyError, globals::Globals, CryptoSource};
use async_trait::async_trait;
use derive_builder::Builder;
#[cfg(feature = "cache")]
use bytes::Buf;
use hyper::{
body::{Body, HttpBody},
client::{connect::Connect, HttpConnector},
@ -9,6 +14,20 @@ use hyper::{
};
use hyper_rustls::HttpsConnector;
#[cfg(feature = "cache")]
/// Build synthetic request to cache
fn build_synth_req_for_cache<T>(req: &Request<T>) -> Request<()> {
let mut builder = Request::builder()
.method(req.method())
.uri(req.uri())
.version(req.version());
// TODO: omit extensions. is this approach correct?
for (header_key, header_value) in req.headers() {
builder = builder.header(header_key, header_value);
}
builder.body(()).unwrap()
}
#[async_trait]
/// Definition of the forwarder that simply forward requests from downstream client to upstream app servers.
pub trait ForwardRequest<B> {
@ -16,13 +35,13 @@ pub trait ForwardRequest<B> {
async fn request(&self, req: Request<B>) -> Result<Response<Body>, Self::Error>;
}
#[derive(Builder, Clone)]
/// Forwarder struct
/// Forwarder struct responsible to cache handling
pub struct Forwarder<C, B = Body>
where
C: Connect + Clone + Sync + Send + 'static,
{
// TODO: maybe this forwarder definition is suitable for cache handling.
#[cfg(feature = "cache")]
cache: Option<RpxyCache>,
inner: Client<C, B>,
inner_h2: Client<C, B>, // `h2c` or http/2-only client is defined separately
}
@ -36,10 +55,60 @@ where
C: Connect + Clone + Sync + Send + 'static,
{
type Error = RpxyError;
#[cfg(feature = "cache")]
async fn request(&self, req: Request<B>) -> Result<Response<Body>, Self::Error> {
let mut synth_req = None;
if self.cache.is_some() {
if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await {
// if found, return it as response.
info!("Cache hit - Return from cache");
return Ok(cached_response);
};
// Synthetic request copy used just for caching (cannot clone request object...)
synth_req = Some(build_synth_req_for_cache(&req));
}
// TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient.
// Needs to be reconsidered. Currently, this is a kind of work around.
// This possibly relates to https://github.com/hyperium/hyper/issues/2417.
let res = match req.version() {
Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests
_ => self.inner.request(req).await.map_err(RpxyError::Hyper),
};
if self.cache.is_none() {
return res;
}
// check cacheability and store it if cacheable
let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else {
return res;
};
let (parts, body) = res.unwrap().into_parts();
// TODO: Inefficient?
let Ok(mut bytes) = hyper::body::aggregate(body).await else {
return Err(RpxyError::Cache("Failed to write byte buffer"));
};
let aggregated = bytes.copy_to_bytes(bytes.remaining());
if let Err(cache_err) = self
.cache
.as_ref()
.unwrap()
.put(synth_req.unwrap().uri(), &aggregated, &cache_policy)
.await
{
error!("{:?}", cache_err);
};
// res
Ok(Response::from_parts(parts, Body::from(aggregated)))
}
#[cfg(not(feature = "cache"))]
async fn request(&self, req: Request<B>) -> Result<Response<Body>, Self::Error> {
match req.version() {
Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests
_ => self.inner.request(req).await.map_err(RpxyError::Hyper),
@ -48,7 +117,8 @@ where
}
impl Forwarder<HttpsConnector<HttpConnector>, Body> {
pub async fn new() -> Self {
/// Build forwarder
pub async fn new<T: CryptoSource>(_globals: &std::sync::Arc<Globals<T>>) -> Self {
// let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector();
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
@ -64,6 +134,13 @@ impl Forwarder<HttpsConnector<HttpConnector>, Body> {
let inner = Client::builder().build::<_, Body>(connector);
let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2);
#[cfg(feature = "cache")]
{
let cache = RpxyCache::new(_globals).await;
Self { inner, inner_h2, cache }
}
#[cfg(not(feature = "cache"))]
Self { inner, inner_h2 }
}
}

View file

@ -1,3 +1,5 @@
#[cfg(feature = "cache")]
mod cache;
mod forwarder;
mod handler_main;
mod utils_headers;

View file

@ -60,6 +60,15 @@ where
if !proxy_config.sni_consistency {
info!("Ignore consistency between TLS SNI and Host header (or Request line). Note it violates RFC.");
}
#[cfg(feature = "cache")]
if proxy_config.cache_enabled {
info!(
"Cache is enabled: cache dir = {:?}",
proxy_config.cache_dir.as_ref().unwrap()
);
} else {
info!("Cache is disabled")
}
// build global
let globals = Arc::new(Globals {
@ -72,7 +81,7 @@ where
// build message handler including a request forwarder
let msg_handler = Arc::new(
HttpMessageHandlerBuilder::default()
.forwarder(Arc::new(Forwarder::new().await))
.forwarder(Arc::new(Forwarder::new(&globals).await))
.globals(globals.clone())
.build()?,
);

@ -1 +0,0 @@
Subproject commit 1ff2cd230fdf46596fe77830966857c438a8b31a

View file

1
submodules/quinn Submodule

@ -0,0 +1 @@
Subproject commit 4f25f501ef4d009af9d3bef44d322c09c327b2df

@ -0,0 +1 @@
Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd

1
submodules/s2n-quic Submodule

@ -0,0 +1 @@
Subproject commit c7e3f517f3267f2e19b605cb53e6bf265e70e5af