diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push.yml index e2d801c..f7cea2b 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push.yml @@ -42,7 +42,7 @@ jobs: - target: "s2n" dockerfile: ./docker/Dockerfile build-args: | - "CARGO_FEATURES=--no-default-features --features http3-s2n" + "CARGO_FEATURES=--no-default-features --features=http3-s2n,cache" "ADDITIONAL_DEPS=pkg-config libssl-dev cmake libclang1 gcc g++" platforms: linux/amd64,linux/arm64 tags-suffix: "-s2n" diff --git a/.gitmodules b/.gitmodules index 59b7ea8..65fcd3b 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,9 +1,12 @@ -[submodule "h3"] - path = h3 +[submodule "submodules/h3"] + path = submodules/h3 url = git@github.com:junkurihara/h3.git -[submodule "quinn"] - path = quinn +[submodule "submodules/quinn"] + path = submodules/quinn url = git@github.com:junkurihara/quinn.git -[submodule "s2n-quic"] - path = s2n-quic +[submodule "submodules/s2n-quic"] + path = submodules/s2n-quic url = git@github.com:junkurihara/s2n-quic.git +[submodule "submodules/rusty-http-cache-semantics"] + path = submodules/rusty-http-cache-semantics + url = git@github.com:junkurihara/rusty-http-cache-semantics.git diff --git a/CHANGELOG.md b/CHANGELOG.md index 2df06ae..f094cf8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,19 @@ # CHANGELOG -## 0.6.0 (unreleased) +## 0.7.0 (unreleased) + +## 0.6.0 + +### Improvement + +- Feat: Enabled `h2c` (HTTP/2 cleartext) requests to upstream app servers (in the previous versions, only HTTP/1.1 was allowed for cleartext requests) +- Feat: Initial implementation of caching feature using file + on memory cache. (Caveats: No persistence of the cache. Once config is updated, the cache is totally eliminated.) 
+- Refactor: lots of minor improvements + +### Bugfix + +- Fix: fix `server` in the response header (`rpxy_lib` -> `rpxy`) +- Fix: fix bug for hot-reloading configuration file (Add termination notification receiver in proxy services) ## 0.5.0 diff --git a/Cargo.toml b/Cargo.toml index aa65657..29e2277 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,7 +1,7 @@ [workspace] members = ["rpxy-bin", "rpxy-lib"] -exclude = ["quinn", "h3-quinn", "h3", "s2n-quic"] +exclude = ["submodules"] [profile.release] codegen-units = 1 diff --git a/README.md b/README.md index f14668d..5561511 100644 --- a/README.md +++ b/README.md @@ -257,7 +257,7 @@ Other than them, all you need is to mount your `config.toml` as `/etc/rpxy.toml` ### HTTP/3 -`rpxy` can serves HTTP/3 requests thanks to `quinn` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` like follows. Any values in the entry like `alt_svc_max_age` are optional. +`rpxy` can serves HTTP/3 requests thanks to `quinn`, `s2n-quic` and `hyperium/h3`. To enable this experimental feature, add an entry `experimental.h3` in your `config.toml` like follows. Any values in the entry like `alt_svc_max_age` are optional. ```toml [experimental.h3] @@ -281,6 +281,21 @@ tls = { https_redirection = true, tls_cert_path = './server.crt', tls_cert_key_p However, currently we have a limitation on HTTP/3 support for applications that enables client authentication. If an application is set with client authentication, HTTP/3 doesn't work for the application. +### Hybrid Caching Feature with Temporary File and On-Memory Cache + +If `[experimental.cache]` is specified in `config.toml`, you can leverage the local caching feature using temporary files and on-memory objects. An example configuration is as follows. + +```toml +# If this specified, file cache feature is enabled +[experimental.cache] +cache_dir = './cache' # optional. 
default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k +max_cache_each_size_on_memory = 4096 # optional. default is 4k. If 0, it is always file cache. +``` + +A *storable* (in the context of an HTTP message) response is stored if its size is less than or equal to `max_cache_each_size` in bytes. If it is also less than or equal to `max_cache_each_size_on_memory`, it is stored as an on-memory object. Otherwise, it is stored as a temporary file. Note that `max_cache_each_size` must be larger than or equal to `max_cache_each_size_on_memory`. Also note that once `rpxy` restarts or the config is updated, the cache is totally eliminated not only from the on-memory table but also from the file system. + ## TIPS ### Using Private Key Issued by Let's Encrypt diff --git a/TODO.md b/TODO.md index bf783c7..1e25ee1 100644 --- a/TODO.md +++ b/TODO.md @@ -1,6 +1,10 @@ # TODO List -- [Try in v0.6.0] **Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))** +- [Done in 0.6.0] But we need more sophistication on `Forwarder` struct. ~~Fix strategy for `h2c` requests on forwarded requests upstream. This needs to update forwarder definition. Also, maybe forwarder would have a cache corresponding to the following task.~~ +- [Initial implementation in v0.6.0] ~~**Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55))**~~ Using `lru` crate might be inefficient in terms of the speed. + - Consider more sophisticated architecture for cache + - Persistent cache (if possible). 
+ - etc etc - Improvement of path matcher - More flexible option for rewriting path - Refactoring @@ -31,5 +35,7 @@ ~~Benchmark with other reverse proxy implementations like Sozu ([#58](https://github.com/junkurihara/rust-rpxy/issues/58)) Currently, Sozu can work only on `amd64` format due to its HTTP message parser limitation... Since the main developer have only `arm64` (Apple M1) laptops, so we should do that on VPS?~~ - Done in v0.4.0: ~~Split `rpxy` source codes into `rpxy-lib` and `rpxy-bin` to make the core part (reverse proxy) isolated from the misc part like toml file loader. This is in order to make the configuration-related part more flexible (related to [#33](https://github.com/junkurihara/rust-rpxy/issues/33))~~ +- Done in 0.6.0: + ~~Fix dynamic reloading of configuration file~~ - etc. diff --git a/config-example.toml b/config-example.toml index 605067c..ec79f3d 100644 --- a/config-example.toml +++ b/config-example.toml @@ -56,7 +56,10 @@ upstream = [ { location = 'www.yahoo.co.jp', tls = true }, ] load_balance = "round_robin" # or "random" or "sticky" (sticky session) or "none" (fix to the first one, default) -upstream_options = ["override_host", "convert_https_to_2"] +upstream_options = [ + "override_host", + "force_http2_upstream", # mutually exclusive with "force_http11_upstream" +] # Non-default destination in "localhost" app, which is routed by "path" [[apps.localhost.reverse_proxy]] @@ -75,7 +78,7 @@ load_balance = "random" # or "round_robin" or "sticky" (sticky session) or "none upstream_options = [ "override_host", "upgrade_insecure_requests", - "convert_https_to_11", + "force_http11_upstream", ] ###################################################################### @@ -104,3 +107,10 @@ max_concurrent_bidistream = 100 max_concurrent_unistream = 100 max_idle_timeout = 10 # secs. 0 represents an infinite timeout. 
# WARNING: If a peer or its network path malfunctions or acts maliciously, an infinite idle timeout can result in permanently hung futures! + +# If this specified, file cache feature is enabled +[experimental.cache] +cache_dir = './cache' # optional. default is "./cache" relative to the current working directory +max_cache_entry = 1000 # optional. default is 1k +max_cache_each_size = 65535 # optional. default is 64k +max_cache_each_size_on_memory = 4096 # optional. default is 4k if 0, it is always file cache. diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index 180ab93..63e997b 100644 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -58,8 +58,9 @@ EOF ####################################### function setup_ubuntu () { + id ${USER} > /dev/null # Check the existence of the user, if not exist, create it. - if [ ! $(id ${USER}) ]; then + if [ $? -eq 1 ]; then echo "rpxy: Create user ${USER} with ${USER_ID}:${GROUP_ID}" groupadd -g ${GROUP_ID} ${USER} useradd -u ${USER_ID} -g ${GROUP_ID} ${USER} @@ -81,8 +82,9 @@ function setup_ubuntu () { ####################################### function setup_alpine () { + id ${USER} > /dev/null # Check the existence of the user, if not exist, create it. - if [ ! $(id ${USER}) ]; then + if [ $? 
-eq 1 ]; then echo "rpxy: Create user ${USER} with ${USER_ID}:${GROUP_ID}" addgroup -g ${GROUP_ID} ${USER} adduser -H -D -u ${USER_ID} -G ${USER} ${USER} diff --git a/quinn b/quinn deleted file mode 160000 index 70e14b5..0000000 --- a/quinn +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 70e14b5c26b45ee1e3d5dd64b2a184e2d6376880 diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index d7f5808..99ee25f 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rpxy" -version = "0.5.0" +version = "0.6.0" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" @@ -12,31 +12,33 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn"] +default = ["http3-quinn", "cache"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] +cache = ["rpxy-lib/cache"] [dependencies] rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ "sticky-cookie", ] } -anyhow = "1.0.72" +anyhow = "1.0.75" rustc-hash = "1.1.0" -serde = { version = "1.0.180", default-features = false, features = ["derive"] } +serde = { version = "1.0.186", default-features = false, features = ["derive"] } derive_builder = "0.12.0" -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.32.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" rustls-pemfile = "1.0.3" +mimalloc = { version = "*", default-features = false } # config -clap = { version = "4.3.19", features = ["std", "cargo", "wrap_help"] } +clap = { version = "4.3.24", features = ["std", "cargo", "wrap_help"] } toml = { version = "0.7.6", default-features = false, features = ["parse"] } hot_reload = "0.1.4" @@ -45,8 +47,4 @@ tracing = { version = "0.1.37" } tracing-subscriber = 
{ version = "0.3.17", features = ["env-filter"] } -[target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5.4" - - [dev-dependencies] diff --git a/rpxy-bin/src/config/toml.rs b/rpxy-bin/src/config/toml.rs index 5f6ab4a..e678012 100644 --- a/rpxy-bin/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -32,10 +32,21 @@ pub struct Http3Option { pub max_idle_timeout: Option, } +#[cfg(feature = "cache")] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] +pub struct CacheOption { + pub cache_dir: Option, + pub max_cache_entry: Option, + pub max_cache_each_size: Option, + pub max_cache_each_size_on_memory: Option, +} + #[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Experimental { #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub h3: Option, + #[cfg(feature = "cache")] + pub cache: Option, pub ignore_sni_consistency: Option, } @@ -160,6 +171,24 @@ impl TryInto for &ConfigToml { if let Some(ignore) = exp.ignore_sni_consistency { proxy_config.sni_consistency = !ignore; } + + #[cfg(feature = "cache")] + if let Some(cache_option) = &exp.cache { + proxy_config.cache_enabled = true; + proxy_config.cache_dir = match &cache_option.cache_dir { + Some(cache_dir) => Some(std::path::PathBuf::from(cache_dir)), + None => Some(std::path::PathBuf::from(CACHE_DIR)), + }; + if let Some(num) = cache_option.max_cache_entry { + proxy_config.cache_max_entry = num; + } + if let Some(num) = cache_option.max_cache_each_size { + proxy_config.cache_max_each_size = num; + } + if let Some(num) = cache_option.max_cache_each_size_on_memory { + proxy_config.cache_max_each_size_on_memory = num; + } + } } Ok(proxy_config) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 323615f..53c8bbc 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -1,3 +1,7 @@ pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; pub const 
CONFIG_WATCH_DELAY_SECS: u32 = 20; + +#[cfg(feature = "cache")] +// Cache directory +pub const CACHE_DIR: &str = "./cache"; diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs index 861c3d5..f04a6f1 100644 --- a/rpxy-bin/src/main.rs +++ b/rpxy-bin/src/main.rs @@ -1,9 +1,5 @@ -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] #[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; +static ALLOC: mimalloc::MiMalloc = mimalloc::MiMalloc; mod cert_file_reader; mod config; @@ -84,7 +80,7 @@ async fn rpxy_service_without_watcher( return Err(anyhow::anyhow!(e)); } }; - entrypoint(&proxy_conf, &app_conf, &runtime_handle) + entrypoint(&proxy_conf, &app_conf, &runtime_handle, None) .await .map_err(|e| anyhow::anyhow!(e)) } @@ -105,10 +101,13 @@ async fn rpxy_service_with_watcher( } }; + // Notifier for proxy service termination + let term_notify = std::sync::Arc::new(tokio::sync::Notify::new()); + // Continuous monitoring loop { tokio::select! { - _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle) => { + _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle, Some(term_notify.clone())) => { error!("rpxy entrypoint exited"); break; } @@ -127,7 +126,9 @@ async fn rpxy_service_with_watcher( continue; } }; - info!("Configuration updated. Force to re-bind TCP/UDP sockets"); + info!("Configuration updated. 
Terminate all spawned proxy services and force to re-bind TCP/UDP sockets"); + term_notify.notify_waiters(); + // tokio::time::sleep(tokio::time::Duration::from_secs(1)).await; } else => break } diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index e1327f7..8dc7c8f 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rpxy-lib" -version = "0.5.0" +version = "0.6.0" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" @@ -12,10 +12,11 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn", "sticky-cookie"] +default = ["http3-quinn", "sticky-cookie", "cache"] http3-quinn = ["quinn", "h3", "h3-quinn", "socket2"] http3-s2n = ["h3", "s2n-quic", "s2n-quic-rustls", "s2n-quic-h3"] sticky-cookie = ["base64", "sha2", "chrono"] +cache = ["http-cache-semantics", "lru"] [dependencies] rand = "0.8.5" @@ -23,19 +24,20 @@ rustc-hash = "1.1.0" bytes = "1.4.0" derive_builder = "0.12.0" futures = { version = "0.3.28", features = ["alloc", "async-await"] } -tokio = { version = "1.29.1", default-features = false, features = [ +tokio = { version = "1.32.0", default-features = false, features = [ "net", "rt-multi-thread", "time", "sync", "macros", + "fs", ] } -async-trait = "0.1.72" +async-trait = "0.1.73" hot_reload = "0.1.4" # reloading certs # Error handling -anyhow = "1.0.72" -thiserror = "1.0.44" +anyhow = "1.0.75" +thiserror = "1.0.47" # http and tls hyper = { version = "0.14.27", default-features = false, features = [ @@ -51,26 +53,30 @@ hyper-rustls = { version = "0.24.1", default-features = false, features = [ "http2", ] } tokio-rustls = { version = "0.24.1", features = ["early-data"] } -rustls = { version = "0.21.5", default-features = false } +rustls = { version = "0.21.6", default-features = false } webpki = "0.22.0" -x509-parser = "0.15.0" 
+x509-parser = "0.15.1" # logging tracing = { version = "0.1.37" } # http/3 # quinn = { version = "0.9.3", optional = true } -quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rustls-0.21 -h3 = { path = "../h3/h3/", optional = true } +quinn = { path = "../submodules/quinn/quinn", optional = true } # Tentative to support rustls-0.21 +h3 = { path = "../submodules/h3/h3/", optional = true } # h3-quinn = { path = "./h3/h3-quinn/", optional = true } -h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21 +h3-quinn = { path = "../submodules/h3-quinn/", optional = true } # Tentative to support rustls-0.21 # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.3", features = ["all"], optional = true } -s2n-quic = { path = "../s2n-quic/quic/s2n-quic/", default-features = false, features = [ +s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ "provider-tls-rustls", ], optional = true } -s2n-quic-h3 = { path = "../s2n-quic/quic/s2n-quic-h3/", optional = true } -s2n-quic-rustls = { path = "../s2n-quic/quic/s2n-quic-rustls/", optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", optional = true } +s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls/", optional = true } + +# cache +http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics/", optional = true } +lru = { version = "0.11.0", optional = true } # cookie handling for sticky cookie chrono = { version = "0.4.26", default-features = false, features = [ diff --git a/rpxy-lib/src/backend/upstream_opts.rs b/rpxy-lib/src/backend/upstream_opts.rs index 1cdb2a5..a96bb58 100644 --- a/rpxy-lib/src/backend/upstream_opts.rs +++ b/rpxy-lib/src/backend/upstream_opts.rs @@ -4,8 +4,8 @@ use crate::error::*; pub enum UpstreamOption { OverrideHost, UpgradeInsecureRequests, - ConvertHttpsTo11, - ConvertHttpsTo2, + ForceHttp11Upstream, + 
ForceHttp2Upstream, // TODO: Adds more options for heder override } impl TryFrom<&str> for UpstreamOption { @@ -14,8 +14,8 @@ impl TryFrom<&str> for UpstreamOption { match val { "override_host" => Ok(Self::OverrideHost), "upgrade_insecure_requests" => Ok(Self::UpgradeInsecureRequests), - "convert_https_to_11" => Ok(Self::ConvertHttpsTo11), - "convert_https_to_2" => Ok(Self::ConvertHttpsTo2), + "force_http11_upstream" => Ok(Self::ForceHttp11Upstream), + "force_http2_upstream" => Ok(Self::ForceHttp2Upstream), _ => Err(RpxyError::Other(anyhow!("Unsupported header option"))), } } diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 39a93e7..ebec1fc 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -1,3 +1,4 @@ +pub const RESPONSE_HEADER_SERVER: &str = "rpxy"; // pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; // pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; pub const TCP_LISTEN_BACKLOG: u32 = 1024; @@ -30,3 +31,15 @@ pub mod H3 { #[cfg(feature = "sticky-cookie")] /// For load-balancing with sticky cookie pub const STICKY_COOKIE_NAME: &str = "rpxy_srv_id"; + +#[cfg(feature = "cache")] +// max # of entries in cache +pub const MAX_CACHE_ENTRY: usize = 1_000; +#[cfg(feature = "cache")] +// max size for each cached object in bytes +pub const MAX_CACHE_EACH_SIZE: usize = 65_535; +#[cfg(feature = "cache")] +// cached on memory if the size in bytes is less than or equal to this +pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; + +// TODO: max cache size in total diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index dd88a9a..c672682 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -7,13 +7,13 @@ pub type Result = std::result::Result; /// Describes things that can go wrong in the Rpxy #[derive(Debug, Error)] pub enum RpxyError { - #[error("Proxy build error")] + #[error("Proxy build error: {0}")] ProxyBuild(#[from] crate::proxy::ProxyBuilderError), - #[error("Backend build error")] + #[error("Backend build error: {0}")] 
BackendBuild(#[from] crate::backend::BackendBuilderError), - #[error("MessageHandler build error")] + #[error("MessageHandler build error: {0}")] HandlerBuild(#[from] crate::handler::HttpMessageHandlerBuilderError), #[error("Config builder error: {0}")] @@ -22,6 +22,9 @@ pub enum RpxyError { #[error("Http Message Handler Error: {0}")] Handler(&'static str), + #[error("Cache Error: {0}")] + Cache(&'static str), + #[error("Http Request Message Error: {0}")] Request(&'static str), @@ -32,40 +35,40 @@ pub enum RpxyError { #[error("LoadBalance Layer Error: {0}")] LoadBalance(String), - #[error("I/O Error")] + #[error("I/O Error: {0}")] Io(#[from] io::Error), // #[error("Toml Deserialization Error")] // TomlDe(#[from] toml::de::Error), #[cfg(feature = "http3-quinn")] - #[error("Quic Connection Error")] + #[error("Quic Connection Error [quinn]: {0}")] QuicConn(#[from] quinn::ConnectionError), #[cfg(feature = "http3-s2n")] - #[error("Quic Connection Error [s2n-quic]")] + #[error("Quic Connection Error [s2n-quic]: {0}")] QUicConn(#[from] s2n_quic::connection::Error), #[cfg(feature = "http3-quinn")] - #[error("H3 Error")] + #[error("H3 Error [quinn]: {0}")] H3(#[from] h3::Error), #[cfg(feature = "http3-s2n")] - #[error("H3 Error [s2n-quic]")] + #[error("H3 Error [s2n-quic]: {0}")] H3(#[from] s2n_quic_h3::h3::Error), - #[error("rustls Connection Error")] + #[error("rustls Connection Error: {0}")] Rustls(#[from] rustls::Error), - #[error("Hyper Error")] + #[error("Hyper Error: {0}")] Hyper(#[from] hyper::Error), - #[error("Hyper Http Error")] + #[error("Hyper Http Error: {0}")] HyperHttp(#[from] hyper::http::Error), - #[error("Hyper Http HeaderValue Error")] + #[error("Hyper Http HeaderValue Error: {0}")] HyperHeaderValue(#[from] hyper::header::InvalidHeaderValue), - #[error("Hyper Http HeaderName Error")] + #[error("Hyper Http HeaderName Error: {0}")] HyperHeaderName(#[from] hyper::header::InvalidHeaderName), #[error(transparent)] diff --git a/rpxy-lib/src/globals.rs 
b/rpxy-lib/src/globals.rs index 6186d84..d1c0130 100644 --- a/rpxy-lib/src/globals.rs +++ b/rpxy-lib/src/globals.rs @@ -52,6 +52,18 @@ pub struct ProxyConfig { // experimentals pub sni_consistency: bool, // Handler + + #[cfg(feature = "cache")] + pub cache_enabled: bool, + #[cfg(feature = "cache")] + pub cache_dir: Option, + #[cfg(feature = "cache")] + pub cache_max_entry: usize, + #[cfg(feature = "cache")] + pub cache_max_each_size: usize, + #[cfg(feature = "cache")] + pub cache_max_each_size_on_memory: usize, + // All need to make packet acceptor #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] pub http3: bool, @@ -87,6 +99,17 @@ impl Default for ProxyConfig { sni_consistency: true, + #[cfg(feature = "cache")] + cache_enabled: false, + #[cfg(feature = "cache")] + cache_dir: None, + #[cfg(feature = "cache")] + cache_max_entry: MAX_CACHE_ENTRY, + #[cfg(feature = "cache")] + cache_max_each_size: MAX_CACHE_EACH_SIZE, + #[cfg(feature = "cache")] + cache_max_each_size_on_memory: MAX_CACHE_EACH_SIZE_ON_MEMORY, + #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] http3: false, #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] @@ -225,7 +248,8 @@ where } if !(upstream.iter().all(|(_, elem)| { - !(elem.opts.contains(&UpstreamOption::ConvertHttpsTo11) && elem.opts.contains(&UpstreamOption::ConvertHttpsTo2)) + !(elem.opts.contains(&UpstreamOption::ForceHttp11Upstream) + && elem.opts.contains(&UpstreamOption::ForceHttp2Upstream)) })) { error!("Either one of force_http11 or force_http2 can be enabled"); return Err(RpxyError::ConfigBuild("Invalid upstream option setting")); diff --git a/rpxy-lib/src/handler/cache.rs b/rpxy-lib/src/handler/cache.rs new file mode 100644 index 0000000..44cdc11 --- /dev/null +++ b/rpxy-lib/src/handler/cache.rs @@ -0,0 +1,393 @@ +use crate::{error::*, globals::Globals, log::*, CryptoSource}; +use base64::{engine::general_purpose, Engine as _}; +use bytes::{Buf, Bytes, BytesMut}; +use http_cache_semantics::CachePolicy; 
+use hyper::{ + http::{Request, Response}, + Body, +}; +use lru::LruCache; +use sha2::{Digest, Sha256}; +use std::{ + fmt::Debug, + path::{Path, PathBuf}, + sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, Mutex, + }, + time::SystemTime, +}; +use tokio::{ + fs::{self, File}, + io::{AsyncReadExt, AsyncWriteExt}, + sync::RwLock, +}; + +#[derive(Clone, Debug)] +/// Cache target in hybrid manner of on-memory and file system +pub enum CacheFileOrOnMemory { + /// Pointer to the temporary cache file + File(PathBuf), + /// Cached body itself + OnMemory(Vec), +} + +#[derive(Clone, Debug)] +/// Cache object definition +struct CacheObject { + /// Cache policy to determine if the stored cache can be used as a response to a new incoming request + pub policy: CachePolicy, + /// Cache target: on-memory object or temporary file + pub target: CacheFileOrOnMemory, +} + +#[derive(Debug)] +/// Manager inner for cache on file system +struct CacheFileManagerInner { + /// Directory of temporary files + cache_dir: PathBuf, + /// Counter of current cached files + cnt: usize, + /// Async runtime + runtime_handle: tokio::runtime::Handle, +} + +impl CacheFileManagerInner { + /// Build new cache file manager. + /// This first creates cache file dir if not exists, and cleans up the file inside the directory. + /// TODO: Persistent cache is really difficult. `sqlite` or something like that is needed. 
+ async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + let path_buf = path.as_ref().to_path_buf(); + if let Err(e) = fs::remove_dir_all(path).await { + warn!("Failed to clean up the cache dir: {e}"); + }; + fs::create_dir_all(&path_buf).await.unwrap(); + Self { + cache_dir: path_buf, + cnt: 0, + runtime_handle: runtime_handle.clone(), + } + } + + /// Create a new temporary file cache under the cache dir and increment the file counter; the cause of a write failure is only logged since `RpxyError::Cache` carries a static message + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { + let cache_filepath = self.cache_dir.join(cache_filename); + let Ok(mut file) = File::create(&cache_filepath).await else { + return Err(RpxyError::Cache("Failed to create file")); + }; + let mut bytes_clone = body_bytes.clone(); + while bytes_clone.has_remaining() { + if let Err(e) = file.write_buf(&mut bytes_clone).await { + error!("Failed to write file cache: {e}"); + return Err(RpxyError::Cache("Failed to write file cache")); + }; + } + self.cnt += 1; + Ok(CacheFileOrOnMemory::File(cache_filepath)) + } + + /// Retrieve a stored temporary file cache as a `Body` whose chunks are sent by a task spawned on the runtime handle + async fn read(&self, path: impl AsRef) -> Result { + let Ok(mut file) = File::open(&path).await else { + warn!("Cache file object cannot be opened"); + return Err(RpxyError::Cache("Cache file object cannot be opened")); + }; + let (body_sender, res_body) = Body::channel(); + self.runtime_handle.spawn(async move { + let mut sender = body_sender; + let mut buf = BytesMut::new(); + loop { + match file.read_buf(&mut buf).await { + Ok(0) => break, + Ok(_) => sender.send_data(buf.copy_to_bytes(buf.remaining())).await?, + Err(_) => break, + }; + } + Ok(()) as Result<()> + }); + + Ok(res_body) + } + + /// Remove a cache file and decrement the file counter (the counter is left untouched if the removal fails) + async fn remove(&mut self, path: impl AsRef) -> Result<()> { + fs::remove_file(path.as_ref()).await?; + self.cnt -= 1; + debug!("Removed a cache file at {:?} (file count: {})", path.as_ref(), self.cnt); + + Ok(()) + } +} + +#[derive(Debug, Clone)] +/// Cache file manager outer that is responsible to 
handle `RwLock` +struct CacheFileManager { + inner: Arc>, +} + +impl CacheFileManager { + /// Build manager + async fn new(path: impl AsRef, runtime_handle: &tokio::runtime::Handle) -> Self { + Self { + inner: Arc::new(RwLock::new(CacheFileManagerInner::new(path, runtime_handle).await)), + } + } + /// Evict a temporary file cache + async fn evict(&self, path: impl AsRef) { + // Acquire the write lock + let mut inner = self.inner.write().await; + if let Err(e) = inner.remove(path).await { + warn!("Eviction failed during file object removal: {:?}", e); + }; + } + /// Read a temporary file cache + async fn read(&self, path: impl AsRef) -> Result { + let mgr = self.inner.read().await; + mgr.read(&path).await + } + /// Create a temporary file cache + async fn create(&mut self, cache_filename: &str, body_bytes: &Bytes) -> Result { + let mut mgr = self.inner.write().await; + mgr.create(cache_filename, body_bytes).await + } + async fn count(&self) -> usize { + let mgr = self.inner.read().await; + mgr.cnt + } +} + +#[derive(Debug, Clone)] +/// Lru cache manager that is responsible to handle `Mutex` as an outer of `LruCache` +struct LruCacheManager { + inner: Arc>>, // TODO: keyはstring urlでいいのか疑問。全requestに対してcheckすることになりそう + cnt: Arc, +} + +impl LruCacheManager { + /// Build LruCache + fn new(cache_max_entry: usize) -> Self { + Self { + inner: Arc::new(Mutex::new(LruCache::new( + std::num::NonZeroUsize::new(cache_max_entry).unwrap(), + ))), + cnt: Arc::new(AtomicUsize::default()), + } + } + /// Count entries + fn count(&self) -> usize { + self.cnt.load(Ordering::Relaxed) + } + /// Evict an entry + fn evict(&self, cache_key: &str) -> Option<(String, CacheObject)> { + let Ok(mut lock) = self.inner.lock() else { + error!("Mutex can't be locked to evict a cache entry"); + return None; + }; + let res = lock.pop_entry(cache_key); + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } + /// Get an entry + fn get(&self, cache_key: &str) -> Result> { + let Ok(mut lock) = 
self.inner.lock() else { + error!("Mutex can't be locked for checking cache entry"); + return Err(RpxyError::Cache("Mutex can't be locked for checking cache entry")); + }; + let Some(cached_object) = lock.get(cache_key) else { + return Ok(None); + }; + Ok(Some(cached_object.clone())) + } + /// Push an entry + fn push(&self, cache_key: &str, cache_object: CacheObject) -> Result> { + let Ok(mut lock) = self.inner.lock() else { + error!("Failed to acquire mutex lock for writing cache entry"); + return Err(RpxyError::Cache("Failed to acquire mutex lock for writing cache entry")); + }; + let res = Ok(lock.push(cache_key.to_string(), cache_object)); + self.cnt.store(lock.len(), Ordering::Relaxed); + res + } +} + +#[derive(Clone, Debug)] +pub struct RpxyCache { + /// Managing cache file objects through RwLock's lock mechanism for file lock + cache_file_manager: CacheFileManager, + /// Lru cache storing http message caching policy + inner: LruCacheManager, + /// Async runtime + runtime_handle: tokio::runtime::Handle, + /// Maximum size of each cache file object + max_each_size: usize, + /// Maximum size of cache object on memory + max_each_size_on_memory: usize, +} + +impl RpxyCache { + /// Generate cache storage + pub async fn new(globals: &Globals) -> Option { + if !globals.proxy_config.cache_enabled { + return None; + } + + let path = globals.proxy_config.cache_dir.as_ref().unwrap(); + let cache_file_manager = CacheFileManager::new(path, &globals.runtime_handle).await; + let inner = LruCacheManager::new(globals.proxy_config.cache_max_entry); + + let max_each_size = globals.proxy_config.cache_max_each_size; + let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; + if max_each_size < max_each_size_on_memory { + warn!( + "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" + ); + max_each_size_on_memory = max_each_size; + } + + Some(Self { + cache_file_manager, + inner, + runtime_handle: 
globals.runtime_handle.clone(), + max_each_size, + max_each_size_on_memory, + }) + } + + /// Count cache entries as (total, on-memory, file); on-memory saturates at zero because the two counters are read at different instants + pub async fn count(&self) -> (usize, usize, usize) { + let total = self.inner.count(); + let file = self.cache_file_manager.count().await; + let on_memory = total.saturating_sub(file); + (total, on_memory, file) + } + + /// Get cached response + pub async fn get(&self, req: &Request) -> Option> { + debug!( + "Current cache status: (total, on-memory, file) = {:?}", + self.count().await + ); + let cache_key = req.uri().to_string(); + + // First check cache chance + let Ok(Some(cached_object)) = self.inner.get(&cache_key) else { + return None; + }; + + // Secondly check the cache freshness as an HTTP message + let now = SystemTime::now(); + let http_cache_semantics::BeforeRequest::Fresh(res_parts) = cached_object.policy.before_request(req, now) else { + // Evict stale cache entry. + // This might be okay to keep as is since it would be updated later. + // However, there is no guarantee that newly got objects will be still cacheable. + // So, we have to evict stale cache entries and cache file objects if found. 
+ debug!("Stale cache entry: {cache_key}"); + let _evicted_entry = self.inner.evict(&cache_key); + // For cache file + if let CacheFileOrOnMemory::File(path) = &cached_object.target { + self.cache_file_manager.evict(&path).await; + } + return None; + }; + + // Finally retrieve the file/on-memory object + match cached_object.target { + CacheFileOrOnMemory::File(path) => { + let res_body = match self.cache_file_manager.read(&path).await { + Ok(res_body) => res_body, + Err(e) => { + warn!("Failed to read from file cache: {e}"); + let _evicted_entry = self.inner.evict(&cache_key); + self.cache_file_manager.evict(&path).await; + return None; + } + }; + + debug!("Cache hit from file: {cache_key}"); + Some(Response::from_parts(res_parts, res_body)) + } + CacheFileOrOnMemory::OnMemory(object) => { + debug!("Cache hit from on memory: {cache_key}"); + Some(Response::from_parts(res_parts, Body::from(object))) + } + } + } + + /// Put response into the cache + pub async fn put(&self, uri: &hyper::Uri, body_bytes: &Bytes, policy: &CachePolicy) -> Result<()> { + let my_cache = self.inner.clone(); + let mut mgr = self.cache_file_manager.clone(); + let uri = uri.clone(); + let bytes_clone = body_bytes.clone(); + let policy_clone = policy.clone(); + let max_each_size = self.max_each_size; + let max_each_size_on_memory = self.max_each_size_on_memory; + + self.runtime_handle.spawn(async move { + if bytes_clone.len() > max_each_size { + warn!("Too large to cache"); + return Err(RpxyError::Cache("Too large to cache")); + } + let cache_key = derive_cache_key_from_uri(&uri); + + debug!("Object of size {:?} bytes to be cached", bytes_clone.len()); + + let cache_object = if bytes_clone.len() > max_each_size_on_memory { + let cache_filename = derive_filename_from_uri(&uri); + let target = mgr.create(&cache_filename, &bytes_clone).await?; + debug!("Cached a new cache file: {} - {}", cache_key, cache_filename); + CacheObject { + policy: policy_clone, + target, + } + } else { + debug!("Cached a 
new object on memory: {}", cache_key); + CacheObject { + policy: policy_clone, + target: CacheFileOrOnMemory::OnMemory(bytes_clone.to_vec()), + } + }; + + if let Some((k, v)) = my_cache.push(&cache_key, cache_object)? { + if k != cache_key { + info!("Over the cache capacity. Evict least recent used entry"); + if let CacheFileOrOnMemory::File(path) = v.target { + mgr.evict(&path).await; + } + } + } + Ok(()) + }); + + Ok(()) + } +} + +fn derive_filename_from_uri(uri: &hyper::Uri) -> String { + let mut hasher = Sha256::new(); + hasher.update(uri.to_string()); + let digest = hasher.finalize(); + general_purpose::URL_SAFE_NO_PAD.encode(digest) +} + +fn derive_cache_key_from_uri(uri: &hyper::Uri) -> String { + uri.to_string() +} + +pub fn get_policy_if_cacheable(req: Option<&Request>, res: Option<&Response>) -> Result> +where + R: Debug, +{ + // deduce cache policy from req and res + let (Some(req), Some(res)) = (req, res) else { + return Err(RpxyError::Cache("Invalid null request and/or response")); + }; + + let new_policy = CachePolicy::new(req, res); + if new_policy.is_storable() { + // debug!("Response is cacheable: {:?}\n{:?}", req, res.headers()); + Ok(Some(new_policy)) + } else { + Ok(None) + } +} diff --git a/rpxy-lib/src/handler/forwarder.rs b/rpxy-lib/src/handler/forwarder.rs new file mode 100644 index 0000000..43cf098 --- /dev/null +++ b/rpxy-lib/src/handler/forwarder.rs @@ -0,0 +1,145 @@ +#[cfg(feature = "cache")] +use super::cache::{get_policy_if_cacheable, RpxyCache}; +#[cfg(feature = "cache")] +use crate::log::*; +use crate::{error::RpxyError, globals::Globals, CryptoSource}; +use async_trait::async_trait; +#[cfg(feature = "cache")] +use bytes::Buf; +use hyper::{ + body::{Body, HttpBody}, + client::{connect::Connect, HttpConnector}, + http::Version, + Client, Request, Response, +}; +use hyper_rustls::HttpsConnector; + +#[cfg(feature = "cache")] +/// Build synthetic request to cache +fn build_synth_req_for_cache(req: &Request) -> Request<()> { + let mut 
builder = Request::builder() + .method(req.method()) + .uri(req.uri()) + .version(req.version()); + // TODO: omit extensions. is this approach correct? + for (header_key, header_value) in req.headers() { + builder = builder.header(header_key, header_value); + } + builder.body(()).unwrap() +} + +#[async_trait] +/// Definition of the forwarder that simply forward requests from downstream client to upstream app servers. +pub trait ForwardRequest { + type Error; + async fn request(&self, req: Request) -> Result, Self::Error>; +} + +/// Forwarder struct responsible to cache handling +pub struct Forwarder +where + C: Connect + Clone + Sync + Send + 'static, +{ + #[cfg(feature = "cache")] + cache: Option, + inner: Client, + inner_h2: Client, // `h2c` or http/2-only client is defined separately +} + +#[async_trait] +impl ForwardRequest for Forwarder +where + B: HttpBody + Send + Sync + 'static, + B::Data: Send, + B::Error: Into>, + C: Connect + Clone + Sync + Send + 'static, +{ + type Error = RpxyError; + + #[cfg(feature = "cache")] + async fn request(&self, req: Request) -> Result, Self::Error> { + let mut synth_req = None; + if self.cache.is_some() { + if let Some(cached_response) = self.cache.as_ref().unwrap().get(&req).await { + // if found, return it as response. + info!("Cache hit - Return from cache"); + return Ok(cached_response); + }; + + // Synthetic request copy used just for caching (cannot clone request object...) + synth_req = Some(build_synth_req_for_cache(&req)); + } + + // TODO: This 'match' condition is always evaluated at every 'request' invocation. So, it is inefficient. + // Needs to be reconsidered. Currently, this is a kind of work around. + // This possibly relates to https://github.com/hyperium/hyper/issues/2417. 
+ let res = match req.version() { + Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests + _ => self.inner.request(req).await.map_err(RpxyError::Hyper), + }; + + if self.cache.is_none() { + return res; + } + + // check cacheability and store it if cacheable + let Ok(Some(cache_policy)) = get_policy_if_cacheable(synth_req.as_ref(), res.as_ref().ok()) else { + return res; + }; + let (parts, body) = res.unwrap().into_parts(); + let Ok(mut bytes) = hyper::body::aggregate(body).await else { + return Err(RpxyError::Cache("Failed to write byte buffer")); + }; + let aggregated = bytes.copy_to_bytes(bytes.remaining()); + + if let Err(cache_err) = self + .cache + .as_ref() + .unwrap() + .put(synth_req.unwrap().uri(), &aggregated, &cache_policy) + .await + { + error!("{:?}", cache_err); + }; + + // res + Ok(Response::from_parts(parts, Body::from(aggregated))) + } + + #[cfg(not(feature = "cache"))] + async fn request(&self, req: Request) -> Result, Self::Error> { + match req.version() { + Version::HTTP_2 => self.inner_h2.request(req).await.map_err(RpxyError::Hyper), // handles `h2c` requests + _ => self.inner.request(req).await.map_err(RpxyError::Hyper), + } + } +} + +impl Forwarder, Body> { + /// Build forwarder + pub async fn new(_globals: &std::sync::Arc>) -> Self { + // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + let connector_h2 = hyper_rustls::HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http2() + .build(); + + let inner = Client::builder().build::<_, Body>(connector); + let inner_h2 = Client::builder().http2_only(true).build::<_, Body>(connector_h2); + + #[cfg(feature = "cache")] + { + let cache = RpxyCache::new(_globals).await; + Self { inner, inner_h2, cache } + } + #[cfg(not(feature = 
"cache"))] + Self { inner, inner_h2 } + } +} diff --git a/rpxy-lib/src/handler/handler_main.rs b/rpxy-lib/src/handler/handler_main.rs index 0b554ae..8b13dc7 100644 --- a/rpxy-lib/src/handler/handler_main.rs +++ b/rpxy-lib/src/handler/handler_main.rs @@ -1,8 +1,15 @@ // Highly motivated by https://github.com/felipenoris/hyper-reverse-proxy -use super::{utils_headers::*, utils_request::*, utils_synth_response::*, HandlerContext}; +use super::{ + forwarder::{ForwardRequest, Forwarder}, + utils_headers::*, + utils_request::*, + utils_synth_response::*, + HandlerContext, +}; use crate::{ backend::{Backend, UpstreamGroup}, certs::CryptoSource, + constants::RESPONSE_HEADER_SERVER, error::*, globals::Globals, log::*, @@ -13,9 +20,9 @@ use hyper::{ client::connect::Connect, header::{self, HeaderValue}, http::uri::Scheme, - Body, Client, Request, Response, StatusCode, Uri, Version, + Body, Request, Response, StatusCode, Uri, Version, }; -use std::{env, net::SocketAddr, sync::Arc}; +use std::{net::SocketAddr, sync::Arc}; use tokio::{io::copy_bidirectional, time::timeout}; #[derive(Clone, Builder)] @@ -26,7 +33,7 @@ where T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone, { - forwarder: Arc>, + forwarder: Arc>, globals: Arc>, } @@ -43,7 +50,7 @@ where /// Handle incoming request message from a client pub async fn handle_request( - self, + &self, mut req: Request, client_addr: SocketAddr, // アクセス制御用 listen_addr: SocketAddr, @@ -208,7 +215,7 @@ where let headers = response.headers_mut(); remove_connection_header(headers); remove_hop_header(headers); - add_header_entry_overwrite_if_exist(headers, "server", env!("CARGO_PKG_NAME"))?; + add_header_entry_overwrite_if_exist(headers, "server", RESPONSE_HEADER_SERVER)?; #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] { @@ -356,15 +363,18 @@ where } // If not specified (force_httpXX_upstream) and https, version is preserved except for http/3 - apply_upstream_options_to_request_line(req, upstream_group)?; - 
// Maybe workaround: Change version to http/1.1 when destination scheme is http - if req.version() != Version::HTTP_11 && upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) { + if upstream_chosen.uri.scheme() == Some(&Scheme::HTTP) { + // Change version to http/1.1 when destination scheme is http + debug!("Change version to http/1.1 when destination scheme is http unless upstream option enabled."); *req.version_mut() = Version::HTTP_11; } else if req.version() == Version::HTTP_3 { - debug!("HTTP/3 is currently unsupported for request to upstream. Use HTTP/2."); + // HTTP/3 is always https + debug!("HTTP/3 is currently unsupported for request to upstream."); *req.version_mut() = Version::HTTP_2; } + apply_upstream_options_to_request_line(req, upstream_group)?; + Ok(context) } } diff --git a/rpxy-lib/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs index aed9831..84e0226 100644 --- a/rpxy-lib/src/handler/mod.rs +++ b/rpxy-lib/src/handler/mod.rs @@ -1,3 +1,6 @@ +#[cfg(feature = "cache")] +mod cache; +mod forwarder; mod handler_main; mod utils_headers; mod utils_request; @@ -5,7 +8,10 @@ mod utils_synth_response; #[cfg(feature = "sticky-cookie")] use crate::backend::LbContext; -pub use handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}; +pub use { + forwarder::Forwarder, + handler_main::{HttpMessageHandler, HttpMessageHandlerBuilder, HttpMessageHandlerBuilderError}, +}; #[allow(dead_code)] #[derive(Debug)] diff --git a/rpxy-lib/src/handler/utils_headers.rs b/rpxy-lib/src/handler/utils_headers.rs index d09df79..6a09c1d 100644 --- a/rpxy-lib/src/handler/utils_headers.rs +++ b/rpxy-lib/src/handler/utils_headers.rs @@ -23,7 +23,7 @@ pub(super) fn takeout_sticky_cookie_lb_context( ) -> Result> { let mut headers_clone = headers.clone(); - match headers_clone.entry(hyper::header::COOKIE) { + match headers_clone.entry(header::COOKIE) { header::Entry::Vacant(_) => Ok(None), header::Entry::Occupied(entry) => { let cookies_iter = entry 
@@ -43,8 +43,8 @@ pub(super) fn takeout_sticky_cookie_lb_context( } let cookies_passed_to_upstream = without_sticky_cookies.join("; "); let cookie_passed_to_lb = sticky_cookies.first().unwrap(); - headers.remove(hyper::header::COOKIE); - headers.insert(hyper::header::COOKIE, cookies_passed_to_upstream.parse()?); + headers.remove(header::COOKIE); + headers.insert(header::COOKIE, cookies_passed_to_upstream.parse()?); let sticky_cookie = StickyCookie { value: StickyCookieValue::try_from(cookie_passed_to_lb, expected_cookie_name)?, @@ -63,7 +63,7 @@ pub(super) fn set_sticky_cookie_lb_context(headers: &mut HeaderMap, context_from let sticky_cookie_string: String = context_from_lb.sticky_cookie.clone().try_into()?; let new_header_val: HeaderValue = sticky_cookie_string.parse()?; let expected_cookie_name = &context_from_lb.sticky_cookie.value.name; - match headers.entry(hyper::header::SET_COOKIE) { + match headers.entry(header::SET_COOKIE) { header::Entry::Vacant(entry) => { entry.insert(new_header_val); } @@ -173,13 +173,13 @@ pub(super) fn add_header_entry_overwrite_if_exist( pub(super) fn make_cookie_single_line(headers: &mut HeaderMap) -> Result<()> { let cookies = headers .iter() - .filter(|(k, _)| **k == hyper::header::COOKIE) + .filter(|(k, _)| **k == header::COOKIE) .map(|(_, v)| v.to_str().unwrap_or("")) .collect::>() .join("; "); if !cookies.is_empty() { - headers.remove(hyper::header::COOKIE); - headers.insert(hyper::header::COOKIE, HeaderValue::from_bytes(cookies.as_bytes())?); + headers.remove(header::COOKIE); + headers.insert(header::COOKIE, HeaderValue::from_bytes(cookies.as_bytes())?); } Ok(()) } diff --git a/rpxy-lib/src/handler/utils_request.rs b/rpxy-lib/src/handler/utils_request.rs index 03e36a1..6204f41 100644 --- a/rpxy-lib/src/handler/utils_request.rs +++ b/rpxy-lib/src/handler/utils_request.rs @@ -11,8 +11,12 @@ use hyper::{header, Request}; pub(super) fn apply_upstream_options_to_request_line(req: &mut Request, upstream: &UpstreamGroup) -> 
Result<()> { for opt in upstream.opts.iter() { match opt { - UpstreamOption::ConvertHttpsTo11 => *req.version_mut() = hyper::Version::HTTP_11, - UpstreamOption::ConvertHttpsTo2 => *req.version_mut() = hyper::Version::HTTP_2, + UpstreamOption::ForceHttp11Upstream => *req.version_mut() = hyper::Version::HTTP_11, + UpstreamOption::ForceHttp2Upstream => { + // case: h2c -> https://www.rfc-editor.org/rfc/rfc9113.txt + // Upgrade from HTTP/1.1 to HTTP/2 is deprecated. So, http-2 prior knowledge is required. + *req.version_mut() = hyper::Version::HTTP_2; + } _ => (), } } diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index c472b05..fd242c5 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -8,9 +8,14 @@ mod log; mod proxy; mod utils; -use crate::{error::*, globals::Globals, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder}; +use crate::{ + error::*, + globals::Globals, + handler::{Forwarder, HttpMessageHandlerBuilder}, + log::*, + proxy::ProxyBuilder, +}; use futures::future::select_all; -use hyper::Client; // use hyper_trust_dns::TrustDnsResolver; use std::sync::Arc; @@ -31,6 +36,7 @@ pub async fn entrypoint( proxy_config: &ProxyConfig, app_config_list: &AppConfigList, runtime_handle: &tokio::runtime::Handle, + term_notify: Option>, ) -> Result<()> where T: CryptoSource + Clone + Send + Sync + 'static, @@ -54,6 +60,15 @@ where if !proxy_config.sni_consistency { info!("Ignore consistency between TLS SNI and Host header (or Request line). 
Note it violates RFC."); } + #[cfg(feature = "cache")] + if proxy_config.cache_enabled { + info!( + "Cache is enabled: cache dir = {:?}", + proxy_config.cache_dir.as_ref().unwrap() + ); + } else { + info!("Cache is disabled") + } // build global let globals = Arc::new(Globals { @@ -62,18 +77,14 @@ where request_count: Default::default(), runtime_handle: runtime_handle.clone(), }); - // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - let msg_handler = HttpMessageHandlerBuilder::default() - .forwarder(Arc::new(Client::builder().build::<_, hyper::Body>(connector))) - .globals(globals.clone()) - .build()?; + // build message handler including a request forwarder + let msg_handler = Arc::new( + HttpMessageHandlerBuilder::default() + .forwarder(Arc::new(Forwarder::new(&globals).await)) + .globals(globals.clone()) + .build()?, + ); let addresses = globals.proxy_config.listen_sockets.clone(); let futures = select_all(addresses.into_iter().map(|addr| { @@ -90,7 +101,7 @@ where .build() .unwrap(); - globals.runtime_handle.spawn(proxy.start()) + globals.runtime_handle.spawn(proxy.start(term_notify.clone())) })); // wait for all future diff --git a/rpxy-lib/src/log.rs b/rpxy-lib/src/log.rs index 0fb7812..6b8afbe 100644 --- a/rpxy-lib/src/log.rs +++ b/rpxy-lib/src/log.rs @@ -1,4 +1,5 @@ use crate::utils::ToCanonical; +use hyper::header; use std::net::SocketAddr; pub use tracing::{debug, error, info, warn}; @@ -20,7 +21,7 @@ pub struct MessageLog { impl From<&hyper::Request> for MessageLog { fn from(req: &hyper::Request) -> Self { - let header_mapper = |v: hyper::header::HeaderName| { + let header_mapper = |v: header::HeaderName| { req .headers() .get(v) @@ -31,7 +32,7 @@ impl From<&hyper::Request> for MessageLog { // tls_server_name: "".to_string(), client_addr: "".to_string(), method: 
req.method().to_string(), - host: header_mapper(hyper::header::HOST), + host: header_mapper(header::HOST), p_and_q: req .uri() .path_and_query() @@ -40,8 +41,8 @@ impl From<&hyper::Request> for MessageLog { version: req.version(), uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), uri_host: req.uri().host().unwrap_or("").to_string(), - ua: header_mapper(hyper::header::USER_AGENT), - xff: header_mapper(hyper::header::HeaderName::from_static("x-forwarded-for")), + ua: header_mapper(header::USER_AGENT), + xff: header_mapper(header::HeaderName::from_static("x-forwarded-for")), status: "".to_string(), upstream: "".to_string(), } @@ -61,7 +62,7 @@ impl MessageLog { self.status = status_code.to_string(); self } - pub fn xff(&mut self, xff: &Option<&hyper::header::HeaderValue>) -> &mut Self { + pub fn xff(&mut self, xff: &Option<&header::HeaderValue>) -> &mut Self { self.xff = xff.map_or_else(|| "", |v| v.to_str().unwrap_or("")).to_string(); self } diff --git a/rpxy-lib/src/proxy/crypto_service.rs b/rpxy-lib/src/proxy/crypto_service.rs index d6191e6..ae0f993 100644 --- a/rpxy-lib/src/proxy/crypto_service.rs +++ b/rpxy-lib/src/proxy/crypto_service.rs @@ -115,7 +115,7 @@ impl ServerCryptoBase { // add client certificate if specified match certs_and_keys.parse_client_ca_certs() { Ok((owned_trust_anchors, _subject_key_ids)) => { - client_ca_roots_local.add_server_trust_anchors(owned_trust_anchors.into_iter()); + client_ca_roots_local.add_trust_anchors(owned_trust_anchors.into_iter()); } Err(e) => { warn!( diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 7773ad9..fd07521 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -15,7 +15,7 @@ where U: CryptoSource + Clone + Sync + Send + 'static, { pub(super) async fn connection_serve_h3( - self, + &self, quic_connection: C, tls_server_name: ServerNameBytesExp, client_addr: SocketAddr, @@ -79,7 +79,7 @@ where } async fn stream_serve_h3( - self, + &self, req: 
Request<()>, stream: RequestStream, client_addr: SocketAddr, diff --git a/rpxy-lib/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs index 166f048..bd52ea9 100644 --- a/rpxy-lib/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -8,6 +8,7 @@ use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, runtime::Handle, + sync::Notify, time::{timeout, Duration}, }; @@ -40,7 +41,7 @@ where { pub listening_on: SocketAddr, pub tls_enabled: bool, // TCP待受がTLSかどうか - pub msg_handler: HttpMessageHandler, + pub msg_handler: Arc>, pub globals: Arc>, } @@ -49,6 +50,21 @@ where T: Connect + Clone + Sync + Send + 'static, U: CryptoSource + Clone + Sync + Send, { + /// Wrapper function to handle request + async fn serve( + handler: Arc>, + req: Request, + client_addr: SocketAddr, + listen_addr: SocketAddr, + tls_enabled: bool, + tls_server_name: Option, + ) -> Result> { + handler + .handle_request(req, client_addr, listen_addr, tls_enabled, tls_server_name) + .await + } + + /// Serves requests from clients pub(super) fn client_serve( self, stream: I, @@ -72,7 +88,8 @@ where .serve_connection( stream, service_fn(move |req: Request| { - self.msg_handler.clone().handle_request( + Self::serve( + self.msg_handler.clone(), req, peer_addr, self.listening_on, @@ -91,11 +108,11 @@ where }); } + /// Start without TLS (HTTP cleartext) async fn start_without_tls(self, server: Http) -> Result<()> { let listener_service = async { let tcp_socket = bind_tcp_socket(&self.listening_on)?; let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; - // let tcp_listener = TcpListener::bind(&self.listening_on).await?; info!("Start TCP proxy serving with HTTP request for configured host names"); while let Ok((stream, _client_addr)) = tcp_listener.accept().await { self.clone().client_serve(stream, server.clone(), _client_addr, None); @@ -106,7 +123,8 @@ where Ok(()) } - pub async fn start(self) -> Result<()> { + /// Entrypoint for 
HTTP/1.1 and HTTP/2 servers + pub async fn start(self, term_notify: Option>) -> Result<()> { let mut server = Http::new(); server.http1_keep_alive(self.globals.proxy_config.keepalive); server.http2_max_concurrent_streams(self.globals.proxy_config.max_concurrent_streams); @@ -114,12 +132,35 @@ where let executor = LocalExecutor::new(self.globals.runtime_handle.clone()); let server = server.with_executor(executor); - if self.tls_enabled { - self.start_with_tls(server).await?; - } else { - self.start_without_tls(server).await?; + let listening_on = self.listening_on; + + let proxy_service = async { + if self.tls_enabled { + self.start_with_tls(server).await + } else { + self.start_without_tls(server).await + } + }; + + match term_notify { + Some(term) => { + tokio::select! { + _ = proxy_service => { + warn!("Proxy service got down"); + } + _ = term.notified() => { + info!("Proxy service listening on {} receives term signal", listening_on); + } + } + } + None => { + proxy_service.await?; + warn!("Proxy service got down"); + } } + // proxy_service.await?; + Ok(()) } } diff --git a/rpxy-lib/src/proxy/socket.rs b/rpxy-lib/src/proxy/socket.rs index a8b9f01..9e4c8f9 100644 --- a/rpxy-lib/src/proxy/socket.rs +++ b/rpxy-lib/src/proxy/socket.rs @@ -32,7 +32,7 @@ pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> Result { } else { Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) }?; - // socket.set_reuse_address(true)?; // This isn't necessary + socket.set_reuse_address(true)?; // This isn't necessary? 
socket.set_reuse_port(true)?; if let Err(e) = socket.bind(&(*listening_on).into()) { diff --git a/s2n-quic b/s2n-quic deleted file mode 160000 index 8ef0a6b..0000000 --- a/s2n-quic +++ /dev/null @@ -1 +0,0 @@ -Subproject commit 8ef0a6b66a856dc9f34ce18159c617ac29154cc7 diff --git a/h3 b/submodules/h3 similarity index 100% rename from h3 rename to submodules/h3 diff --git a/h3-quinn/Cargo.toml b/submodules/h3-quinn/Cargo.toml similarity index 100% rename from h3-quinn/Cargo.toml rename to submodules/h3-quinn/Cargo.toml diff --git a/h3-quinn/src/lib.rs b/submodules/h3-quinn/src/lib.rs similarity index 100% rename from h3-quinn/src/lib.rs rename to submodules/h3-quinn/src/lib.rs diff --git a/submodules/quinn b/submodules/quinn new file mode 160000 index 0000000..7f26029 --- /dev/null +++ b/submodules/quinn @@ -0,0 +1 @@ +Subproject commit 7f260292848a93d615eb43e6e88114a97e64daf1 diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics new file mode 160000 index 0000000..3cd0917 --- /dev/null +++ b/submodules/rusty-http-cache-semantics @@ -0,0 +1 @@ +Subproject commit 3cd09170305753309d86e88b9427827cca0de0dd diff --git a/submodules/s2n-quic b/submodules/s2n-quic new file mode 160000 index 0000000..e6402b7 --- /dev/null +++ b/submodules/s2n-quic @@ -0,0 +1 @@ +Subproject commit e6402b7f8649bc9d90b69aedc83c387b0372bc94