diff --git a/.github/workflows/docker_build_push.yml b/.github/workflows/docker_build_push_amd64.yml similarity index 93% rename from .github/workflows/docker_build_push.yml rename to .github/workflows/docker_build_push_amd64.yml index 305aa45..fa016a0 100644 --- a/.github/workflows/docker_build_push.yml +++ b/.github/workflows/docker_build_push_amd64.yml @@ -1,4 +1,4 @@ -name: Build and Publish Docker +name: Build and Publish Docker x86_64 on: push: @@ -39,7 +39,7 @@ jobs: tags: | ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:latest file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64 - name: Release build and push x86_64-slim if: ${{ env.BRANCH == 'main' }} @@ -61,4 +61,4 @@ jobs: tags: | ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:nightly file: ./docker/Dockerfile - platforms: linux/amd64,linux/arm64 + platforms: linux/amd64 diff --git a/.github/workflows/docker_build_push_arm64.yml b/.github/workflows/docker_build_push_arm64.yml new file mode 100644 index 0000000..b31581c --- /dev/null +++ b/.github/workflows/docker_build_push_arm64.yml @@ -0,0 +1,64 @@ +name: Build and Publish Docker Aarch64 + +on: + push: + branches: + - main + - develop + +jobs: + build_and_push: + runs-on: ubuntu-latest + env: + IMAGE_NAME: rpxy + + steps: + - name: Checkout + uses: actions/checkout@v3 + with: + submodules: recursive + + - name: GitHub Environment + run: echo "BRANCH=${GITHUB_REF##*/}" >> $GITHUB_ENV + + - name: Set up Docker Buildx + uses: docker/setup-buildx-action@v2 + + - name: Login to Docker Hub + uses: docker/login-action@v2 + with: + username: ${{ secrets.DOCKERHUB_USERNAME }} + password: ${{ secrets.DOCKERHUB_TOKEN }} + + - name: Release build and push + if: ${{ env.BRANCH == 'main' }} + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: | + ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:latest + file: ./docker/Dockerfile + platforms: linux/arm64 + + - name: Release build and push x86_64-slim + if: ${{ env.BRANCH == 'main' }} + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: | + ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:slim, ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:latest-slim + file: ./docker/Dockerfile.arm64-slim + platforms: linux/arm64 + + - name: Nightly build and push + if: ${{ env.BRANCH == 'develop' }} + uses: docker/build-push-action@v4 + with: + context: . + push: true + tags: | + ${{ secrets.DOCKERHUB_USERNAME }}/${{ env.IMAGE_NAME }}:nightly + file: ./docker/Dockerfile + platforms: linux/arm64 diff --git a/CHANGELOG.md b/CHANGELOG.md index 8dea263..3a11094 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,18 @@ ## 0.4.0 (unreleased) +### Improvement + +- Feat: Continuous watching on a specified config file and hot-reloading the file when updated +- Feat: Enabled to specify TCP listen backlog in the config file +- Feat: Add a GitHub action to build `arm64` docker image. +- Bench: Add benchmark result on `amd64` architecture. +- Refactor: Split `rpxy` into `rpxy-lib` and `rpxy-bin` +- Refactor: lots of minor improvements + +### Bugfix + +- Fix bug to apply default backend application ## 0.3.0 diff --git a/Cargo.toml b/Cargo.toml index 5f7b0ea..64d1414 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,88 +1,7 @@ -[package] -name = "rpxy" -version = "0.3.0" -authors = ["Jun Kurihara"] -homepage = "https://github.com/junkurihara/rust-rpxy" -repository = "https://github.com/junkurihara/rust-rpxy" -license = "MIT" -readme = "README.md" -edition = "2021" -publish = false - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[features] -default = ["http3", "sticky-cookie"] -http3 = ["quinn", "h3", "h3-quinn"] -sticky-cookie = ["base64", "sha2", "chrono"] - -[dependencies] -anyhow = "1.0.71" -clap = { version = "4.3.11", features = ["std", "cargo", "wrap_help"] } -rand = "0.8.5" -toml = { version = "0.7.6", default-features = false, features = ["parse"] } -rustc-hash = "1.1.0" -serde = { version = "1.0.171", default-features = false, features = ["derive"] } -bytes = "1.4.0" -thiserror = "1.0.43" -x509-parser = "0.15.0" -derive_builder = "0.12.0" -futures = { version = "0.3.28", features = ["alloc", "async-await"] } -tokio = { version = "1.29.1", default-features = false, features = [ - "net", - "rt-multi-thread", - "time", - "sync", - "macros", -] } -async-trait = "0.1.71" -hot_reload = "0.1.2" # reloading certs - -# http and tls -hyper = { version = "0.14.27", default-features = false, features = [ - "server", - "http1", - "http2", - "stream", -] } -hyper-rustls = { version = "0.24.1", default-features = false, features = [ - "tokio-runtime", - "webpki-tokio", - "http1", - "http2", -] } -tokio-rustls = { version = "0.24.1", features = ["early-data"] } -rustls-pemfile = "1.0.3" -rustls = { version = "0.21.3", default-features = false } -webpki = "0.22.0" - -# logging -tracing = { version = "0.1.37" } -tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } - -# http/3 -# quinn = { version = "0.9.3", optional = true } -quinn = { path = "./quinn/quinn", optional = true } # Tentative to support rustls-0.21 -h3 = { path = "./h3/h3/", optional = true } -# h3-quinn = { path = "./h3/h3-quinn/", optional = true } -h3-quinn = { path = "./h3-quinn/", optional = true } # Tentative to support rustls-0.21 - -# cookie handling for sticky cookie -chrono = { version = "0.4.26", default-features = false, features = [ - "unstable-locales", - "alloc", - "clock", -], optional = true } -base64 = { version = "0.21.2", optional = true } -sha2 = { version = "0.10.7", default-features = false, optional = true } - - -[target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.5.0" - - -[dev-dependencies] +[workspace] +members = ["rpxy-bin", "rpxy-lib"] +exclude = ["quinn", "h3-quinn", "h3"] [profile.release] codegen-units = 1 diff --git a/README.md b/README.md index 561141a..a3d8795 100644 --- a/README.md +++ b/README.md @@ -48,6 +48,20 @@ You can run `rpxy` with a configuration file like % ./target/release/rpxy --config config.toml ``` +If you specify `-w` option along with the config file path, `rpxy` tracks the change of `config.toml` in the real-time manner and apply the change immediately without restarting the process. + +The full help messages are given follows. + +```bash: +usage: rpxy [OPTIONS] --config + +Options: + -c, --config Configuration file path like ./config.toml + -w, --watch Activate dynamic reloading of the config file via continuous monitoring + -h, --help Print help + -V, --version Print version +``` + That's all! ## Basic Configuration @@ -217,7 +231,7 @@ Since it is currently a work-in-progress project, we are frequently adding new o ## Using Docker Image -You can also use [docker image](https://hub.docker.com/r/jqtype/rpxy) instead of directly executing the binary. There are only two docker-specific environment variables. +You can also use [docker image](https://hub.docker.com/r/jqtype/rpxy) instead of directly executing the binary. There are only several docker-specific environment variables. - `HOST_USER` (default: `user`): User name executing `rpxy` inside the container. - `HOST_UID` (default: `900`): `UID` of `HOST_USER`. diff --git a/TODO.md b/TODO.md index 81af1d5..3425d11 100644 --- a/TODO.md +++ b/TODO.md @@ -10,7 +10,8 @@ - upstream/upstream group: information on targeted destinations for each set of (a domain + a path) - load-balance: load balancing mod for a domain + path - - Split `rpxy` source codes into `rpxy-lib` and `rpxy-bin` to make the core part (reverse proxy) isolated from the misc part like toml file loader. This is in order to make the configuration-related part more flexible (related to [#33](https://github.com/junkurihara/rust-rpxy/issues/33)) + - **Almost done in version 0.4.0**: + Split `rpxy` source codes into `rpxy-lib` and `rpxy-bin` to make the core part (reverse proxy) isolated from the misc part like toml file loader. This is in order to make the configuration-related part more flexible (related to [#33](https://github.com/junkurihara/rust-rpxy/issues/33)) - Cache option for the response with `Cache-Control: public` header directive ([#55](https://github.com/junkurihara/rust-rpxy/issues/55)) - Consideration on migrating from `quinn` and `h3-quinn` to other QUIC implementations ([#57](https://github.com/junkurihara/rust-rpxy/issues/57)) diff --git a/config-example.toml b/config-example.toml index 0382393..605067c 100644 --- a/config-example.toml +++ b/config-example.toml @@ -10,6 +10,9 @@ listen_port = 8080 listen_port_tls = 8443 +# Optional for h2 and http1.1 +tcp_listen_backlog = 1024 + # Optional for h2 and http1.1 max_concurrent_streams = 100 diff --git a/docker/Dockerfile.arm64-slim b/docker/Dockerfile.arm64-slim new file mode 100644 index 0000000..83b2d16 --- /dev/null +++ b/docker/Dockerfile.arm64-slim @@ -0,0 +1,45 @@ +######################################## +FROM messense/rust-musl-cross:aarch64-musl as builder + +ENV TARGET_DIR=aarch64-unknown-linux-musl +ENV CFLAGS=-Ofast + +WORKDIR /tmp + +COPY . /tmp/ + +ENV RUSTFLAGS "-C link-arg=-s" + +RUN echo "Building rpxy from source" && \ + cargo build --release && \ + musl-strip --strip-all /tmp/target/${TARGET_DIR}/release/rpxy + +######################################## +FROM alpine:latest as runner +LABEL maintainer="Jun Kurihara" + +ENV TARGET_DIR=aarch64-unknown-linux-musl +ENV RUNTIME_DEPS logrotate ca-certificates su-exec + +RUN apk add --no-cache ${RUNTIME_DEPS} && \ + update-ca-certificates && \ + find / -type d -path /proc -prune -o -type f -perm /u+s -exec chmod u-s {} \; && \ + find / -type d -path /proc -prune -o -type f -perm /g+s -exec chmod g-s {} \; && \ + mkdir -p /rpxy/bin &&\ + mkdir -p /rpxy/log + +COPY --from=builder /tmp/target/${TARGET_DIR}/release/rpxy /rpxy/bin/rpxy +COPY ./docker/run.sh /rpxy +COPY ./docker/entrypoint.sh /rpxy + +RUN chmod +x /rpxy/run.sh && \ + chmod +x /rpxy/entrypoint.sh + +ENV SSL_CERT_FILE=/etc/ssl/certs/ca-certificates.crt +ENV SSL_CERT_DIR=/etc/ssl/certs + +EXPOSE 80 443 + +CMD ["/rpxy/entrypoint.sh"] + +ENTRYPOINT ["/rpxy/entrypoint.sh"] diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml new file mode 100644 index 0000000..f4bdc68 --- /dev/null +++ b/rpxy-bin/Cargo.toml @@ -0,0 +1,49 @@ +[package] +name = "rpxy" +version = "0.4.0" +authors = ["Jun Kurihara"] +homepage = "https://github.com/junkurihara/rust-rpxy" +repository = "https://github.com/junkurihara/rust-rpxy" +license = "MIT" +readme = "README.md" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["http3"] +http3 = [] + +[dependencies] +rpxy-lib = { path = "../rpxy-lib/", features = ["http3", "sticky-cookie"] } + +anyhow = "1.0.72" +rustc-hash = "1.1.0" +serde = { version = "1.0.174", default-features = false, features = ["derive"] } +derive_builder = "0.12.0" +tokio = { version = "1.29.1", default-features = false, features = [ + "net", + "rt-multi-thread", + "time", + "sync", + "macros", +] } +async-trait = "0.1.72" +rustls-pemfile = "1.0.3" + +# config +clap = { version = "4.3.19", features = ["std", "cargo", "wrap_help"] } +toml = { version = "0.7.6", default-features = false, features = ["parse"] } +hot_reload = "0.1.4" + +# logging +tracing = { version = "0.1.37" } +tracing-subscriber = { version = "0.3.17", features = ["env-filter"] } + + +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.5.0" + + +[dev-dependencies] diff --git a/src/cert_file_reader.rs b/rpxy-bin/src/cert_file_reader.rs similarity index 92% rename from src/cert_file_reader.rs rename to rpxy-bin/src/cert_file_reader.rs index e25dcf7..0a6a14f 100644 --- a/src/cert_file_reader.rs +++ b/rpxy-bin/src/cert_file_reader.rs @@ -1,10 +1,10 @@ -use crate::{ - certs::{CertsAndKeys, CryptoSource}, - log::*, -}; +use crate::log::*; use async_trait::async_trait; use derive_builder::Builder; -use rustls::{Certificate, PrivateKey}; +use rpxy_lib::{ + reexports::{Certificate, PrivateKey}, + CertsAndKeys, CryptoSource, +}; use std::{ fs::File, io::{self, BufReader, Cursor, Read}, @@ -150,8 +150,8 @@ mod tests { use super::*; #[tokio::test] async fn read_server_crt_key_files() { - let tls_cert_path = "example-certs/server.crt"; - let tls_cert_key_path = "example-certs/server.key"; + let tls_cert_path = "../example-certs/server.crt"; + let tls_cert_key_path = "../example-certs/server.key"; let crypto_file_source = CryptoFileSourceBuilder::default() .tls_cert_key_path(tls_cert_key_path) .tls_cert_path(tls_cert_path) @@ -165,9 +165,9 @@ mod tests { #[tokio::test] async fn read_server_crt_key_files_with_client_ca_crt() { - let tls_cert_path = "example-certs/server.crt"; - let tls_cert_key_path = "example-certs/server.key"; - let client_ca_cert_path = Some("example-certs/client.ca.crt".to_string()); + let tls_cert_path = "../example-certs/server.crt"; + let tls_cert_key_path = "../example-certs/server.key"; + let client_ca_cert_path = Some("../example-certs/client.ca.crt".to_string()); let crypto_file_source = CryptoFileSourceBuilder::default() .tls_cert_key_path(tls_cert_key_path) .tls_cert_path(tls_cert_path) diff --git a/rpxy-bin/src/config/mod.rs b/rpxy-bin/src/config/mod.rs new file mode 100644 index 0000000..09ec2b9 --- /dev/null +++ b/rpxy-bin/src/config/mod.rs @@ -0,0 +1,9 @@ +mod parse; +mod service; +mod toml; + +pub use { + self::toml::ConfigToml, + parse::{build_settings, parse_opts}, + service::ConfigTomlReloader, +}; diff --git a/rpxy-bin/src/config/parse.rs b/rpxy-bin/src/config/parse.rs new file mode 100644 index 0000000..15ff240 --- /dev/null +++ b/rpxy-bin/src/config/parse.rs @@ -0,0 +1,97 @@ +use super::toml::ConfigToml; +use crate::{ + cert_file_reader::CryptoFileSource, + error::{anyhow, ensure}, +}; +use clap::{Arg, ArgAction}; +use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig}; + +/// Parsed options +pub struct Opts { + pub config_file_path: String, + pub watch: bool, +} + +/// Parse arg values passed from cli +pub fn parse_opts() -> Result { + let _ = include_str!("../../Cargo.toml"); + let options = clap::command!() + .arg( + Arg::new("config_file") + .long("config") + .short('c') + .value_name("FILE") + .required(true) + .help("Configuration file path like ./config.toml"), + ) + .arg( + Arg::new("watch") + .long("watch") + .short('w') + .action(ArgAction::SetTrue) + .help("Activate dynamic reloading of the config file via continuous monitoring"), + ); + let matches = options.get_matches(); + + /////////////////////////////////// + let config_file_path = matches.get_one::("config_file").unwrap().to_owned(); + let watch = matches.get_one::("watch").unwrap().to_owned(); + + Ok(Opts { + config_file_path, + watch, + }) +} + +pub fn build_settings( + config: &ConfigToml, +) -> std::result::Result<(ProxyConfig, AppConfigList), anyhow::Error> { + /////////////////////////////////// + // build proxy config + let proxy_config: ProxyConfig = config.try_into()?; + + /////////////////////////////////// + // backend_apps + let apps = config.apps.clone().ok_or(anyhow!("Missing application spec"))?; + + // assertions for all backend apps + ensure!(!apps.0.is_empty(), "Wrong application spec."); + // if only https_port is specified, tls must be configured for all apps + if proxy_config.http_port.is_none() { + ensure!( + apps.0.iter().all(|(_, app)| app.tls.is_some()), + "Some apps serves only plaintext HTTP" + ); + } + // https redirection can be configured if both ports are active + if !(proxy_config.https_port.is_some() && proxy_config.http_port.is_some()) { + ensure!( + apps.0.iter().all(|(_, app)| { + if let Some(tls) = app.tls.as_ref() { + tls.https_redirection.is_none() + } else { + true + } + }), + "https_redirection can be specified only when both http_port and https_port are specified" + ); + } + + // build applications + let mut app_config_list_inner = Vec::>::new(); + + // let mut backends = Backends::new(); + for (app_name, app) in apps.0.iter() { + let _server_name_string = app.server_name.as_ref().ok_or(anyhow!("No server name"))?; + let registered_app_name = app_name.to_ascii_lowercase(); + let app_config = app.build_app_config(®istered_app_name)?; + app_config_list_inner.push(app_config); + } + + let app_config_list = AppConfigList { + inner: app_config_list_inner, + default_app: config.default_app.clone().map(|v| v.to_ascii_lowercase()), // default backend application for plaintext http requests + }; + + Ok((proxy_config, app_config_list)) +} diff --git a/rpxy-bin/src/config/service.rs b/rpxy-bin/src/config/service.rs new file mode 100644 index 0000000..8769b96 --- /dev/null +++ b/rpxy-bin/src/config/service.rs @@ -0,0 +1,24 @@ +use super::toml::ConfigToml; +use async_trait::async_trait; +use hot_reload::{Reload, ReloaderError}; + +#[derive(Clone)] +pub struct ConfigTomlReloader { + pub config_path: String, +} + +#[async_trait] +impl Reload for ConfigTomlReloader { + type Source = String; + async fn new(source: &Self::Source) -> Result> { + Ok(Self { + config_path: source.clone(), + }) + } + + async fn reload(&self) -> Result, ReloaderError> { + let conf = ConfigToml::new(&self.config_path) + .map_err(|_e| ReloaderError::::Reload("Failed to reload config toml"))?; + Ok(Some(conf)) + } +} diff --git a/src/config/toml.rs b/rpxy-bin/src/config/toml.rs similarity index 64% rename from src/config/toml.rs rename to rpxy-bin/src/config/toml.rs index f33ea4d..84260c0 100644 --- a/src/config/toml.rs +++ b/rpxy-bin/src/config/toml.rs @@ -1,20 +1,19 @@ use crate::{ - backend::{Backend, BackendBuilder, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption}, cert_file_reader::{CryptoFileSource, CryptoFileSourceBuilder}, constants::*, error::{anyhow, ensure}, - globals::ProxyConfig, - utils::PathNameBytesExp, }; +use rpxy_lib::{reexports::Uri, AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}; use rustc_hash::FxHashMap as HashMap; use serde::Deserialize; use std::{fs, net::SocketAddr}; -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct ConfigToml { pub listen_port: Option, pub listen_port_tls: Option, pub listen_ipv6: Option, + pub tcp_listen_backlog: Option, pub max_concurrent_streams: Option, pub max_clients: Option, pub apps: Option, @@ -23,7 +22,7 @@ pub struct ConfigToml { } #[cfg(feature = "http3")] -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Http3Option { pub alt_svc_max_age: Option, pub request_max_body_size: Option, @@ -33,24 +32,24 @@ pub struct Http3Option { pub max_idle_timeout: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Experimental { #[cfg(feature = "http3")] pub h3: Option, pub ignore_sni_consistency: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Apps(pub HashMap); -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct Application { pub server_name: Option, pub reverse_proxy: Option>, pub tls: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct TlsOption { pub tls_cert_path: Option, pub tls_cert_key_path: Option, @@ -58,7 +57,7 @@ pub struct TlsOption { pub client_ca_cert_path: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct ReverseProxyOption { pub path: Option, pub replace_path: Option, @@ -67,7 +66,7 @@ pub struct ReverseProxyOption { pub load_balance: Option, } -#[derive(Deserialize, Debug, Default)] +#[derive(Deserialize, Debug, Default, PartialEq, Eq, Clone)] pub struct UpstreamParams { pub location: String, pub tls: Option, @@ -114,6 +113,11 @@ impl TryInto for &ConfigToml { }) .collect(); + // tcp backlog + if let Some(backlog) = self.tcp_listen_backlog { + proxy_config.tcp_listen_backlog = backlog; + } + // max values if let Some(c) = self.max_clients { proxy_config.max_clients = c as usize; @@ -147,8 +151,7 @@ impl TryInto for &ConfigToml { if x == 0u64 { proxy_config.h3_max_idle_timeout = None; } else { - proxy_config.h3_max_idle_timeout = - Some(quinn::IdleTimeout::try_from(tokio::time::Duration::from_secs(x)).unwrap()) + proxy_config.h3_max_idle_timeout = Some(tokio::time::Duration::from_secs(x)) } } } @@ -171,101 +174,86 @@ impl ConfigToml { } } -impl TryInto> for &Application { - type Error = anyhow::Error; - - fn try_into(self) -> std::result::Result, Self::Error> { +impl Application { + pub fn build_app_config(&self, app_name: &str) -> std::result::Result, anyhow::Error> { let server_name_string = self.server_name.as_ref().ok_or(anyhow!("Missing server_name"))?; - // backend builder - let mut backend_builder = BackendBuilder::default(); // reverse proxy settings - let reverse_proxy = self.try_into()?; + let reverse_proxy_config: Vec = self.try_into()?; - backend_builder - .app_name(server_name_string) - .server_name(server_name_string) - .reverse_proxy(reverse_proxy); - - // TLS settings and build backend instance - let backend = if self.tls.is_none() { - backend_builder.build()? - } else { + // tls settings + let tls_config = if self.tls.is_some() { let tls = self.tls.as_ref().unwrap(); ensure!(tls.tls_cert_key_path.is_some() && tls.tls_cert_path.is_some()); - - let https_redirection = if tls.https_redirection.is_none() { - Some(true) // Default true - } else { - tls.https_redirection - }; - - let crypto_source = CryptoFileSourceBuilder::default() + let inner = CryptoFileSourceBuilder::default() .tls_cert_path(tls.tls_cert_path.as_ref().unwrap()) .tls_cert_key_path(tls.tls_cert_key_path.as_ref().unwrap()) .client_ca_cert_path(&tls.client_ca_cert_path) .build()?; - backend_builder - .https_redirection(https_redirection) - .crypto_source(Some(crypto_source)) - .build()? + let https_redirection = if tls.https_redirection.is_none() { + true // Default true + } else { + tls.https_redirection.unwrap() + }; + + Some(TlsConfig { + inner, + https_redirection, + }) + } else { + None }; - Ok(backend) + + Ok(AppConfig { + app_name: app_name.to_owned(), + server_name: server_name_string.to_owned(), + reverse_proxy: reverse_proxy_config, + tls: tls_config, + }) } } -impl TryInto for &Application { +impl TryInto> for &Application { type Error = anyhow::Error; - fn try_into(self) -> std::result::Result { - let server_name_string = self.server_name.as_ref().ok_or(anyhow!("Missing server_name"))?; + fn try_into(self) -> std::result::Result, Self::Error> { + let _server_name_string = self.server_name.as_ref().ok_or(anyhow!("Missing server_name"))?; let rp_settings = self.reverse_proxy.as_ref().ok_or(anyhow!("Missing reverse_proxy"))?; - let mut upstream: HashMap = HashMap::default(); + let mut reverse_proxies: Vec = Vec::new(); - rp_settings.iter().for_each(|rpo| { - let upstream_vec: Vec = rpo.upstream.iter().map(|x| x.try_into().unwrap()).collect(); - // let upstream_iter = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()); - // let lb_upstream_num = vec_upstream.len(); - let elem = UpstreamGroupBuilder::default() - .upstream(&upstream_vec) - .path(&rpo.path) - .replace_path(&rpo.replace_path) - .lb(&rpo.load_balance, &upstream_vec, server_name_string, &rpo.path) - .opts(&rpo.upstream_options) - .build() - .unwrap(); + for rpo in rp_settings.iter() { + let upstream_res: Vec> = rpo.upstream.iter().map(|v| v.try_into().ok()).collect(); + if !upstream_res.iter().all(|v| v.is_some()) { + return Err(anyhow!("[{}] Upstream uri is invalid", &_server_name_string)); + } + let upstream = upstream_res.into_iter().map(|v| v.unwrap()).collect(); - upstream.insert(elem.path.clone(), elem); - }); - ensure!( - rp_settings.iter().filter(|rpo| rpo.path.is_none()).count() < 2, - "Multiple default reverse proxy setting" - ); - ensure!( - upstream - .iter() - .all(|(_, elem)| !(elem.opts.contains(&UpstreamOption::ConvertHttpsTo11) - && elem.opts.contains(&UpstreamOption::ConvertHttpsTo2))), - "either one of force_http11 or force_http2 can be enabled" - ); + reverse_proxies.push(ReverseProxyConfig { + path: rpo.path.clone(), + replace_path: rpo.replace_path.clone(), + upstream, + upstream_options: rpo.upstream_options.clone(), + load_balance: rpo.load_balance.clone(), + }) + } - Ok(ReverseProxy { upstream }) + Ok(reverse_proxies) } } -impl TryInto for &UpstreamParams { +impl TryInto for &UpstreamParams { type Error = anyhow::Error; - fn try_into(self) -> std::result::Result { + fn try_into(self) -> std::result::Result { let scheme = match self.tls { Some(true) => "https", _ => "http", }; let location = format!("{}://{}", scheme, self.location); - Ok(Upstream { - uri: location.parse::().map_err(|e| anyhow!("{}", e))?, + Ok(UpstreamUri { + inner: location.parse::().map_err(|e| anyhow!("{}", e))?, }) } } diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs new file mode 100644 index 0000000..323615f --- /dev/null +++ b/rpxy-bin/src/constants.rs @@ -0,0 +1,3 @@ +pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; +pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; +pub const CONFIG_WATCH_DELAY_SECS: u32 = 20; diff --git a/rpxy-bin/src/error.rs b/rpxy-bin/src/error.rs new file mode 100644 index 0000000..b559bce --- /dev/null +++ b/rpxy-bin/src/error.rs @@ -0,0 +1 @@ +pub use anyhow::{anyhow, bail, ensure, Context}; diff --git a/rpxy-bin/src/log.rs b/rpxy-bin/src/log.rs new file mode 100644 index 0000000..3fcf694 --- /dev/null +++ b/rpxy-bin/src/log.rs @@ -0,0 +1,24 @@ +pub use tracing::{debug, error, info, warn}; + +pub fn init_logger() { + use tracing_subscriber::{fmt, prelude::*, EnvFilter}; + + let format_layer = fmt::layer() + .with_line_number(false) + .with_thread_ids(false) + .with_target(false) + .with_thread_names(true) + .with_target(true) + .with_level(true) + .compact(); + + // This limits the logger to emits only rpxy crate + let level_string = std::env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string()); + let filter_layer = EnvFilter::new(format!("{}={}", env!("CARGO_PKG_NAME"), level_string)); + // let filter_layer = EnvFilter::from_default_env(); + + tracing_subscriber::registry() + .with(format_layer) + .with(filter_layer) + .init(); +} diff --git a/rpxy-bin/src/main.rs b/rpxy-bin/src/main.rs new file mode 100644 index 0000000..8fe00dc --- /dev/null +++ b/rpxy-bin/src/main.rs @@ -0,0 +1,134 @@ +#[cfg(not(target_env = "msvc"))] +use tikv_jemallocator::Jemalloc; + +#[cfg(not(target_env = "msvc"))] +#[global_allocator] +static GLOBAL: Jemalloc = Jemalloc; + +mod cert_file_reader; +mod config; +mod constants; +mod error; +mod log; + +use crate::{ + config::{build_settings, parse_opts, ConfigToml, ConfigTomlReloader}, + constants::CONFIG_WATCH_DELAY_SECS, + log::*, +}; +use hot_reload::{ReloaderReceiver, ReloaderService}; +use rpxy_lib::entrypoint; + +fn main() { + init_logger(); + + let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); + runtime_builder.enable_all(); + runtime_builder.thread_name("rpxy"); + let runtime = runtime_builder.build().unwrap(); + + runtime.block_on(async { + // Initially load options + let Ok(parsed_opts) = parse_opts() else { + error!("Invalid toml file"); + std::process::exit(1); + }; + + if !parsed_opts.watch { + if let Err(e) = rpxy_service_without_watcher(&parsed_opts.config_file_path, runtime.handle().clone()).await { + error!("rpxy service existed: {e}"); + std::process::exit(1); + } + } else { + let (config_service, config_rx) = ReloaderService::::new( + &parsed_opts.config_file_path, + CONFIG_WATCH_DELAY_SECS, + false, + ) + .await + .unwrap(); + + tokio::select! { + Err(e) = config_service.start() => { + error!("config reloader service exited: {e}"); + std::process::exit(1); + } + Err(e) = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { + error!("rpxy service existed: {e}"); + std::process::exit(1); + } + } + } + }); +} + +async fn rpxy_service_without_watcher( + config_file_path: &str, + runtime_handle: tokio::runtime::Handle, +) -> Result<(), anyhow::Error> { + info!("Start rpxy service"); + let config_toml = match ConfigToml::new(config_file_path) { + Ok(v) => v, + Err(e) => { + error!("Invalid toml file: {e}"); + std::process::exit(1); + } + }; + let (proxy_conf, app_conf) = match build_settings(&config_toml) { + Ok(v) => v, + Err(e) => { + error!("Invalid configuration: {e}"); + return Err(anyhow::anyhow!(e)); + } + }; + entrypoint(&proxy_conf, &app_conf, &runtime_handle) + .await + .map_err(|e| anyhow::anyhow!(e)) +} + +async fn rpxy_service_with_watcher( + mut config_rx: ReloaderReceiver, + runtime_handle: tokio::runtime::Handle, +) -> Result<(), anyhow::Error> { + info!("Start rpxy service with dynamic config reloader"); + // Initial loading + config_rx.changed().await?; + let config_toml = config_rx.borrow().clone().unwrap(); + let (mut proxy_conf, mut app_conf) = match build_settings(&config_toml) { + Ok(v) => v, + Err(e) => { + error!("Invalid configuration: {e}"); + return Err(anyhow::anyhow!(e)); + } + }; + + // Continuous monitoring + loop { + tokio::select! { + _ = entrypoint(&proxy_conf, &app_conf, &runtime_handle) => { + error!("rpxy entrypoint exited"); + break; + } + _ = config_rx.changed() => { + if config_rx.borrow().is_none() { + error!("Something wrong in config reloader receiver"); + break; + } + let config_toml = config_rx.borrow().clone().unwrap(); + match build_settings(&config_toml) { + Ok((p, a)) => { + (proxy_conf, app_conf) = (p, a) + }, + Err(e) => { + error!("Invalid configuration. Configuration does not updated: {e}"); + continue; + } + }; + info!("Configuration updated. Force to re-bind TCP/UDP sockets"); + } + else => break + } + } + + Err(anyhow::anyhow!("rpxy or continuous monitoring service exited")) +} diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml new file mode 100644 index 0000000..e36bae6 --- /dev/null +++ b/rpxy-lib/Cargo.toml @@ -0,0 +1,79 @@ +[package] +name = "rpxy-lib" +version = "0.4.0" +authors = ["Jun Kurihara"] +homepage = "https://github.com/junkurihara/rust-rpxy" +repository = "https://github.com/junkurihara/rust-rpxy" +license = "MIT" +readme = "README.md" +edition = "2021" +publish = false + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[features] +default = ["http3", "sticky-cookie"] +http3 = ["quinn", "h3", "h3-quinn"] +sticky-cookie = ["base64", "sha2", "chrono"] + +[dependencies] +rand = "0.8.5" +rustc-hash = "1.1.0" +bytes = "1.4.0" +derive_builder = "0.12.0" +futures = { version = "0.3.28", features = ["alloc", "async-await"] } +tokio = { version = "1.29.1", default-features = false, features = [ + "net", + "rt-multi-thread", + "time", + "sync", + "macros", +] } +async-trait = "0.1.72" +hot_reload = "0.1.4" # reloading certs + +# Error handling +anyhow = "1.0.72" +thiserror = "1.0.44" + +# http and tls +hyper = { version = "0.14.27", default-features = false, features = [ + "server", + "http1", + "http2", + "stream", +] } +hyper-rustls = { version = "0.24.1", default-features = false, features = [ + "tokio-runtime", + "webpki-tokio", + "http1", + "http2", +] } +tokio-rustls = { version = "0.24.1", features = ["early-data"] } +rustls = { version = "0.21.5", default-features = false } +webpki = "0.22.0" +x509-parser = "0.15.0" + +# logging +tracing = { version = "0.1.37" } + +# http/3 +# quinn = { version = "0.9.3", optional = true } +quinn = { path = "../quinn/quinn", optional = true } # Tentative to support rustls-0.21 +h3 = { path = "../h3/h3/", optional = true } +# h3-quinn = { path = "./h3/h3-quinn/", optional = true } +h3-quinn = { path = "../h3-quinn/", optional = true } # Tentative to support rustls-0.21 +# for UDP socket wit SO_REUSEADDR +socket2 = { version = "0.5.3", features = ["all"] } + +# cookie handling for sticky cookie +chrono = { version = "0.4.26", default-features = false, features = [ + "unstable-locales", + "alloc", + "clock", +], optional = true } +base64 = { version = "0.21.2", optional = true } +sha2 = { version = "0.10.7", default-features = false, optional = true } + + +[dev-dependencies] diff --git a/src/backend/load_balance.rs b/rpxy-lib/src/backend/load_balance.rs similarity index 100% rename from src/backend/load_balance.rs rename to rpxy-lib/src/backend/load_balance.rs diff --git a/src/backend/load_balance_sticky.rs b/rpxy-lib/src/backend/load_balance_sticky.rs similarity index 100% rename from src/backend/load_balance_sticky.rs rename to rpxy-lib/src/backend/load_balance_sticky.rs diff --git a/src/backend/mod.rs b/rpxy-lib/src/backend/mod.rs similarity index 98% rename from src/backend/mod.rs rename to rpxy-lib/src/backend/mod.rs index 524f30b..73c4466 100644 --- a/src/backend/mod.rs +++ b/rpxy-lib/src/backend/mod.rs @@ -67,6 +67,7 @@ impl Backends where T: CryptoSource, { + #[allow(clippy::new_without_default)] pub fn new() -> Self { Backends { apps: HashMap::>::default(), diff --git a/src/backend/sticky_cookie.rs b/rpxy-lib/src/backend/sticky_cookie.rs similarity index 100% rename from src/backend/sticky_cookie.rs rename to rpxy-lib/src/backend/sticky_cookie.rs diff --git a/src/backend/upstream.rs b/rpxy-lib/src/backend/upstream.rs similarity index 100% rename from src/backend/upstream.rs rename to rpxy-lib/src/backend/upstream.rs diff --git a/src/backend/upstream_opts.rs b/rpxy-lib/src/backend/upstream_opts.rs similarity index 100% rename from src/backend/upstream_opts.rs rename to rpxy-lib/src/backend/upstream_opts.rs diff --git a/src/certs.rs b/rpxy-lib/src/certs.rs similarity index 100% rename from src/certs.rs rename to rpxy-lib/src/certs.rs diff --git a/src/constants.rs b/rpxy-lib/src/constants.rs similarity index 87% rename from src/constants.rs rename to rpxy-lib/src/constants.rs index 2ed14d1..9d7fb5e 100644 --- a/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -1,5 +1,6 @@ -pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; -pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; +// pub const LISTEN_ADDRESSES_V4: &[&str] = &["0.0.0.0"]; +// pub const LISTEN_ADDRESSES_V6: &[&str] = &["[::]"]; +pub const TCP_LISTEN_BACKLOG: u32 = 1024; // pub const HTTP_LISTEN_PORT: u16 = 8080; // pub const HTTPS_LISTEN_PORT: u16 = 8443; pub const PROXY_TIMEOUT_SEC: u64 = 60; diff --git a/src/error.rs b/rpxy-lib/src/error.rs similarity index 91% rename from src/error.rs rename to rpxy-lib/src/error.rs index 187c993..3407e8a 100644 --- a/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -10,9 +10,15 @@ pub enum RpxyError { #[error("Proxy build error")] ProxyBuild(#[from] crate::proxy::ProxyBuilderError), + #[error("Backend build error")] + BackendBuild(#[from] crate::backend::BackendBuilderError), + #[error("MessageHandler build error")] HandlerBuild(#[from] crate::handler::HttpMessageHandlerBuilderError), + #[error("Config builder error: {0}")] + ConfigBuild(&'static str), + #[error("Http Message Handler Error: {0}")] Handler(&'static str), diff --git a/rpxy-lib/src/globals.rs b/rpxy-lib/src/globals.rs new file mode 100644 index 0000000..44808dd --- /dev/null +++ b/rpxy-lib/src/globals.rs @@ -0,0 +1,298 @@ +use crate::{ + backend::{ + Backend, BackendBuilder, Backends, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption, + }, + certs::CryptoSource, + constants::*, + error::RpxyError, + log::*, + utils::{BytesName, PathNameBytesExp}, +}; +use rustc_hash::FxHashMap as HashMap; +use std::net::SocketAddr; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; +use tokio::time::Duration; + +/// Global object containing proxy configurations and shared object like counters. +/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks. +pub struct Globals +where + T: CryptoSource, +{ + /// Configuration parameters for proxy transport and request handlers + pub proxy_config: ProxyConfig, // TODO: proxy configはarcに包んでこいつだけ使いまわせばいいように変えていく。backendsも? + + /// Backend application objects to which http request handler forward incoming requests + pub backends: Backends, + + /// Shared context - Counter for serving requests + pub request_count: RequestCount, + + /// Shared context - Async task runtime handler + pub runtime_handle: tokio::runtime::Handle, +} + +/// Configuration parameters for proxy transport and request handlers +#[derive(PartialEq, Eq, Clone)] +pub struct ProxyConfig { + pub listen_sockets: Vec, // when instantiate server + pub http_port: Option, // when instantiate server + pub https_port: Option, // when instantiate server + pub tcp_listen_backlog: u32, // when instantiate server + + pub proxy_timeout: Duration, // when serving requests at Proxy + pub upstream_timeout: Duration, // when serving requests at Handler + + pub max_clients: usize, // when serving requests + pub max_concurrent_streams: u32, // when instantiate server + pub keepalive: bool, // when instantiate server + + // experimentals + pub sni_consistency: bool, // Handler + // All need to make packet acceptor + #[cfg(feature = "http3")] + pub http3: bool, + #[cfg(feature = "http3")] + pub h3_alt_svc_max_age: u32, + #[cfg(feature = "http3")] + pub h3_request_max_body_size: usize, + #[cfg(feature = "http3")] + pub h3_max_concurrent_bidistream: quinn::VarInt, + #[cfg(feature = "http3")] + pub h3_max_concurrent_unistream: quinn::VarInt, + #[cfg(feature = "http3")] + pub h3_max_concurrent_connections: u32, + #[cfg(feature = "http3")] + pub h3_max_idle_timeout: Option, +} + +impl Default for ProxyConfig { + fn default() -> Self { + Self { + listen_sockets: Vec::new(), + http_port: None, + https_port: None, + tcp_listen_backlog: TCP_LISTEN_BACKLOG, + + // TODO: Reconsider each timeout values + proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), + upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), + + max_clients: MAX_CLIENTS, + max_concurrent_streams: MAX_CONCURRENT_STREAMS, + keepalive: true, + + sni_consistency: true, + + #[cfg(feature = "http3")] + http3: false, + #[cfg(feature = "http3")] + h3_alt_svc_max_age: H3::ALT_SVC_MAX_AGE, + #[cfg(feature = "http3")] + h3_request_max_body_size: H3::REQUEST_MAX_BODY_SIZE, + #[cfg(feature = "http3")] + h3_max_concurrent_connections: H3::MAX_CONCURRENT_CONNECTIONS, + #[cfg(feature = "http3")] + h3_max_concurrent_bidistream: H3::MAX_CONCURRENT_BIDISTREAM.into(), + #[cfg(feature = "http3")] + h3_max_concurrent_unistream: H3::MAX_CONCURRENT_UNISTREAM.into(), + #[cfg(feature = "http3")] + h3_max_idle_timeout: Some(Duration::from_secs(H3::MAX_IDLE_TIMEOUT)), + } + } +} + +/// Configuration parameters for backend applications +#[derive(PartialEq, Eq, Clone)] +pub struct AppConfigList +where + T: CryptoSource, +{ + pub inner: Vec>, + pub default_app: Option, +} +impl TryInto> for AppConfigList +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result, Self::Error> { + let mut backends = Backends::new(); + for app_config in self.inner.iter() { + let backend = app_config.try_into()?; + backends + .apps + .insert(app_config.server_name.clone().to_server_name_vec(), backend); + info!( + "Registering application {} ({})", + &app_config.server_name, &app_config.app_name + ); + } + + // default backend application for plaintext http requests + if let Some(d) = self.default_app { + let d_sn: Vec<&str> = backends + .apps + .iter() + .filter(|(_k, v)| v.app_name == d) + .map(|(_, v)| v.server_name.as_ref()) + .collect(); + if !d_sn.is_empty() { + info!( + "Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).", + d, d_sn[0] + ); + backends.default_server_name_bytes = Some(d_sn[0].to_server_name_vec()); + } + } + Ok(backends) + } +} + +/// Configuration parameters for single backend application +#[derive(PartialEq, Eq, Clone)] +pub struct AppConfig +where + T: CryptoSource, +{ + pub app_name: String, + pub server_name: String, + pub reverse_proxy: Vec, + pub tls: Option>, +} +impl TryInto> for &AppConfig +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result, Self::Error> { + // backend builder + let mut backend_builder = BackendBuilder::default(); + // reverse proxy settings + let reverse_proxy = self.try_into()?; + + backend_builder + .app_name(self.app_name.clone()) + .server_name(self.server_name.clone()) + .reverse_proxy(reverse_proxy); + + // TLS settings and build backend instance + let backend = if self.tls.is_none() { + backend_builder.build().map_err(RpxyError::BackendBuild)? + } else { + let tls = self.tls.as_ref().unwrap(); + + backend_builder + .https_redirection(Some(tls.https_redirection)) + .crypto_source(Some(tls.inner.clone())) + .build()? + }; + Ok(backend) + } +} +impl TryInto for &AppConfig +where + T: CryptoSource + Clone, +{ + type Error = RpxyError; + + fn try_into(self) -> Result { + let mut upstream: HashMap = HashMap::default(); + + self.reverse_proxy.iter().for_each(|rpo| { + let upstream_vec: Vec = rpo.upstream.iter().map(|x| x.try_into().unwrap()).collect(); + // let upstream_iter = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()); + // let lb_upstream_num = vec_upstream.len(); + let elem = UpstreamGroupBuilder::default() + .upstream(&upstream_vec) + .path(&rpo.path) + .replace_path(&rpo.replace_path) + .lb(&rpo.load_balance, &upstream_vec, &self.server_name, &rpo.path) + .opts(&rpo.upstream_options) + .build() + .unwrap(); + + upstream.insert(elem.path.clone(), elem); + }); + if self.reverse_proxy.iter().filter(|rpo| rpo.path.is_none()).count() >= 2 { + error!("Multiple default reverse proxy setting"); + return Err(RpxyError::ConfigBuild("Invalid reverse proxy setting")); + } + + if !(upstream.iter().all(|(_, elem)| { + !(elem.opts.contains(&UpstreamOption::ConvertHttpsTo11) && elem.opts.contains(&UpstreamOption::ConvertHttpsTo2)) + })) { + error!("Either one of force_http11 or force_http2 can be enabled"); + return Err(RpxyError::ConfigBuild("Invalid upstream option setting")); + } + + Ok(ReverseProxy { upstream }) + } +} + +/// Configuration parameters for single reverse proxy corresponding to the path +#[derive(PartialEq, Eq, Clone)] +pub struct ReverseProxyConfig { + pub path: Option, + pub replace_path: Option, + pub upstream: Vec, + pub upstream_options: Option>, + pub load_balance: Option, +} + +/// Configuration parameters for single upstream destination from a reverse proxy +#[derive(PartialEq, Eq, Clone)] +pub struct UpstreamUri { + pub inner: hyper::Uri, +} +impl TryInto for &UpstreamUri { + type Error = anyhow::Error; + + fn try_into(self) -> std::result::Result { + Ok(Upstream { + uri: self.inner.clone(), + }) + } +} + +/// Configuration parameters on TLS for a single backend application +#[derive(PartialEq, Eq, Clone)] +pub struct TlsConfig +where + T: CryptoSource, +{ + pub inner: T, + pub https_redirection: bool, +} + +#[derive(Debug, Clone, Default)] +/// Counter for serving requests +pub struct RequestCount(Arc); + +impl RequestCount { + pub fn current(&self) -> usize { + self.0.load(Ordering::Relaxed) + } + + pub fn increment(&self) -> usize { + self.0.fetch_add(1, Ordering::Relaxed) + } + + pub fn decrement(&self) -> usize { + let mut count; + while { + count = self.0.load(Ordering::Relaxed); + count > 0 + && self + .0 + .compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed) + != Ok(count) + } {} + count + } +} diff --git a/src/handler/handler_main.rs b/rpxy-lib/src/handler/handler_main.rs similarity index 100% rename from src/handler/handler_main.rs rename to rpxy-lib/src/handler/handler_main.rs diff --git a/src/handler/mod.rs b/rpxy-lib/src/handler/mod.rs similarity index 100% rename from src/handler/mod.rs rename to rpxy-lib/src/handler/mod.rs diff --git a/src/handler/utils_headers.rs b/rpxy-lib/src/handler/utils_headers.rs similarity index 100% rename from src/handler/utils_headers.rs rename to rpxy-lib/src/handler/utils_headers.rs diff --git a/src/handler/utils_request.rs b/rpxy-lib/src/handler/utils_request.rs similarity index 100% rename from src/handler/utils_request.rs rename to rpxy-lib/src/handler/utils_request.rs diff --git a/src/handler/utils_synth_response.rs b/rpxy-lib/src/handler/utils_synth_response.rs similarity index 100% rename from src/handler/utils_synth_response.rs rename to rpxy-lib/src/handler/utils_synth_response.rs diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs new file mode 100644 index 0000000..72f8a8a --- /dev/null +++ b/rpxy-lib/src/lib.rs @@ -0,0 +1,98 @@ +mod backend; +mod certs; +mod constants; +mod error; +mod globals; +mod handler; +mod log; +mod proxy; +mod utils; + +use crate::{error::*, globals::Globals, handler::HttpMessageHandlerBuilder, log::*, proxy::ProxyBuilder}; +use futures::future::select_all; +use hyper::Client; +// use hyper_trust_dns::TrustDnsResolver; +use std::sync::Arc; + +pub use crate::{ + certs::{CertsAndKeys, CryptoSource}, + globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}, +}; +pub mod reexports { + pub use hyper::Uri; + pub use rustls::{Certificate, PrivateKey}; +} + +/// Entrypoint that creates and spawns tasks of reverse proxy services +pub async fn entrypoint( + proxy_config: &ProxyConfig, + app_config_list: &AppConfigList, + runtime_handle: &tokio::runtime::Handle, +) -> Result<()> +where + T: CryptoSource + Clone + Send + Sync + 'static, +{ + // For initial message logging + if proxy_config.listen_sockets.iter().any(|addr| addr.is_ipv6()) { + info!("Listen both IPv4 and IPv6") + } else { + info!("Listen IPv4") + } + if proxy_config.http_port.is_some() { + info!("Listen port: {}", proxy_config.http_port.unwrap()); + } + if proxy_config.https_port.is_some() { + info!("Listen port: {} (for TLS)", proxy_config.https_port.unwrap()); + } + if proxy_config.http3 { + info!("Experimental HTTP/3.0 is enabled. Note it is still very unstable."); + } + if !proxy_config.sni_consistency { + info!("Ignore consistency between TLS SNI and Host header (or Request line). Note it violates RFC."); + } + + // build global + let globals = Arc::new(Globals { + proxy_config: proxy_config.clone(), + backends: app_config_list.clone().try_into()?, + request_count: Default::default(), + runtime_handle: runtime_handle.clone(), + }); + // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); + let connector = hyper_rustls::HttpsConnectorBuilder::new() + .with_webpki_roots() + .https_or_http() + .enable_http1() + .enable_http2() + .build(); + + let msg_handler = HttpMessageHandlerBuilder::default() + .forwarder(Arc::new(Client::builder().build::<_, hyper::Body>(connector))) + .globals(globals.clone()) + .build()?; + + let addresses = globals.proxy_config.listen_sockets.clone(); + let futures = select_all(addresses.into_iter().map(|addr| { + let mut tls_enabled = false; + if let Some(https_port) = globals.proxy_config.https_port { + tls_enabled = https_port == addr.port() + } + + let proxy = ProxyBuilder::default() + .globals(globals.clone()) + .listening_on(addr) + .tls_enabled(tls_enabled) + .msg_handler(msg_handler.clone()) + .build() + .unwrap(); + + globals.runtime_handle.spawn(proxy.start()) + })); + + // wait for all future + if let (Ok(Err(e)), _, _) = futures.await { + error!("Some proxy services are down: {:?}", e); + }; + + Ok(()) +} diff --git a/src/log.rs b/rpxy-lib/src/log.rs similarity index 80% rename from src/log.rs rename to rpxy-lib/src/log.rs index d391607..0fb7812 100644 --- a/src/log.rs +++ b/rpxy-lib/src/log.rs @@ -95,26 +95,3 @@ impl MessageLog { ); } } - -pub fn init_logger() { - use tracing_subscriber::{fmt, prelude::*, EnvFilter}; - - let format_layer = fmt::layer() - .with_line_number(false) - .with_thread_ids(false) - .with_target(false) - .with_thread_names(true) - .with_target(true) - .with_level(true) - .compact(); - - // This limits the logger to emits only rpxy crate - let level_string = std::env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string()); - let filter_layer = EnvFilter::new(format!("{}={}", env!("CARGO_PKG_NAME"), level_string)); - // let filter_layer = EnvFilter::from_default_env(); - - tracing_subscriber::registry() - .with(format_layer) - .with(filter_layer) - .init(); -} diff --git a/src/proxy/crypto_service.rs b/rpxy-lib/src/proxy/crypto_service.rs similarity index 100% rename from src/proxy/crypto_service.rs rename to rpxy-lib/src/proxy/crypto_service.rs diff --git a/src/proxy/mod.rs b/rpxy-lib/src/proxy/mod.rs similarity index 93% rename from src/proxy/mod.rs rename to rpxy-lib/src/proxy/mod.rs index 73a4002..749239c 100644 --- a/src/proxy/mod.rs +++ b/rpxy-lib/src/proxy/mod.rs @@ -4,5 +4,6 @@ mod proxy_client_cert; mod proxy_h3; mod proxy_main; mod proxy_tls; +mod socket; pub use proxy_main::{Proxy, ProxyBuilder, ProxyBuilderError}; diff --git a/src/proxy/proxy_client_cert.rs b/rpxy-lib/src/proxy/proxy_client_cert.rs similarity index 100% rename from src/proxy/proxy_client_cert.rs rename to rpxy-lib/src/proxy/proxy_client_cert.rs diff --git a/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs similarity index 100% rename from src/proxy/proxy_h3.rs rename to rpxy-lib/src/proxy/proxy_h3.rs diff --git a/src/proxy/proxy_main.rs b/rpxy-lib/src/proxy/proxy_main.rs similarity index 92% rename from src/proxy/proxy_main.rs rename to rpxy-lib/src/proxy/proxy_main.rs index e5a02a5..166f048 100644 --- a/src/proxy/proxy_main.rs +++ b/rpxy-lib/src/proxy/proxy_main.rs @@ -1,4 +1,4 @@ -// use super::proxy_handler::handle_request; +use super::socket::bind_tcp_socket; use crate::{ certs::CryptoSource, error::*, globals::Globals, handler::HttpMessageHandler, log::*, utils::ServerNameBytesExp, }; @@ -7,7 +7,6 @@ use hyper::{client::connect::Connect, server::conn::Http, service::service_fn, B use std::{net::SocketAddr, sync::Arc}; use tokio::{ io::{AsyncRead, AsyncWrite}, - net::TcpListener, runtime::Handle, time::{timeout, Duration}, }; @@ -94,7 +93,9 @@ where async fn start_without_tls(self, server: Http) -> Result<()> { let listener_service = async { - let tcp_listener = TcpListener::bind(&self.listening_on).await?; + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; + // let tcp_listener = TcpListener::bind(&self.listening_on).await?; info!("Start TCP proxy serving with HTTP request for configured host names"); while let Ok((stream, _client_addr)) = tcp_listener.accept().await { self.clone().client_serve(stream, server.clone(), _client_addr, None); diff --git a/src/proxy/proxy_tls.rs b/rpxy-lib/src/proxy/proxy_tls.rs similarity index 90% rename from src/proxy/proxy_tls.rs rename to rpxy-lib/src/proxy/proxy_tls.rs index 5e846f0..5512eff 100644 --- a/src/proxy/proxy_tls.rs +++ b/rpxy-lib/src/proxy/proxy_tls.rs @@ -1,6 +1,9 @@ +#[cfg(feature = "http3")] +use super::socket::bind_udp_socket; use super::{ crypto_service::{CryptoReloader, ServerCrypto, ServerCryptoBase, SniServerCryptoMap}, proxy_main::{LocalExecutor, Proxy}, + socket::bind_tcp_socket, }; use crate::{certs::CryptoSource, constants::*, error::*, log::*, utils::BytesName}; use hot_reload::{ReloaderReceiver, ReloaderService}; @@ -10,10 +13,7 @@ use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerC #[cfg(feature = "http3")] use rustls::ServerConfig; use std::sync::Arc; -use tokio::{ - net::TcpListener, - time::{timeout, Duration}, -}; +use tokio::time::{timeout, Duration}; impl Proxy where @@ -26,7 +26,8 @@ where server: Http, mut server_crypto_rx: ReloaderReceiver, ) -> Result<()> { - let tcp_listener = TcpListener::bind(&self.listening_on).await?; + let tcp_socket = bind_tcp_socket(&self.listening_on)?; + let tcp_listener = tcp_socket.listen(self.globals.proxy_config.tcp_listen_backlog)?; info!("Start TCP proxy serving with HTTPS request for configured host names"); let mut server_crypto_map: Option> = None; @@ -119,12 +120,28 @@ where transport_config_quic .max_concurrent_bidi_streams(self.globals.proxy_config.h3_max_concurrent_bidistream) .max_concurrent_uni_streams(self.globals.proxy_config.h3_max_concurrent_unistream) - .max_idle_timeout(self.globals.proxy_config.h3_max_idle_timeout); + .max_idle_timeout( + self + .globals + .proxy_config + .h3_max_idle_timeout + .map(|v| quinn::IdleTimeout::try_from(v).unwrap()), + ); let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config)); server_config_h3.transport = Arc::new(transport_config_quic); server_config_h3.concurrent_connections(self.globals.proxy_config.h3_max_concurrent_connections); - let endpoint = Endpoint::server(server_config_h3, self.listening_on)?; + + // To reuse address + let udp_socket = bind_udp_socket(&self.listening_on)?; + let runtime = quinn::default_runtime() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?; + let endpoint = Endpoint::new( + quinn::EndpointConfig::default(), + Some(server_config_h3), + udp_socket, + runtime, + )?; let mut server_crypto: Option> = None; loop { @@ -193,10 +210,10 @@ where #[cfg(not(feature = "http3"))] { tokio::select! { - _= self.cert_service(tx) => { + _= cert_reloader_service.start() => { error!("Cert service for TLS exited"); }, - _ = self.listener_service(server, rx) => { + _ = self.listener_service(server, cert_reloader_rx) => { error!("TCP proxy service for TLS exited"); }, else => { diff --git a/rpxy-lib/src/proxy/socket.rs b/rpxy-lib/src/proxy/socket.rs new file mode 100644 index 0000000..48f72e9 --- /dev/null +++ b/rpxy-lib/src/proxy/socket.rs @@ -0,0 +1,45 @@ +use crate::{error::*, log::*}; +#[cfg(feature = "http3")] +use socket2::{Domain, Protocol, Socket, Type}; +use std::net::SocketAddr; +#[cfg(feature = "http3")] +use std::net::UdpSocket; +use tokio::net::TcpSocket; + +/// Bind TCP socket to the given `SocketAddr`, and returns the TCP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_tcp_socket(listening_on: &SocketAddr) -> Result { + let tcp_socket = if listening_on.is_ipv6() { + TcpSocket::new_v6() + } else { + TcpSocket::new_v4() + }?; + tcp_socket.set_reuseaddr(true)?; + tcp_socket.set_reuseport(true)?; + if let Err(e) = tcp_socket.bind(*listening_on) { + error!("Failed to bind TCP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + Ok(tcp_socket) +} + +#[cfg(feature = "http3")] +/// Bind UDP socket to the given `SocketAddr`, and returns the UDP socket with `SO_REUSEADDR` and `SO_REUSEPORT` options. +/// This option is required to re-bind the socket address when the proxy instance is reconstructed. +pub(super) fn bind_udp_socket(listening_on: &SocketAddr) -> Result { + let socket = if listening_on.is_ipv6() { + Socket::new(Domain::IPV6, Type::DGRAM, Some(Protocol::UDP)) + } else { + Socket::new(Domain::IPV4, Type::DGRAM, Some(Protocol::UDP)) + }?; + socket.set_reuse_address(true)?; + socket.set_reuse_port(true)?; + + if let Err(e) = socket.bind(&(*listening_on).into()) { + error!("Failed to bind UDP socket: {}", e); + return Err(RpxyError::Io(e)); + }; + let udp_socket: UdpSocket = socket.into(); + + Ok(udp_socket) +} diff --git a/src/utils/bytes_name.rs b/rpxy-lib/src/utils/bytes_name.rs similarity index 98% rename from src/utils/bytes_name.rs rename to rpxy-lib/src/utils/bytes_name.rs index a093c41..5d2fef5 100644 --- a/src/utils/bytes_name.rs +++ b/rpxy-lib/src/utils/bytes_name.rs @@ -23,6 +23,9 @@ impl PathNameBytesExp { pub fn len(&self) -> usize { self.0.len() } + pub fn is_empty(&self) -> bool { + self.0.len() == 0 + } pub fn get(&self, index: I) -> Option<&I::Output> where I: std::slice::SliceIndex<[u8]>, diff --git a/src/utils/mod.rs b/rpxy-lib/src/utils/mod.rs similarity index 100% rename from src/utils/mod.rs rename to rpxy-lib/src/utils/mod.rs diff --git a/src/utils/socket_addr.rs b/rpxy-lib/src/utils/socket_addr.rs similarity index 100% rename from src/utils/socket_addr.rs rename to rpxy-lib/src/utils/socket_addr.rs diff --git a/src/config/mod.rs b/src/config/mod.rs deleted file mode 100644 index 54b2600..0000000 --- a/src/config/mod.rs +++ /dev/null @@ -1,4 +0,0 @@ -mod parse; -mod toml; - -pub use parse::build_globals; diff --git a/src/config/parse.rs b/src/config/parse.rs deleted file mode 100644 index 1dc2545..0000000 --- a/src/config/parse.rs +++ /dev/null @@ -1,116 +0,0 @@ -use super::toml::ConfigToml; -use crate::{ - backend::Backends, - cert_file_reader::CryptoFileSource, - error::{anyhow, ensure}, - globals::*, - log::*, - utils::BytesName, -}; -use clap::Arg; -use tokio::runtime::Handle; - -pub fn build_globals(runtime_handle: Handle) -> std::result::Result, anyhow::Error> { - let _ = include_str!("../../Cargo.toml"); - let options = clap::command!().arg( - Arg::new("config_file") - .long("config") - .short('c') - .value_name("FILE") - .help("Configuration file path like \"./config.toml\""), - ); - let matches = options.get_matches(); - - /////////////////////////////////// - let config = if let Some(config_file_path) = matches.get_one::("config_file") { - ConfigToml::new(config_file_path)? - } else { - // Default config Toml - ConfigToml::default() - }; - - /////////////////////////////////// - // build proxy config - let proxy_config: ProxyConfig = (&config).try_into()?; - // For loggings - if proxy_config.listen_sockets.iter().any(|addr| addr.is_ipv6()) { - info!("Listen both IPv4 and IPv6") - } else { - info!("Listen IPv4") - } - if proxy_config.http_port.is_some() { - info!("Listen port: {}", proxy_config.http_port.unwrap()); - } - if proxy_config.https_port.is_some() { - info!("Listen port: {} (for TLS)", proxy_config.https_port.unwrap()); - } - if proxy_config.http3 { - info!("Experimental HTTP/3.0 is enabled. Note it is still very unstable."); - } - if !proxy_config.sni_consistency { - info!("Ignore consistency between TLS SNI and Host header (or Request line). Note it violates RFC."); - } - - /////////////////////////////////// - // backend_apps - let apps = config.apps.ok_or(anyhow!("Missing application spec"))?; - - // assertions for all backend apps - ensure!(!apps.0.is_empty(), "Wrong application spec."); - // if only https_port is specified, tls must be configured for all apps - if proxy_config.http_port.is_none() { - ensure!( - apps.0.iter().all(|(_, app)| app.tls.is_some()), - "Some apps serves only plaintext HTTP" - ); - } - // https redirection can be configured if both ports are active - if !(proxy_config.https_port.is_some() && proxy_config.http_port.is_some()) { - ensure!( - apps.0.iter().all(|(_, app)| { - if let Some(tls) = app.tls.as_ref() { - tls.https_redirection.is_none() - } else { - true - } - }), - "https_redirection can be specified only when both http_port and https_port are specified" - ); - } - - // build backends - let mut backends = Backends::new(); - for (app_name, app) in apps.0.iter() { - let server_name_string = app.server_name.as_ref().ok_or(anyhow!("No server name"))?; - let backend = app.try_into()?; - backends.apps.insert(server_name_string.to_server_name_vec(), backend); - info!("Registering application: {} ({})", app_name, server_name_string); - } - - // default backend application for plaintext http requests - if let Some(d) = config.default_app { - let d_sn: Vec<&str> = backends - .apps - .iter() - .filter(|(_k, v)| v.app_name == d) - .map(|(_, v)| v.server_name.as_ref()) - .collect(); - if !d_sn.is_empty() { - info!( - "Serving plaintext http for requests to unconfigured server_name by app {} (server_name: {}).", - d, d_sn[0] - ); - backends.default_server_name_bytes = Some(d_sn[0].to_server_name_vec()); - } - } - - /////////////////////////////////// - let globals = Globals { - proxy_config, - backends, - request_count: Default::default(), - runtime_handle, - }; - - Ok(globals) -} diff --git a/src/globals.rs b/src/globals.rs deleted file mode 100644 index b85733b..0000000 --- a/src/globals.rs +++ /dev/null @@ -1,121 +0,0 @@ -use crate::certs::CryptoSource; -use crate::{backend::Backends, constants::*}; -use std::net::SocketAddr; -use std::sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, -}; -use tokio::time::Duration; - -/// Global object containing proxy configurations and shared object like counters. -/// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks. -pub struct Globals -where - T: CryptoSource, -{ - /// Configuration parameters for proxy transport and request handlers - pub proxy_config: ProxyConfig, // TODO: proxy configはarcに包んでこいつだけ使いまわせばいいように変えていく。backendsも? - - /// Backend application objects to which http request handler forward incoming requests - pub backends: Backends, - - /// Shared context - Counter for serving requests - pub request_count: RequestCount, - - /// Shared context - Async task runtime handler - pub runtime_handle: tokio::runtime::Handle, -} - -/// Configuration parameters for proxy transport and request handlers -pub struct ProxyConfig { - pub listen_sockets: Vec, // when instantiate server - pub http_port: Option, // when instantiate server - pub https_port: Option, // when instantiate server - - pub proxy_timeout: Duration, // when serving requests at Proxy - pub upstream_timeout: Duration, // when serving requests at Handler - - pub max_clients: usize, // when serving requests - pub max_concurrent_streams: u32, // when instantiate server - pub keepalive: bool, // when instantiate server - - // experimentals - pub sni_consistency: bool, // Handler - // All need to make packet acceptor - #[cfg(feature = "http3")] - pub http3: bool, - #[cfg(feature = "http3")] - pub h3_alt_svc_max_age: u32, - #[cfg(feature = "http3")] - pub h3_request_max_body_size: usize, - #[cfg(feature = "http3")] - pub h3_max_concurrent_bidistream: quinn::VarInt, - #[cfg(feature = "http3")] - pub h3_max_concurrent_unistream: quinn::VarInt, - #[cfg(feature = "http3")] - pub h3_max_concurrent_connections: u32, - #[cfg(feature = "http3")] - pub h3_max_idle_timeout: Option, -} - -impl Default for ProxyConfig { - fn default() -> Self { - Self { - listen_sockets: Vec::new(), - http_port: None, - https_port: None, - - // TODO: Reconsider each timeout values - proxy_timeout: Duration::from_secs(PROXY_TIMEOUT_SEC), - upstream_timeout: Duration::from_secs(UPSTREAM_TIMEOUT_SEC), - - max_clients: MAX_CLIENTS, - max_concurrent_streams: MAX_CONCURRENT_STREAMS, - keepalive: true, - - sni_consistency: true, - - #[cfg(feature = "http3")] - http3: false, - #[cfg(feature = "http3")] - h3_alt_svc_max_age: H3::ALT_SVC_MAX_AGE, - #[cfg(feature = "http3")] - h3_request_max_body_size: H3::REQUEST_MAX_BODY_SIZE, - #[cfg(feature = "http3")] - h3_max_concurrent_connections: H3::MAX_CONCURRENT_CONNECTIONS, - #[cfg(feature = "http3")] - h3_max_concurrent_bidistream: H3::MAX_CONCURRENT_BIDISTREAM.into(), - #[cfg(feature = "http3")] - h3_max_concurrent_unistream: H3::MAX_CONCURRENT_UNISTREAM.into(), - #[cfg(feature = "http3")] - h3_max_idle_timeout: Some(quinn::IdleTimeout::try_from(Duration::from_secs(H3::MAX_IDLE_TIMEOUT)).unwrap()), - } - } -} - -#[derive(Debug, Clone, Default)] -/// Counter for serving requests -pub struct RequestCount(Arc); - -impl RequestCount { - pub fn current(&self) -> usize { - self.0.load(Ordering::Relaxed) - } - - pub fn increment(&self) -> usize { - self.0.fetch_add(1, Ordering::Relaxed) - } - - pub fn decrement(&self) -> usize { - let mut count; - while { - count = self.0.load(Ordering::Relaxed); - count > 0 - && self - .0 - .compare_exchange(count, count - 1, Ordering::Relaxed, Ordering::Relaxed) - != Ok(count) - } {} - count - } -} diff --git a/src/main.rs b/src/main.rs deleted file mode 100644 index 7f8dcfc..0000000 --- a/src/main.rs +++ /dev/null @@ -1,94 +0,0 @@ -use certs::CryptoSource; -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; - -mod backend; -mod cert_file_reader; -mod certs; -mod config; -mod constants; -mod error; -mod globals; -mod handler; -mod log; -mod proxy; -mod utils; - -use crate::{ - cert_file_reader::CryptoFileSource, config::build_globals, error::*, globals::*, handler::HttpMessageHandlerBuilder, - log::*, proxy::ProxyBuilder, -}; -use futures::future::select_all; -use hyper::Client; -// use hyper_trust_dns::TrustDnsResolver; -use std::sync::Arc; - -fn main() { - init_logger(); - - let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); - runtime_builder.enable_all(); - runtime_builder.thread_name("rpxy"); - let runtime = runtime_builder.build().unwrap(); - - runtime.block_on(async { - let globals: Globals = match build_globals(runtime.handle().clone()) { - Ok(g) => g, - Err(e) => { - error!("Invalid configuration: {}", e); - std::process::exit(1); - } - }; - - entrypoint(Arc::new(globals)).await.unwrap() - }); - warn!("rpxy exited!"); -} - -// entrypoint creates and spawns tasks of proxy services -async fn entrypoint(globals: Arc>) -> Result<()> -where - T: CryptoSource + Clone + Send + Sync + 'static, -{ - // let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector(); - let connector = hyper_rustls::HttpsConnectorBuilder::new() - .with_webpki_roots() - .https_or_http() - .enable_http1() - .enable_http2() - .build(); - - let msg_handler = HttpMessageHandlerBuilder::default() - .forwarder(Arc::new(Client::builder().build::<_, hyper::Body>(connector))) - .globals(globals.clone()) - .build()?; - - let addresses = globals.proxy_config.listen_sockets.clone(); - let futures = select_all(addresses.into_iter().map(|addr| { - let mut tls_enabled = false; - if let Some(https_port) = globals.proxy_config.https_port { - tls_enabled = https_port == addr.port() - } - - let proxy = ProxyBuilder::default() - .globals(globals.clone()) - .listening_on(addr) - .tls_enabled(tls_enabled) - .msg_handler(msg_handler.clone()) - .build() - .unwrap(); - - globals.runtime_handle.spawn(proxy.start()) - })); - - // wait for all future - if let (Ok(_), _, _) = futures.await { - error!("Some proxy services are down"); - }; - - Ok(()) -}