diff --git a/.gitmodules b/.gitmodules index 59b7ea8..65fcd3b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,12 @@ -[submodule "h3"] - path = h3 +[submodule "submodules/h3"] + path = submodules/h3 url = git@github.com:junkurihara/h3.git -[submodule "quinn"] - path = quinn +[submodule "submodules/quinn"] + path = submodules/quinn url = git@github.com:junkurihara/quinn.git -[submodule "s2n-quic"] - path = s2n-quic +[submodule "submodules/s2n-quic"] + path = submodules/s2n-quic url = git@github.com:junkurihara/s2n-quic.git +[submodule "submodules/rusty-http-cache-semantics"] + path = submodules/rusty-http-cache-semantics + url = git@github.com:junkurihara/rusty-http-cache-semantics.git diff --git a/Cargo.toml b/Cargo.toml index aa65657..0a32c32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = ["rpxy-bin", "rpxy-lib"] -exclude = ["quinn", "h3-quinn", "h3", "s2n-quic"] +exclude = ["submodules", "h3-quinn"] [profile.release] codegen-units = 1 diff --git a/config-example.toml b/config-example.toml index 561ebc2..41d70e7 100644 --- a/config-example.toml +++ b/config-example.toml @@ -107,3 +107,7 @@ max_concurrent_bidistream = 100 max_concurrent_unistream = 100 max_idle_timeout = 10 # secs. 0 represents an infinite timeout. # WARNING: If a peer or its network path malfunctions or acts maliciously, an infinite idle timeout can result in permanently hung futures! + +# If this specified, file cache feature is enabled +[experimental.cache] +cache_dir = './cache' # optional. default is "./cache" relative to the current working directory diff --git a/quinn b/quinn deleted file mode 160000 index 8076ffe..0000000 --- a/quinn +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8076ffe94d38813ce0220af9d3438e7bfb5e8429 diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 65fe24d..96771c1 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -12,27 +12,28 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn"] +default = ["http3-quinn", "cache"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] +cache = [] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ "sticky-cookie", ] } -anyhow = "1.0.72" +anyhow = "1.0.73" rustc-hash = "1.1.0" serde = { version = "1.0.183", default-features = false, features = ["derive"] } derive_builder = "0.12.0" -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.31.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" rustls-pemfile = "1.0.3" # config diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 5f6ab4a..60f13b3 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -6,7 +6,7 @@ use crate::{ use rpxy_lib::{reexports::Uri, AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}; use rustc_hash::FxHashMap as HashMap; use serde::Deserialize; -use std::{fs, net::SocketAddr}; +use std::{fs, net::SocketAddr, path::PathBuf}; #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct ConfigToml { @@ -32,10 +32,17 @@ pub struct Http3Option { pub max_idle_timeout: Option, } +#[cfg(feature = "cache")] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] +pub struct CacheOption { + pub cache_dir: Option, +} + #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Experimental { #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub h3: Option, + pub cache: Option, pub ignore_sni_consistency: Option, } @@ -160,6 +167,14 @@ impl TryInto for &ConfigToml { if let Some(ignore) = exp.ignore_sni_consistency { proxy_config.sni_consistency = !ignore; } + + if let Some(cache_option) = &exp.cache { + proxy_config.cache_enabled = true; + proxy_config.cache_dir = match &cache_option.cache_dir { + Some(cache_dir) => Some(PathBuf::from(cache_dir)), + None => Some(PathBuf::from(CACHE_DIR)), + } + } } Ok(proxy_config) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 323615f..a7e811a 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,3 +1,4 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; +pub const CACHE_DIR: &str = "./cache"; diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index bcb56e3..a859131 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -23,19 +23,20 @@ rustc-hash = "1.1.0" bytes = "1.4.0" derive_builder = "0.12.0" futures = { version = "0.3.28", features = ["alloc", "async-await"] } -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.31.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", + "fs", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" hot_reload = "0.1.4" # reloading certs # Error handling -anyhow = "1.0.72" -thiserror = "1.0.44" +anyhow = "1.0.73" +thiserror = "1.0.45" # http and tls hyper = { version = "0.14.27", default-features = false, features = [ @@ -60,17 +61,21 @@ tracing = { version = "0.1.37" } # http/3 # quinn = { version = "0.9.3", optional = true } -quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rustls-0.21 -h3 = { path = "../h3/h3/", optional = true } +quinn = { path = "../submodules/quinn/quinn", optional = true } # Tentative to support rustls-0.21 +h3 = { path = "../submodules/h3/h3/", optional = true } # h3-quinn = { path = "./h3/h3-quinn/", optional = true } -h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21 +h3-quinn = { path = "../submodules/h3-quinn/", optional = true } # Tentative to support rustls-0.21 # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.3", features = ["all"], optional = true } -s2n-quic = { path = "../s2n-quic/quic/s2n-quic/", default-features = false, features = [ +s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ "provider-tls-rustls", ], optional = true } -s2n-quic-h3 = { path = "../s2n-quic/quic/s2n-quic-h3/", optional = true } -s2n-quic-rustls = { path = "../s2n-quic/quic/s2n-quic-rustls/", optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", optional = true } +s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optional = true } + +# cache +http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/" } +moka = { version = "0.11.3", features = ["future"] } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index b7b0bff..93b54bf 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -31,3 +31,5 @@ pub mod H3 { #[cfg(feature = "sticky-cookie")] /// For load-balancing with sticky cookie pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; + +pub const MAX_CACHE_ENTRY: u64 = 10_000; diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index da56dac..c672682 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -22,6 +22,9 @@ pub enum RpxyError { #[error("Http Message Handler Error: {0}")] Handler(&'static str), + #[error("Cache Error: {0}")] + Cache(&'static str), + #[error("Http Request Message Error: {0}")] Request(&'static str), diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs index 0bed623..9442507 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -9,11 +9,11 @@ use crate::{ utils::{BytesName, PathNameBytesExp}, }; use rustc_hash::FxHashMap as HashMap; -use std::net::SocketAddr; use std::sync::{ atomic::{AtomicUsize, Ordering}, Arc, }; +use std::{net::SocketAddr, path::PathBuf}; use tokio::time::Duration; /// Global object containing proxy configurations and shared object like counters. @@ -52,6 +52,10 @@ pub struct ProxyConfig { // experimentals pub sni_consistency: bool, // Handler + + pub cache_enabled: bool, + pub cache_dir: Option, + // All need to make packet acceptor #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub http3: bool, @@ -87,6 +91,9 @@ impl Default for ProxyConfig { sni_consistency: true, + cache_enabled: false, + cache_dir: None, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] http3: false, #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs new file mode 100644 index 0000000..6b672d3 --- /dev/null +++ b/rpxy-lib/src/handler/cache.rs @@ -0,0 +1,196 @@ +use crate::{constants::MAX_CACHE_ENTRY, error::*, globals::Globals, log::*, CryptoSource}; +use base64::{engine::general_purpose, Engine as _}; +use bytes::{Buf, Bytes, BytesMut}; +use http_cache_semantics::CachePolicy; +use hyper::{ + http::{Request, Response}, + Body, +}; +use moka::future::Cache as MokaCache; +use sha2::{Digest, Sha256}; +use std::{fmt::Debug, path::PathBuf, time::SystemTime}; +use tokio::{ + fs::{self, File}, + io::{AsyncReadExt, AsyncWriteExt}, +}; + +// #[async_trait] +// pub trait CacheTarget { +// type TargetInput; +// type TargetOutput; +// type Error; +// /// Get target object from somewhere +// async fn get(&self) -> Self::TargetOutput; +// /// Write target object into somewhere +// async fn put(&self, taget: Self::TargetOutput) -> Result<(), Self::Error>; +// /// Remove target object from somewhere (when evicted self) +// async fn remove(&self) -> Result<(), Self::Error>; +// } + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_moka_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} + +#[derive(Clone, Debug)] +pub struct CacheObject { + pub policy: CachePolicy, + pub target: Option, +} + +#[derive(Clone, Debug)] +pub struct RpxyCache { + cache_dir: PathBuf, + inner: MokaCache, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + runtime_handle: tokio::runtime::Handle, +} + +impl RpxyCache { + /// Generate cache storage + pub async fn new(globals: &Globals) -> Option { + if !globals.proxy_config.cache_enabled { + return None; + } + let runtime_handle = globals.runtime_handle.clone(); + let runtime_handle_clone = globals.runtime_handle.clone(); + let eviction_listener = move |k, v: CacheObject, cause| { + debug!("Cache entry is being evicted : {k} {:?}", cause); + runtime_handle.block_on(async { + if let Some(filepath) = v.target { + debug!("Evict file object: {k}"); + if let Err(e) = fs::remove_file(filepath).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + }) + }; + + // Create cache file dir + // Clean up the file dir before init + // TODO: Persistent cache is really difficult. maybe SQLite is needed. + let path = globals.proxy_config.cache_dir.as_ref().unwrap(); + if let Err(e) = fs::remove_dir_all(path).await { + warn!("Failed to clean up the cache dir: {e}"); + }; + fs::create_dir_all(path).await.unwrap(); + + Some(Self { + cache_dir: path.clone(), + inner: MokaCache::builder() + .max_capacity(MAX_CACHE_ENTRY) + .eviction_listener_with_queued_delivery_mode(eviction_listener) + .build(), // TODO: make this configurable, and along with size + runtime_handle: runtime_handle_clone, + }) + } + + /// Get cached response + pub async fn get(&self, req: &Request) -> Option> { + let moka_key = req.uri().to_string(); + + // First check cache chance + let Some(cached_object) = self.inner.get(&moka_key) else { + return None; + }; + + let now = SystemTime::now(); + if let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) { + let Some(filepath) = cached_object.target else { + return None; + }; + + let Ok(mut file) = File::open(&filepath.clone()).await else { + warn!("Cache file doesn't exist. Remove pointer cache."); + let my_cache = self.inner.clone(); + self.runtime_handle.spawn(async move { + my_cache.invalidate(&moka_key).await; + }); + return None; + }; + let (body_sender, res_body) = Body::channel(); + self.runtime_handle.spawn(async move { + let mut sender = body_sender; + // let mut size = 0usize; + let mut buf = BytesMut::new(); + loop { + match file.read_buf(&mut buf).await { + Ok(0) => break, + Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Err(_) => break, + }; + } + Ok(()) as Result<()> + }); + + let res = Response::from_parts(res_parts, res_body); + debug!("Cache hit: {moka_key}"); + Some(res) + } else { + // Evict stale cache entry here + debug!("Evict stale cache entry and file object: {moka_key}"); + let my_cache = self.inner.clone(); + self.runtime_handle.spawn(async move { + // eviction listener will be activated during invalidation. + my_cache.invalidate(&moka_key).await; + }); + None + } + } + + pub fn is_cacheable(&self, req: Option<&Request>, res: Option<&Response>) -> Result> + where + R: Debug, + { + // deduce cache policy from req and res + let (Some(req), Some(res)) = (req, res) else { + return Err(RpxyError::Cache("Invalid null request and/or response")); + }; + + let new_policy = CachePolicy::new(req, res); + if new_policy.is_storable() { + debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + Ok(Some(new_policy)) + } else { + Ok(None) + } + } + + pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: CachePolicy) -> Result<()> { + let my_cache = self.inner.clone(); + let uri = uri.clone(); + let cache_dir = self.cache_dir.clone(); + let mut bytes_clone = body_bytes.clone(); + + self.runtime_handle.spawn(async move { + let moka_key = derive_moka_key_from_uri(&uri); + let cache_filename = derive_filename_from_uri(&uri); + let cache_filepath = cache_dir.join(cache_filename); + + let _x = my_cache + .get_with(moka_key, async { + let mut file = File::create(&cache_filepath).await.unwrap(); + while bytes_clone.has_remaining() { + if let Err(e) = file.write_buf(&mut bytes_clone).await { + error!("Failed to write file cache: {e}"); + return CacheObject { policy, target: None }; + }; + } + CacheObject { + policy, + target: Some(cache_filepath), + } + }) + .await; + + debug!("Current cache entries: {}", my_cache.entry_count()); + }); + + Ok(()) + } +} diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs index 4bfd15b..fab1342 100644 --- a/rpxy-lib/src/handler/forwarder.rs +++ b/rpxy-lib/src/handler/forwarder.rs @@ -1,5 +1,7 @@ -use crate::error::RpxyError; +use super::cache::RpxyCache; +use crate::{error::RpxyError, globals::Globals, log::*, CryptoSource}; use async_trait::async_trait; +use bytes::Buf; use derive_builder::Builder; use hyper::{ body::{Body, HttpBody}, @@ -9,6 +11,18 @@ use hyper::{ }; use hyper_rustls::HttpsConnector; +fn build_synth_req_for_cache(req: &Request) -> Request<()> { + let mut builder = Request::builder() + .method(req.method()) + .uri(req.uri()) + .version(req.version()); + // TODO: omit extensions. is this approach correct? + for (header_key, header_value) in req.headers() { + builder = builder.header(header_key, header_value); + } + builder.body(()).unwrap() +} + #[async_trait] /// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. pub trait ForwardRequest { @@ -17,12 +31,12 @@ pub trait ForwardRequest { } #[derive(Builder, Clone)] -/// Forwarder struct +/// Forwarder struct responsible to cache handling pub struct Forwarder where C: Connect + Clone + Sync + Send + 'static, { - // TODO: maybe this forwarder definition is suitable for cache handling. + cache: Option, inner: Client, inner_h2: Client, // `h2c` or http/2-only client is defined separately } @@ -37,18 +51,63 @@ where { type Error = RpxyError; async fn request(&self, req: Request) -> Result, Self::Error> { + let mut synth_req = None; + if self.cache.is_some() { + debug!("Search cache first"); + if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // if found, return it as response. + debug!("Cache hit - Return from cache"); + return Ok(cached_response); + }; + + // Synthetic request copy used just for caching (cannot clone request object...) + synth_req = Some(build_synth_req_for_cache(&req)); + } + // TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient. // Needs to be reconsidered. Currently, this is a kind of work around. // This possibly relates to https://github.com/hyperium/hyper/issues/2417. - match req.version() { + let res = match req.version() { Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests _ => self.inner.request(req).await.map_err(RpxyError::Hyper), + }; + + if self.cache.is_none() { + return res; } + + // check cacheability and store it if cacheable + let Ok(Some(cache_policy)) = self + .cache + .as_ref() + .unwrap() + .is_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { + return res; + }; + let (parts, body) = res.unwrap().into_parts(); + // TODO: Inefficient? + let Ok(mut bytes) = hyper::body::aggregate(body).await else { + return Err(RpxyError::Cache("Failed to write byte buffer")); + }; + let aggregated = bytes.copy_to_bytes(bytes.remaining()); + + if let Err(cache_err) = self + .cache + .as_ref() + .unwrap() + .put(synth_req.unwrap().uri(), &aggregated, cache_policy) + .await + { + error!("{:?}", cache_err); + }; + + // res + Ok(Response::from_parts(parts, Body::from(aggregated))) } } impl Forwarder, Body> { - pub async fn new() -> Self { + pub async fn new(globals: &std::sync::Arc>) -> Self { // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); let connector = hyper_rustls::HttpsConnectorBuilder::new() .with_webpki_roots() @@ -64,6 +123,8 @@ impl Forwarder, Body> { let inner = Client::builder().build::<_, Body>(connector); let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2); - Self { inner, inner_h2 } + + let cache = RpxyCache::new(globals).await; + Self { inner, inner_h2, cache } } } diff --git a/rpxy-lib/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs index 854bd8f..beea073 100644 --- a/rpxy-lib/src/handler/mod.rs +++ b/rpxy-lib/src/handler/mod.rs @@ -1,3 +1,4 @@ +mod cache; mod forwarder; mod handler_main; mod utils_headers; @@ -6,6 +7,7 @@ mod utils_synth_response; #[cfg(feature = "sticky-cookie")] use crate::backend::LbContext; +pub use cache::CacheObject; pub use { forwarder::Forwarder, handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}, diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index b3af2a8..d3f7bca 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -22,6 +22,7 @@ use std::sync::Arc; pub use crate::{ certs::{CertsAndKeys, CryptoSource}, globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}, + handler::CacheObject, }; pub mod reexports { pub use hyper::Uri; @@ -60,6 +61,12 @@ where if !proxy_config.sni_consistency { info!("Ignore consistency between TLS SNI and Host header (or Request line). Note it violates RFC."); } + if proxy_config.cache_enabled { + info!( + "Cache is enabled: cache dir = {:?}", + proxy_config.cache_dir.as_ref().unwrap() + ); + } // build global let globals = Arc::new(Globals { @@ -72,7 +79,7 @@ where // build message handler including a request forwarder let msg_handler = Arc::new( HttpMessageHandlerBuilder::default() - .forwarder(Arc::new(Forwarder::new().await)) + .forwarder(Arc::new(Forwarder::new(&globals).await)) .globals(globals.clone()) .build()?, ); diff --git a/s2n-quic b/s2n-quic deleted file mode 160000 index 1ff2cd2..0000000 --- a/s2n-quic +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 1ff2cd230fdf46596fe77830966857c438a8b31a diff --git a/h3 b/submodules/h3 similarity index 100% rename from h3 rename to submodules/h3 diff --git a/h3-quinn/Cargo.toml b/submodules/h3-quinn/Cargo.toml similarity index 100% rename from h3-quinn/Cargo.toml rename to submodules/h3-quinn/Cargo.toml diff --git a/h3-quinn/src/lib.rs b/submodules/h3-quinn/src/lib.rs similarity index 100% rename from h3-quinn/src/lib.rs rename to submodules/h3-quinn/src/lib.rs diff --git a/submodules/quinn b/submodules/quinn new file mode 160000 index 0000000..4f25f50 --- /dev/null +++ b/submodules/quinn @@ -0,0 +1 @@ +Subproject commit 4f25f501ef4d009af9d3bef44d322c09c327b2df diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics new file mode 160000 index 0000000..3cd0917 --- /dev/null +++ b/submodules/rusty-http-cache-semantics @@ -0,0 +1 @@ +Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd diff --git a/submodules/s2n-quic b/submodules/s2n-quic new file mode 160000 index 0000000..c7e3f51 --- /dev/null +++ b/submodules/s2n-quic @@ -0,0 +1 @@ +Subproject commit c7e3f517f3267f2e19b605cb53e6bf265e70e5af