diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index e2d801c..f7cea2b 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -42,7 +42,7 @@ jobs: - target: "s2n" dockerfile: ./docker/Dockerfile build-args: | - "CARGO_FEATURES=--no-default-features --features http3-s2n" + "CARGO_FEATURES=--no-default-features --features=http3-s2n,cache" "ADDITIONAL_DEPS=pkg-config libssl-dev cmake libclang1 gcc g++" platforms: linux/amd64,linux/arm64 tags-suffix: "-s2n" diff --git a/.gitmodules b/.gitmodules index 59b7ea8..65fcd3b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,12 @@ -[submodule "h3"] - path = h3 +[submodule "submodules/h3"] + path = submodules/h3 url = git@github.com:junkurihara/h3.git -[submodule "quinn"] - path = quinn +[submodule "submodules/quinn"] + path = submodules/quinn url = git@github.com:junkurihara/quinn.git -[submodule "s2n-quic"] - path = s2n-quic +[submodule "submodules/s2n-quic"] + path = submodules/s2n-quic url = git@github.com:junkurihara/s2n-quic.git +[submodule "submodules/rusty-http-cache-semantics"] + path = submodules/rusty-http-cache-semantics + url = git@github.com:junkurihara/rusty-http-cache-semantics.git diff --git a/CHANGELOG.md b/CHANGELOG.md index ff5cd18..492db57 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,7 @@ ### Improvement - Feat: Enabled `h2c` (HTTP/2 cleartext) requests to upstream app servers (in the previous versions, only HTTP/1.1 is allowed for cleartext requests) +- Feat: Initial implementation of the caching feature using file and on-memory cache. (Caveats: No persistence of the cache. Once the config is updated, the cache is completely cleared.) - Refactor: logs of minor improvements ### Bugfix diff --git a/Cargo.toml b/Cargo.toml index aa65657..29e2277 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = ["rpxy-bin", "rpxy-lib"] -exclude = ["quinn", "h3-quinn", "h3", "s2n-quic"] +exclude = ["submodules"] [profile.release] codegen-units = 1 diff --git a/README.md b/README.md index f14668d..9956935 100644 --- a/README.md +++ b/README.md @@ -257,7 +257,7 @@ Other than them, all you need is to mount your `config.toml` as `/etc/rpxy.toml` ### HTTP/3 -`rpxy` can serves HTTP/3 requests thanks to `quinn` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` like follows. Any values in the entry like `alt_svc_max_age` are optional. +`rpxy` can serve HTTP/3 requests thanks to `quinn`, `s2n-quic` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` as follows. Any values in the entry like `alt_svc_max_age` are optional. ```toml [experimental.h3] @@ -281,6 +281,19 @@ tls = { https_redirection = true, tls_cert_path = './server.crt', tls_cert_key_p However, currently we have a limitation on HTTP/3 support for applications that enables client authentication. If an application is set with client authentication, HTTP/3 doesn't work for the application. +### Hybrid Caching Feature Using Files and On-Memory Objects + +If `[experimental.cache]` is specified, you can leverage the local caching feature using temporary files and on-memory objects. Note that `max_cache_each_size` must be larger than or equal to `max_cache_each_size_on_memory`. + +```toml +# If this is specified, the file cache feature is enabled +[experimental.cache] +cache_dir = './cache' # optional. 
default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k +max_cache_each_size_on_memory = 4096 # optional. default is 4k. If 0, the file cache is always used. +``` + ## TIPS ### Using Private Key Issued by Let's Encrypt diff --git a/TODO.md b/TODO.md index 8ec6d5d..1e25ee1 100644 --- a/TODO.md +++ b/TODO.md @@ -1,7 +1,10 @@ # TODO List - [Done in 0.6.0] But we need more sophistication on `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ -- [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))** +- [Initial implementation in v0.6.0] ~~**Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**~~ Using the `lru` crate might be inefficient in terms of speed. + - Consider a more sophisticated cache architecture + - Persistent cache (if possible). + - etc. - Improvement of path matcher - More flexible option for rewriting path - Refactoring diff --git a/config-example.toml b/config-example.toml index 561ebc2..ec79f3d 100644 --- a/config-example.toml +++ b/config-example.toml @@ -107,3 +107,10 @@ max_concurrent_bidistream = 100 max_concurrent_unistream = 100 max_idle_timeout = 10 # secs. 0 represents an infinite timeout. # WARNING: If a peer or its network path malfunctions or acts maliciously, an infinite idle timeout can result in permanently hung futures! + +# If this is specified, the file cache feature is enabled +[experimental.cache] +cache_dir = './cache' # optional. default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k +max_cache_each_size_on_memory = 4096 # optional. default is 4k. If 0, the file cache is always used.
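The two size limits above interact per response body: anything larger than `max_cache_each_size` is not cached at all, anything larger than `max_cache_each_size_on_memory` goes to a temporary file under `cache_dir`, and smaller bodies stay on memory. The following is a minimal, illustrative Rust sketch of that decision — not the actual `rpxy` code; the enum and function names are made up:

```rust
/// Illustrative only: where a single response body ends up, given the
/// `[experimental.cache]` size limits (names here are hypothetical).
#[derive(Debug)]
enum CacheTarget {
    Skip,     // larger than max_cache_each_size: not cached at all
    File,     // larger than max_cache_each_size_on_memory: temporary file under cache_dir
    OnMemory, // small enough: kept in the in-memory LRU entry
}

fn choose_cache_target(body_len: usize, max_each_size: usize, max_each_size_on_memory: usize) -> CacheTarget {
    // The on-memory limit may not exceed the per-file limit; rpxy clamps it with a warning.
    let on_memory_limit = max_each_size_on_memory.min(max_each_size);
    if body_len > max_each_size {
        CacheTarget::Skip
    } else if body_len > on_memory_limit {
        CacheTarget::File
    } else {
        CacheTarget::OnMemory
    }
}

fn main() {
    // With the defaults above (64k per entry, 4k on memory):
    for len in [3_000usize, 10_000, 100_000] {
        println!("{len} bytes -> {:?}", choose_cache_target(len, 65_535, 4_096));
    }
}
```

Setting `max_cache_each_size_on_memory = 0` therefore routes every cacheable response to the file cache, as the comment in the config notes.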
diff --git a/quinn b/quinn deleted file mode 160000 index 8076ffe..0000000 --- a/quinn +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8076ffe94d38813ce0220af9d3438e7bfb5e8429 diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 65fe24d..326a0fc 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -12,31 +12,33 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn"] +default = ["http3-quinn", "cache"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] +cache = ["rpxy-lib/cache"] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ "sticky-cookie", ] } -anyhow = "1.0.72" +anyhow = "1.0.75" rustc-hash = "1.1.0" serde = { version = "1.0.183", default-features = false, features = ["derive"] } derive_builder = "0.12.0" -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.32.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" rustls-pemfile = "1.0.3" +mimalloc = { version = "*", default-features = false } # config -clap = { version = "4.3.21", features = ["std", "cargo", "wrap_help"] } +clap = { version = "4.3.22", features = ["std", "cargo", "wrap_help"] } toml = { version = "0.7.6", default-features = false, features = ["parse"] } hot_reload = "0.1.4" @@ -45,8 +47,4 @@ tracing = { version = "0.1.37" } tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } -[target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5.4" - - [dev-dependencies] diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 5f6ab4a..e678012 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -32,10 +32,21 @@ pub struct Http3Option { pub max_idle_timeout: Option, } +#[cfg(feature = "cache")] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] +pub struct CacheOption { + pub cache_dir: Option, + pub max_cache_entry: Option, + pub max_cache_each_size: Option, + pub max_cache_each_size_on_memory: Option, +} + #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Experimental { #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub h3: Option, + #[cfg(feature = "cache")] + pub cache: Option, pub ignore_sni_consistency: Option, } @@ -160,6 +171,24 @@ impl TryInto for &ConfigToml { if let Some(ignore) = exp.ignore_sni_consistency { proxy_config.sni_consistency = !ignore; } + + #[cfg(feature = "cache")] + if let Some(cache_option) = &exp.cache { + proxy_config.cache_enabled = true; + proxy_config.cache_dir = match &cache_option.cache_dir { + Some(cache_dir) => Some(std::path::PathBuf::from(cache_dir)), + None => Some(std::path::PathBuf::from(CACHE_DIR)), + }; + if let Some(num) = cache_option.max_cache_entry { + proxy_config.cache_max_entry = num; + } + if let Some(num) = cache_option.max_cache_each_size { + proxy_config.cache_max_each_size = num; + } + if let Some(num) = cache_option.max_cache_each_size_on_memory { + proxy_config.cache_max_each_size_on_memory = num; + } + } } Ok(proxy_config) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 323615f..53c8bbc 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,3 +1,7 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; pub const CONFIG_WATCH_DELAY_SECS: 
u32 = 20; + +#[cfg(feature = "cache")] +// Cache directory +pub const CACHE_DIR: &str = "./cache"; diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 474eff1..f04a6f1 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -1,9 +1,5 @@ -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] #[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; +static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; mod cert_file_reader; mod config; diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index bcb56e3..8dc7c8f 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -12,10 +12,11 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn", "sticky-cookie"] +default = ["http3-quinn", "sticky-cookie", "cache"] http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] sticky-cookie = ["base64", "sha2", "chrono"] +cache = ["http-cache-semantics", "lru"] [dependencies] rand = "0.8.5" @@ -23,19 +24,20 @@ rustc-hash = "1.1.0" bytes = "1.4.0" derive_builder = "0.12.0" futures = { version = "0.3.28", features = ["alloc", "async-await"] } -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.32.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", + "fs", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" hot_reload = "0.1.4" # reloading certs # Error handling -anyhow = "1.0.72" -thiserror = "1.0.44" +anyhow = "1.0.75" +thiserror = "1.0.47" # http and tls hyper = { version = "0.14.27", default-features = false, features = [ @@ -60,17 +62,21 @@ tracing = { version = "0.1.37" } # http/3 # quinn = { version = "0.9.3", optional = true } -quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rustls-0.21 -h3 = { path = "../h3/h3/", optional = true } +quinn = { path = "../submodules/quinn/quinn", optional = true } # Tentative to support rustls-0.21 +h3 = { path = "../submodules/h3/h3/", optional = true } # h3-quinn = { path = "./h3/h3-quinn/", optional = true } -h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21 +h3-quinn = { path = "../submodules/h3-quinn/", optional = true } # Tentative to support rustls-0.21 # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.3", features = ["all"], optional = true } -s2n-quic = { path = "../s2n-quic/quic/s2n-quic/", default-features = false, features = [ +s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ "provider-tls-rustls", ], optional = true } -s2n-quic-h3 = { path = "../s2n-quic/quic/s2n-quic-h3/", optional = true } -s2n-quic-rustls = { path = "../s2n-quic/quic/s2n-quic-rustls/", optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", optional = true } +s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optional = true } + +# cache +http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } +lru = { version = "0.11.0", optional = true } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index b7b0bff..ebec1fc 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -31,3 +31,15 @@ pub mod H3 { 
#[cfg(feature = "sticky-cookie")] /// For load-balancing with sticky cookie pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; + +#[cfg(feature = "cache")] +// # of entries in cache +pub const MAX_CACHE_ENTRY: usize = 1_000; +#[cfg(feature = "cache")] +// max size for each file in bytes +pub const MAX_CACHE_EACH_SIZE: usize = 65_535; +#[cfg(feature = "cache")] +// on memory cache if less than or equel to +pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; + +// TODO: max cache size in total diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index da56dac..c672682 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -22,6 +22,9 @@ pub enum RpxyError { #[error("Http Message Handler Error: {0}")] Handler(&'static str), + #[error("Cache Error: {0}")] + Cache(&'static str), + #[error("Http Request Message Error: {0}")] Request(&'static str), diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 0bed623..d1c0130 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -52,6 +52,18 @@ pub struct ProxyConfig { // experimentals pub sni_consistency: bool, // Handler + + #[cfg(feature = "cache")] + pub cache_enabled: bool, + #[cfg(feature = "cache")] + pub cache_dir: Option, + #[cfg(feature = "cache")] + pub cache_max_entry: usize, + #[cfg(feature = "cache")] + pub cache_max_each_size: usize, + #[cfg(feature = "cache")] + pub cache_max_each_size_on_memory: usize, + // All need to make packet acceptor #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub http3: bool, @@ -87,6 +99,17 @@ impl Default for ProxyConfig { sni_consistency: true, + #[cfg(feature = "cache")] + cache_enabled: false, + #[cfg(feature = "cache")] + cache_dir: None, + #[cfg(feature = "cache")] + cache_max_entry: MAX_CACHE_ENTRY, + #[cfg(feature = "cache")] + cache_max_each_size: MAX_CACHE_EACH_SIZE, + #[cfg(feature = "cache")] + cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] http3: false, #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs new file mode 100644 index 0000000..22ed51b --- /dev/null +++ b/rpxy-lib/src/handler/cache.rs @@ -0,0 +1,312 @@ +use crate::{error::*, globals::Globals, log::*, CryptoSource}; +use base64::{engine::general_purpose, Engine as _}; +use bytes::{Buf, Bytes, BytesMut}; +use http_cache_semantics::CachePolicy; +use hyper::{ + http::{Request, Response}, + Body, +}; +use lru::LruCache; +use sha2::{Digest, Sha256}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, + sync::{Arc, Mutex}, + time::SystemTime, +}; +use tokio::{ + fs::{self, File}, + io::{AsyncReadExt, AsyncWriteExt}, + sync::RwLock, +}; + +#[derive(Clone, Debug)] +pub enum CacheFileOrOnMemory { + File(PathBuf), + OnMemory(Vec), +} + +#[derive(Clone, Debug)] +struct CacheObject { + pub policy: CachePolicy, + pub target: CacheFileOrOnMemory, +} + +#[derive(Debug)] +struct CacheFileManager { + cache_dir: PathBuf, + cnt: usize, + runtime_handle: tokio::runtime::Handle, +} + +impl CacheFileManager { + async fn new(path: &PathBuf, runtime_handle: &tokio::runtime::Handle) -> Self { + // Create cache file dir + // Clean up the file dir before init + // TODO: Persistent cache is really difficult. maybe SQLite is needed. 
+ if let Err(e) = fs::remove_dir_all(path).await { + warn!("Failed to clean up the cache dir: {e}"); + }; + fs::create_dir_all(path).await.unwrap(); + Self { + cache_dir: path.clone(), + cnt: 0, + runtime_handle: runtime_handle.clone(), + } + } + + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes, policy: &CachePolicy) -> Result { + let cache_filepath = self.cache_dir.join(cache_filename); + let Ok(mut file) = File::create(&cache_filepath).await else { + return Err(RpxyError::Cache("Failed to create file")); + }; + let mut bytes_clone = body_bytes.clone(); + while bytes_clone.has_remaining() { + if let Err(e) = file.write_buf(&mut bytes_clone).await { + error!("Failed to write file cache: {e}"); + return Err(RpxyError::Cache("Failed to write file cache: {e}")); + }; + } + self.cnt += 1; + Ok(CacheObject { + policy: policy.clone(), + target: CacheFileOrOnMemory::File(cache_filepath), + }) + } + + async fn read(&self, path: impl AsRef) -> Result { + let Ok(mut file) = File::open(&path).await else { + warn!("Cache file object cannot be opened"); + return Err(RpxyError::Cache("Cache file object cannot be opened")); + }; + let (body_sender, res_body) = Body::channel(); + self.runtime_handle.spawn(async move { + let mut sender = body_sender; + let mut buf = BytesMut::new(); + loop { + match file.read_buf(&mut buf).await { + Ok(0) => break, + Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Err(_) => break, + }; + } + Ok(()) as Result<()> + }); + + Ok(res_body) + } + + async fn remove(&mut self, path: impl AsRef) -> Result<()> { + fs::remove_file(path.as_ref()).await?; + self.cnt -= 1; + debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt); + + Ok(()) + } +} + +#[derive(Clone, Debug)] +pub struct RpxyCache { + /// Managing cache file objects through RwLock's lock mechanism for file lock + cache_file_manager: Arc>, + /// Lru cache storing http message caching policy + inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + /// Async runtime + runtime_handle: tokio::runtime::Handle, + /// Maximum size of each cache file object + max_each_size: usize, + /// Maximum size of cache object on memory + max_each_size_on_memory: usize, +} + +impl RpxyCache { + /// Generate cache storage + pub async fn new(globals: &Globals) -> Option { + if !globals.proxy_config.cache_enabled { + return None; + } + + let path = globals.proxy_config.cache_dir.as_ref().unwrap(); + let cache_file_manager = Arc::new(RwLock::new(CacheFileManager::new(path, &globals.runtime_handle).await)); + let inner = Arc::new(Mutex::new(LruCache::new( + std::num::NonZeroUsize::new(globals.proxy_config.cache_max_entry).unwrap(), + ))); + + let max_each_size = globals.proxy_config.cache_max_each_size; + let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; + if max_each_size < max_each_size_on_memory { + warn!( + "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" + ); + max_each_size_on_memory = max_each_size; + } + + Some(Self { + cache_file_manager, + inner, + runtime_handle: globals.runtime_handle.clone(), + max_each_size, + max_each_size_on_memory, + }) + } + + fn evict_cache_entry(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + lock.pop_entry(cache_key) + } + + async fn evict_cache_file(&self, filepath: impl AsRef) { + // Acquire 
the write lock + let mut mgr = self.cache_file_manager.write().await; + if let Err(e) = mgr.remove(filepath).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + + /// Get cached response + pub async fn get(&self, req: &Request) -> Option> { + debug!("Current cache entries: {:?}", self.inner); + let cache_key = req.uri().to_string(); + + // First check cache chance + let cached_object = { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); + return None; + }; + let Some(cached_object) = lock.get(&cache_key) else { + return None; + }; + cached_object.clone() + }; + + // Secondly check the cache freshness as an HTTP message + let now = SystemTime::now(); + let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else { + // Evict stale cache entry. + // This might be okay to keep as is since it would be updated later. + // However, there is no guarantee that newly got objects will be still cacheable. + // So, we have to evict stale cache entries and cache file objects if found. + debug!("Stale cache entry and file object: {cache_key}"); + let _evicted_entry = self.evict_cache_entry(&cache_key); + // For cache file + if let CacheFileOrOnMemory::File(path) = cached_object.target { + self.evict_cache_file(&path).await; + } + return None; + }; + + // Finally retrieve the file/on-memory object + match cached_object.target { + CacheFileOrOnMemory::File(path) => { + let mgr = self.cache_file_manager.read().await; + let res_body = match mgr.read(&path).await { + Ok(res_body) => res_body, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.evict_cache_entry(&cache_key); + self.evict_cache_file(&path).await; + return None; + } + }; + + debug!("Cache hit from file: {cache_key}"); + Some(Response::from_parts(res_parts, res_body)) + } + CacheFileOrOnMemory::OnMemory(object) => { + debug!("Cache hit from on memory: {cache_key}"); + Some(Response::from_parts(res_parts, Body::from(object))) + } + } + } + + pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { + let my_cache = self.inner.clone(); + let mgr = self.cache_file_manager.clone(); + let uri = uri.clone(); + let bytes_clone = body_bytes.clone(); + let policy_clone = policy.clone(); + let max_each_size = self.max_each_size; + let max_each_size_on_memory = self.max_each_size_on_memory; + + self.runtime_handle.spawn(async move { + if bytes_clone.len() > max_each_size { + warn!("Too large to cache"); + return Err(RpxyError::Cache("Too large to cache")); + } + let cache_key = derive_cache_key_from_uri(&uri); + let cache_filename = derive_filename_from_uri(&uri); + + debug!("Cache file of {:?} bytes to be written", bytes_clone.len()); + + let cache_object = if bytes_clone.len() > max_each_size_on_memory { + let mut mgr = mgr.write().await; + let Ok(cache_object) = mgr.create(&cache_filename, &bytes_clone, &policy_clone).await else { + error!("Failed to put the body into the file object or cache entry"); + return Err(RpxyError::Cache("Failed to put the body into the file object or cache entry")); + }; + debug!("Cached a new file: {} - {}", cache_key, cache_filename); + cache_object + } else { + debug!("Cached a new object on memory: {}", cache_key); + CacheObject { + policy: policy_clone, + target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()), + } + }; + let push_opt = { + let Ok(mut lock) = my_cache.lock() else { + error!("Failed to 
acquire mutex lock for writing cache entry"); + return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); + }; + lock.push(cache_key.clone(), cache_object) + }; + if let Some((k, v)) = push_opt { + if k != cache_key { + info!("Over the cache capacity. Evict least recent used entry"); + if let CacheFileOrOnMemory::File(path) = v.target { + let mut mgr = mgr.write().await; + if let Err(e) = mgr.remove(&path).await { + warn!("Eviction failed during file object removal over the capacity: {:?}", e); + }; + } + } + } + Ok(()) + }); + + Ok(()) + } +} + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} + +pub fn get_policy_if_cacheable(req: Option<&Request>, res: Option<&Response>) -> Result> +where + R: Debug, +{ + // deduce cache policy from req and res + let (Some(req), Some(res)) = (req, res) else { + return Err(RpxyError::Cache("Invalid null request and/or response")); + }; + + let new_policy = CachePolicy::new(req, res); + if new_policy.is_storable() { + debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + Ok(Some(new_policy)) + } else { + Ok(None) + } +} diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index 4bfd15b..ce49433 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -1,6 +1,11 @@ -use crate::error::RpxyError; +#[cfg(feature = "cache")] +use super::cache::{get_policy_if_cacheable, RpxyCache}; +#[cfg(feature = "cache")] +use crate::log::*; +use crate::{error::RpxyError, globals::Globals, CryptoSource}; use async_trait::async_trait; -use derive_builder::Builder; +#[cfg(feature = "cache")] +use bytes::Buf; use hyper::{ body::{Body, HttpBody}, client::{connect::Connect, HttpConnector}, @@ -9,6 +14,20 @@ use hyper::{ }; use hyper_rustls::HttpsConnector; +#[cfg(feature = "cache")] +/// Build synthetic request to cache +fn build_synth_req_for_cache(req: &Request) -> Request<()> { + let mut builder = Request::builder() + .method(req.method()) + .uri(req.uri()) + .version(req.version()); + // TODO: omit extensions. is this approach correct? + for (header_key, header_value) in req.headers() { + builder = builder.header(header_key, header_value); + } + builder.body(()).unwrap() +} + #[async_trait] /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. pub trait ForwardRequest { @@ -16,13 +35,13 @@ pub trait ForwardRequest { async fn request(&self, req: Request) -> Result, Self::Error>; } -#[derive(Builder, Clone)] -/// Forwarder struct +/// Forwarder struct responsible to cache handling pub struct Forwarder where C: Connect + Clone + Sync + Send + 'static, { - // TODO: maybe this forwarder definition is suitable for cache handling. + #[cfg(feature = "cache")] + cache: Option, inner: Client, inner_h2: Client, // `h2c` or http/2-only client is defined separately } @@ -36,10 +55,60 @@ where C: Connect + Clone + Sync + Send + 'static, { type Error = RpxyError; + + #[cfg(feature = "cache")] async fn request(&self, req: Request) -> Result, Self::Error> { + let mut synth_req = None; + if self.cache.is_some() { + if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // if found, return it as response. 
+ info!("Cache hit - Return from cache"); + return Ok(cached_response); + }; + + // Synthetic request copy used just for caching (cannot clone request object...) + synth_req = Some(build_synth_req_for_cache(&req)); + } + // TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient. // Needs to be reconsidered. Currently, this is a kind of work around. // This possibly relates to https://github.com/hyperium/hyper/issues/2417. + let res = match req.version() { + Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests + _ => self.inner.request(req).await.map_err(RpxyError::Hyper), + }; + + if self.cache.is_none() { + return res; + } + + // check cacheability and store it if cacheable + let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { + return res; + }; + let (parts, body) = res.unwrap().into_parts(); + // TODO: Inefficient? + let Ok(mut bytes) = hyper::body::aggregate(body).await else { + return Err(RpxyError::Cache("Failed to write byte buffer")); + }; + let aggregated = bytes.copy_to_bytes(bytes.remaining()); + + if let Err(cache_err) = self + .cache + .as_ref() + .unwrap() + .put(synth_req.unwrap().uri(), &aggregated, &cache_policy) + .await + { + error!("{:?}", cache_err); + }; + + // res + Ok(Response::from_parts(parts, Body::from(aggregated))) + } + + #[cfg(not(feature = "cache"))] + async fn request(&self, req: Request) -> Result, Self::Error> { match req.version() { Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests _ => self.inner.request(req).await.map_err(RpxyError::Hyper), @@ -48,7 +117,8 @@ where } impl Forwarder, Body> { - pub async fn new() -> Self { + /// Build forwarder + pub async fn new(_globals: &std::sync::Arc>) -> Self { // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_webpki_roots() @@ -64,6 +134,13 @@ impl Forwarder, Body> { let inner = Client::builder().build::<_, Body>(connector); let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2); + + #[cfg(feature = "cache")] + { + let cache = RpxyCache::new(_globals).await; + Self { inner, inner_h2, cache } + } + #[cfg(not(feature = "cache"))] Self { inner, inner_h2 } } } diff --git a/rpxy-lib/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs index 854bd8f..84e0226 100644 --- a/rpxy-lib/src/handler/mod.rs +++ b/rpxy-lib/src/handler/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "cache")] +mod cache; mod forwarder; mod handler_main; mod utils_headers; diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index b3af2a8..fd242c5 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -60,6 +60,15 @@ where if !proxy_config.sni_consistency { info!("Ignore consistency between TLS SNI and Host header (or Request line). 
Note it violates RFC."); } + #[cfg(feature = "cache")] + if proxy_config.cache_enabled { + info!( + "Cache is enabled: cache dir = {:?}", + proxy_config.cache_dir.as_ref().unwrap() + ); + } else { + info!("Cache is disabled") + } // build global let globals = Arc::new(Globals { @@ -72,7 +81,7 @@ where // build message handler including a request forwarder let msg_handler = Arc::new( HttpMessageHandlerBuilder::default() - .forwarder(Arc::new(Forwarder::new().await)) + .forwarder(Arc::new(Forwarder::new(&globals).await)) .globals(globals.clone()) .build()?, ); diff --git a/s2n-quic b/s2n-quic deleted file mode 160000 index 1ff2cd2..0000000 --- a/s2n-quic +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1ff2cd230fdf46596fe77830966857c438a8b31a diff --git a/h3 b/submodules/h3 similarity index 100% rename from h3 rename to submodules/h3 diff --git a/h3-quinn/Cargo.toml b/submodules/h3-quinn/Cargo.toml similarity index 100% rename from h3-quinn/Cargo.toml rename to submodules/h3-quinn/Cargo.toml diff --git a/h3-quinn/src/lib.rs b/submodules/h3-quinn/src/lib.rs similarity index 100% rename from h3-quinn/src/lib.rs rename to submodules/h3-quinn/src/lib.rs diff --git a/submodules/quinn b/submodules/quinn new file mode 160000 index 0000000..4f25f50 --- /dev/null +++ b/submodules/quinn @@ -0,0 +1 @@ +Subproject commit 4f25f501ef4d009af9d3bef44d322c09c327b2df diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics new file mode 160000 index 0000000..3cd0917 --- /dev/null +++ b/submodules/rusty-http-cache-semantics @@ -0,0 +1 @@ +Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd diff --git a/submodules/s2n-quic b/submodules/s2n-quic new file mode 160000 index 0000000..c7e3f51 --- /dev/null +++ b/submodules/s2n-quic @@ -0,0 +1 @@ +Subproject commit c7e3f517f3267f2e19b605cb53e6bf265e70e5af
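A note on how the new `rpxy-lib/src/handler/cache.rs` addresses entries: the LRU key is the full request URI, while the on-disk filename is a hash of that URI so it is always filesystem-safe. A condensed, self-contained sketch of that derivation (mirroring `derive_cache_key_from_uri` / `derive_filename_from_uri` and the same `sha2`/`base64` APIs the patch uses; the example URI is made up):

```rust
use base64::{engine::general_purpose, Engine as _};
use sha2::{Digest, Sha256};

/// Cache key: the full URI string (as in derive_cache_key_from_uri).
fn cache_key(uri: &str) -> String {
    uri.to_string()
}

/// Cache filename: URL-safe base64 (no padding) of the SHA-256 digest of the URI
/// (as in derive_filename_from_uri), so any URI maps to a fixed-length, safe name.
fn cache_filename(uri: &str) -> String {
    let mut hasher = Sha256::new();
    hasher.update(uri);
    general_purpose::URL_SAFE_NO_PAD.encode(hasher.finalize())
}

fn main() {
    let uri = "https://example.com/assets/app.css";
    println!("key      = {}", cache_key(uri));
    println!("filename = {}", cache_filename(uri));
}
```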
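The decision of what may be stored and whether a stored entry is still fresh is delegated to the vendored `http-cache-semantics` submodule: `CachePolicy::new` and `is_storable` on the store path (see `get_policy_if_cacheable`), and `before_request` on the lookup path (see `RpxyCache::get`). A rough usage sketch under those assumptions, with made-up request/response values and plain `http` crate types (the same types hyper re-exports):

```rust
use http::{Request, Response};
use http_cache_semantics::{BeforeRequest, CachePolicy};
use std::time::SystemTime;

fn main() {
    // A GET request and a response that allows shared caching for 60 seconds.
    let req = Request::get("https://example.com/assets/app.css").body(()).unwrap();
    let res = Response::builder()
        .header("cache-control", "public, max-age=60")
        .body(())
        .unwrap();

    // Store path: only storable responses are put into the cache.
    let policy = CachePolicy::new(&req, &res);
    println!("storable: {}", policy.is_storable());

    // Lookup path: rpxy serves the cached body only when the policy says it is
    // still fresh; otherwise the entry (and any cache file) is evicted.
    match policy.before_request(&req, SystemTime::now()) {
        BeforeRequest::Fresh(parts) => println!("fresh; serve cached body with {:?}", parts.headers),
        _ => println!("stale; evict the entry and re-fetch from upstream"),
    }
}
```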