commit
				
					
						ff54643e92
					
				
			
		
					 37 changed files with 4102 additions and 305 deletions
				
			
		
							
								
								
									
										5
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							
							
						
						
									
										5
									
								
								.gitignore
									
										
									
									
										vendored
									
									
								
							|  | @ -3,14 +3,11 @@ | |||
| docker/log | ||||
| docker/cache | ||||
| docker/config | ||||
| docker/acme_registry | ||||
| 
 | ||||
| # Generated by Cargo | ||||
| # will have compiled files and executables | ||||
| /target/ | ||||
| 
 | ||||
| # Remove Cargo.lock from gitignore if creating an executable, leave it for libraries | ||||
| # More information here https://doc.rust-lang.org/cargo/guide/cargo-toml-vs-cargo-lock.html | ||||
| Cargo.lock | ||||
| 
 | ||||
| # These are backup files generated by rustfmt | ||||
| **/*.rs.bk | ||||
|  |  | |||
							
								
								
									
										14
									
								
								CHANGELOG.md
									
										
									
									
									
								
							
							
						
						
									
										14
									
								
								CHANGELOG.md
									
										
									
									
									
								
							|  | @ -1,6 +1,18 @@ | |||
| # CHANGELOG | ||||
| 
 | ||||
| ## 0.9.8 or 0.10.0 (Unreleased) | ||||
| ## 0.10.1 or 0.11.0 (Unreleased) | ||||
| 
 | ||||
| ## 0.10.0 | ||||
| 
 | ||||
| ### Important Changes | ||||
| 
 | ||||
| - [Breaking] We removed non-`watch` execute option and enabled the dynamic reloading of the config file by default. | ||||
| - We newly added `log-dir` execute option to specify the directory for `access.log`,`error.log` and `rpxy.log`. This is optional, and if not specified, the logs are written to the standard output by default. | ||||
| 
 | ||||
| ### Improvement | ||||
| 
 | ||||
| - Refactor: lots of minor improvements | ||||
| - Deps | ||||
| 
 | ||||
| ## 0.9.7 | ||||
| 
 | ||||
|  |  | |||
							
								
								
									
										3688
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
										Normal file
									
								
							
							
						
						
									
										3688
									
								
								Cargo.lock
									
										
									
										generated
									
									
									
										Normal file
									
								
							
										
											
												File diff suppressed because it is too large
												Load diff
											
										
									
								
							|  | @ -1,11 +1,11 @@ | |||
| [workspace.package] | ||||
| version = "0.9.7" | ||||
| version = "0.10.0" | ||||
| authors = ["Jun Kurihara"] | ||||
| homepage = "https://github.com/junkurihara/rust-rpxy" | ||||
| repository = "https://github.com/junkurihara/rust-rpxy" | ||||
| license = "MIT" | ||||
| readme = "./README.md" | ||||
| edition = "2021" | ||||
| edition = "2024" | ||||
| publish = false | ||||
| 
 | ||||
| [workspace] | ||||
|  |  | |||
							
								
								
									
										21
									
								
								README.md
									
										
									
									
									
								
							
							
						
						
									
										21
									
								
								README.md
									
										
									
									
									
								
							|  | @ -8,6 +8,9 @@ | |||
| 
 | ||||
| > **WIP Project** | ||||
| 
 | ||||
| > [!NOTE] | ||||
| > This project is an HTTP, i.e., Layer 7, reverse-proxy. If you are looking for a TCP/UDP, i.e., Layer 4, reverse-proxy, please check my another project, [`rpxy-l4`](https://github.com/junkurihara/rust-rpxy-l4). | ||||
| 
 | ||||
| ## Introduction | ||||
| 
 | ||||
| `rpxy` [ahr-pik-see] is an implementation of simple and lightweight reverse-proxy with some additional features. The implementation is based on [`hyper`](https://github.com/hyperium/hyper), [`rustls`](https://github.com/rustls/rustls) and [`tokio`](https://github.com/tokio-rs/tokio), i.e., written in Rust [^pure_rust]. Our `rpxy` routes multiple host names to appropriate backend application servers while serving TLS connections. | ||||
|  | @ -36,7 +39,6 @@ Supported features are summarized as follows: | |||
| 
 | ||||
|  This project is still *work-in-progress*. But it is already working in some production environments and serves a number of domain names. Furthermore it *significantly outperforms* NGINX and Caddy, e.g., *1.5x faster than NGINX*, in the setting of a very simple HTTP reverse-proxy scenario (See [`bench`](./bench/) directory). | ||||
| 
 | ||||
| 
 | ||||
| ## Installing/Building an Executable Binary of `rpxy` | ||||
| 
 | ||||
| ### Building from Source | ||||
|  | @ -78,7 +80,7 @@ You can run `rpxy` with a configuration file like | |||
| % ./target/release/rpxy --config config.toml | ||||
| ``` | ||||
| 
 | ||||
| If you specify `-w` option along with the config file path, `rpxy` tracks the change of `config.toml` in the real-time manner and apply the change immediately without restarting the process. | ||||
| `rpxy` tracks the change of `config.toml` in the real-time manner and apply the change immediately without restarting the process. | ||||
| 
 | ||||
| The full help messages are given follows. | ||||
| 
 | ||||
|  | @ -86,12 +88,18 @@ The full help messages are given follows. | |||
| usage: rpxy [OPTIONS] --config <FILE> | ||||
| 
 | ||||
| Options: | ||||
|   -c, --config <FILE>  Configuration file path like ./config.toml | ||||
|   -w, --watch          Activate dynamic reloading of the config file via continuous monitoring | ||||
|   -h, --help           Print help | ||||
|   -V, --version        Print version | ||||
|   -c, --config <FILE>      Configuration file path like ./config.toml | ||||
|   -l, --log-dir <LOG_DIR>  Directory for log files. If not specified, logs are printed to stdout. | ||||
|   -h, --help               Print help | ||||
|   -V, --version            Print version | ||||
| ``` | ||||
| 
 | ||||
| If you set `--log-dir=<log_dir>`, the log files are created in the specified directory. Otherwise, the log is printed to stdout. | ||||
| 
 | ||||
| - `${log_dir}/access.log` for access log | ||||
| <!-- - `${log_dir}/error.log` for error log --> | ||||
| - `${log_dir}/rpxy.log` for system and error log | ||||
| 
 | ||||
| That's all! | ||||
| 
 | ||||
| ## Basic Configuration | ||||
|  | @ -446,7 +454,6 @@ todo! | |||
| - [`s2n-quic`](https://github.com/aws/s2n-quic) | ||||
| - [`rustls-acme`](https://github.com/FlorianUekermann/rustls-acme) | ||||
| 
 | ||||
| 
 | ||||
| ## License | ||||
| 
 | ||||
| `rpxy` is free, open-source software licensed under MIT License. | ||||
|  |  | |||
|  | @ -28,10 +28,10 @@ max_clients = 512 | |||
| listen_ipv6 = false | ||||
| 
 | ||||
| # Optional: App that serves all plaintext http request by referring to HOSTS or request header | ||||
| # execpt for configured application. | ||||
| # except for configured application. | ||||
| # Note that this is only for http. | ||||
| # Note that nothing is served for requests via https since secure channel cannot be | ||||
| # established for unconfigured server_name, and they are always rejected by checking SNI. | ||||
| # established for non-configured server_name, and they are always rejected by checking SNI. | ||||
| default_app = 'another_localhost' | ||||
| 
 | ||||
| ################################### | ||||
|  | @ -106,7 +106,7 @@ tls = { https_redirection = true, acme = true } | |||
| #      Experimantal settings      # | ||||
| ################################### | ||||
| [experimental] | ||||
| # Higly recommend not to be true. If true, you ignore RFC. if not specified, it is always false. | ||||
| # Highly recommend not to be true. If true, you ignore RFC. if not specified, it is always false. | ||||
| # This might be required to be true when a certificate is used by multiple backend hosts, especially in case where a TLS connection is re-used. | ||||
| # We should note that this strongly depends on the client implementation. | ||||
| ignore_sni_consistency = false | ||||
|  |  | |||
|  | @ -2,13 +2,13 @@ FROM ubuntu:24.04 AS base | |||
| LABEL maintainer="Jun Kurihara" | ||||
| 
 | ||||
| SHELL ["/bin/sh", "-x", "-c"] | ||||
| ENV SERIAL 2 | ||||
| ENV SERIAL=2 | ||||
| 
 | ||||
| ######################################## | ||||
| FROM --platform=$BUILDPLATFORM base AS builder | ||||
| 
 | ||||
| ENV CFLAGS=-Ofast | ||||
| ENV BUILD_DEPS curl make ca-certificates build-essential | ||||
| ENV BUILD_DEPS="curl make ca-certificates build-essential" | ||||
| ENV TARGET_SUFFIX=unknown-linux-gnu | ||||
| 
 | ||||
| WORKDIR /tmp | ||||
|  | @ -17,9 +17,9 @@ COPY . /tmp/ | |||
| 
 | ||||
| ARG TARGETARCH | ||||
| ARG CARGO_FEATURES | ||||
| ENV CARGO_FEATURES ${CARGO_FEATURES} | ||||
| ENV CARGO_FEATURES="${CARGO_FEATURES}" | ||||
| ARG ADDITIONAL_DEPS | ||||
| ENV ADDITIONAL_DEPS ${ADDITIONAL_DEPS} | ||||
| ENV ADDITIONAL_DEPS="${ADDITIONAL_DEPS}" | ||||
| 
 | ||||
| RUN if [ $TARGETARCH = "amd64" ]; then \ | ||||
|   echo "x86_64" > /arch; \ | ||||
|  | @ -30,7 +30,7 @@ RUN if [ $TARGETARCH = "amd64" ]; then \ | |||
|   exit 1; \ | ||||
|   fi | ||||
| 
 | ||||
| ENV RUSTFLAGS "-C link-arg=-s" | ||||
| ENV RUSTFLAGS="-C link-arg=-s" | ||||
| 
 | ||||
| RUN update-ca-certificates 2> /dev/null || true | ||||
| 
 | ||||
|  | @ -48,7 +48,7 @@ RUN apt-get update && apt-get install -qy --no-install-recommends $BUILD_DEPS ${ | |||
| ######################################## | ||||
| FROM --platform=$TARGETPLATFORM base AS runner | ||||
| 
 | ||||
| ENV RUNTIME_DEPS logrotate ca-certificates gosu | ||||
| ENV RUNTIME_DEPS="logrotate ca-certificates gosu" | ||||
| 
 | ||||
| RUN apt-get update && \ | ||||
|   apt-get install -qy --no-install-recommends $RUNTIME_DEPS && \ | ||||
|  |  | |||
|  | @ -9,11 +9,10 @@ There are several docker-specific environment variables. | |||
| - `HOST_USER` (default: `user`): User name executing `rpxy` inside the container. | ||||
| - `HOST_UID` (default: `900`): `UID` of `HOST_USER`. | ||||
| - `HOST_GID` (default: `900`): `GID` of `HOST_USER` | ||||
| - `LOG_LEVEL=debug|info|warn|error`: Log level | ||||
| - `LOG_TO_FILE=true|false`: Enable logging to the log file `/rpxy/log/rpxy.log` using `logrotate`. You should mount `/rpxy/log` via docker volume option if enabled. The log dir and file will be owned by the `HOST_USER` with `HOST_UID:HOST_GID` on the host machine. Hence, `HOST_USER`, `HOST_UID` and `HOST_GID` should be the same as ones of the user who executes the `rpxy` docker container on the host. | ||||
| - `WATCH=true|false` (default: `false`): Activate continuous watching of the config file if true. | ||||
| - `LOG_LEVEL=trace|debug|info|warn|error`: Log level | ||||
| - `LOG_TO_FILE=true|false`: Enable logging to the log files using `logrotate` (locations: system/error log = `/rpxy/log/rpxy.log`, and access log = `/rpxy/log/access.log`). You should mount `/rpxy/log` via docker volume option if enabled. The log dir and file will be owned by the `HOST_USER` with `HOST_UID:HOST_GID` on the host machine. Hence, `HOST_USER`, `HOST_UID` and `HOST_GID` should be the same as ones of the user who executes the `rpxy` docker container on the host. | ||||
| 
 | ||||
| Then, all you need is to mount your `config.toml` as `/etc/rpxy.toml` and certificates/private keys as you like through the docker volume option. **If `WATCH=true`, You need to mount a directory, e.g., `./rpxy-config/`, including `rpxy.toml` on `/rpxy/config` instead of a file to correctly track file changes**. This is a docker limitation. Even if `WATCH=false`, you can mount the dir onto `/rpxy/config` rather than `/etc/rpxy.toml`. A file mounted on `/etc/rpxy` is prioritized over a dir mounted on `/rpxy/config`. | ||||
| Then, all you need is to mount your `config.toml` as `/etc/rpxy.toml` and certificates/private keys as you like through the docker volume option. **You need to mount a directory, e.g., `./rpxy-config/`, including `rpxy.toml` on `/rpxy/config` instead of a file to dynamically track file changes**. This is a docker limitation. You can mount the dir onto `/rpxy/config` rather than `/etc/rpxy.toml`. A file mounted on `/etc/rpxy` is prioritized over a dir mounted on `/rpxy/config`. | ||||
| 
 | ||||
| See [`docker-compose.yml`](./docker-compose.yml) for the detailed configuration. Note that the file path of keys and certificates must be ones in your docker container. | ||||
| 
 | ||||
|  | @ -27,19 +26,25 @@ e.g. `-v rpxy/ca-certificates:/usr/local/share/ca-certificates` | |||
| 
 | ||||
| Differences among tags are summarized as follows. | ||||
| 
 | ||||
| ### Latest Builds | ||||
| ### Latest and versioned builds | ||||
| 
 | ||||
| - `latest`: Built from the `main` branch with default features, running on Ubuntu. | ||||
| - `latest-slim`, `slim`: Built by `musl` from the `main` branch with default features, running on Alpine. | ||||
| - `latest-s2n`, `s2n`: Built from the `main` branch with the `http3-s2n` feature, running on Ubuntu. | ||||
| - `*-pq`: Built with the `post-quantum` feature. This feature supports the post-quantum key exchange using `rustls-post-quantum` crate. | ||||
| Latest builds are shipped from the `main` branch when the new version is released. For example, when the version `x.y.z` is released, the following images are provided. | ||||
| 
 | ||||
| ### Nightly Builds | ||||
| - `latest`, `x.y.z`: Built with default features, running on Ubuntu. | ||||
| - `latest-slim`, `slim`, `x.y.z-slim` : Built by `musl` with default features, running on Alpine. | ||||
| - `latest-s2n`, `s2n`, `x.y.z-s2n`: Built with the `http3-s2n` feature, running on Ubuntu. | ||||
| 
 | ||||
| - `nightly`: Built from the `develop` branch with default features, running on Ubuntu. | ||||
| - `nightly-slim`: Built by `musl` from the `develop` branch with default features, running on Alpine. | ||||
| - `nightly-s2n`: Built from the `develop` branch with the `http3-s2n` feature, running on Ubuntu. | ||||
| - `*-pq`: Built with the `post-quantum` feature. This feature supports the hybridized post-quantum key exchange using `rustls-post-quantum` crate. | ||||
| Additionally, images built with `webpki-roots` are provided in a similar manner to the above (e.g., `latest-s2n-webpki-roots` and `s2n-webpki-roots` tagged for the same image). | ||||
| 
 | ||||
| ### Nightly builds | ||||
| 
 | ||||
| Nightly builds are shipped from the `develop` branch for every push. | ||||
| 
 | ||||
| - `nightly`: Built with default features, running on Ubuntu. | ||||
| - `nightly-slim`: Built by `musl` with default features, running on Alpine. | ||||
| - `nightly-s2n`: Built with the `http3-s2n` feature, running on Ubuntu. | ||||
| 
 | ||||
| Additionally, images built with `webpki-roots` are provided in a similar manner to the above (e.g., `nightly-s2n-webpki-roots`). | ||||
| 
 | ||||
| ## Caveats | ||||
| 
 | ||||
|  |  | |||
|  | @ -20,12 +20,11 @@ services: | |||
|         # - "linux/amd64" | ||||
|         - "linux/arm64" | ||||
|     environment: | ||||
|       - LOG_LEVEL=debug | ||||
|       - LOG_LEVEL=trace | ||||
|       - LOG_TO_FILE=true | ||||
|       - HOST_USER=jun | ||||
|       - HOST_UID=501 | ||||
|       - HOST_GID=501 | ||||
|       # - WATCH=true | ||||
|     tty: false | ||||
|     privileged: true | ||||
|     volumes: | ||||
|  |  | |||
|  | @ -20,12 +20,11 @@ services: | |||
|         # - "linux/amd64" | ||||
|         - "linux/arm64" | ||||
|     environment: | ||||
|       - LOG_LEVEL=debug | ||||
|       - LOG_LEVEL=trace | ||||
|       - LOG_TO_FILE=true | ||||
|       - HOST_USER=jun | ||||
|       - HOST_UID=501 | ||||
|       - HOST_GID=501 | ||||
|       # - WATCH=true | ||||
|     tty: false | ||||
|     privileged: true | ||||
|     volumes: | ||||
|  |  | |||
|  | @ -1,6 +1,7 @@ | |||
| #!/usr/bin/env sh | ||||
| LOG_DIR=/rpxy/log | ||||
| LOG_FILE=${LOG_DIR}/rpxy.log | ||||
| SYSTEM_LOG_FILE=${LOG_DIR}/rpxy.log | ||||
| ACCESS_LOG_FILE=${LOG_DIR}/access.log | ||||
| LOG_SIZE=10M | ||||
| LOG_NUM=10 | ||||
| 
 | ||||
|  | @ -43,8 +44,24 @@ include /etc/logrotate.d | |||
| # system-specific logs may be also be configured here. | ||||
| EOF | ||||
| 
 | ||||
|   cat > /etc/logrotate.d/rpxy.conf << EOF | ||||
| ${LOG_FILE} { | ||||
|   cat > /etc/logrotate.d/rpxy-system.conf << EOF | ||||
| ${SYSTEM_LOG_FILE} { | ||||
|     dateext | ||||
|     daily | ||||
|     missingok | ||||
|     rotate ${LOG_NUM} | ||||
|     notifempty | ||||
|     compress | ||||
|     delaycompress | ||||
|     dateformat -%Y-%m-%d-%s | ||||
|     size ${LOG_SIZE} | ||||
|     copytruncate | ||||
|     su ${USER} ${USER} | ||||
| } | ||||
| EOF | ||||
| 
 | ||||
|   cat > /etc/logrotate.d/rpxy-access.conf << EOF | ||||
| ${ACCESS_LOG_FILE} { | ||||
|     dateext | ||||
|     daily | ||||
|     missingok | ||||
|  | @ -157,10 +174,4 @@ fi | |||
| # Run rpxy | ||||
| cd /rpxy | ||||
| echo "rpxy: Start with user: ${USER} (${USER_ID}:${GROUP_ID})" | ||||
| if "${LOGGING}"; then | ||||
|   echo "rpxy: Start with writing log file" | ||||
|   gosu ${USER} sh -c "/rpxy/run.sh 2>&1 | tee ${LOG_FILE}" | ||||
| else | ||||
|   echo "rpxy: Start without writing log file" | ||||
|   gosu ${USER} sh -c "/rpxy/run.sh 2>&1" | ||||
| fi | ||||
| gosu ${USER} sh -c "/rpxy/run.sh 2>&1" | ||||
|  |  | |||
|  | @ -1,5 +1,7 @@ | |||
| #!/usr/bin/env sh | ||||
| CONFIG_FILE=/etc/rpxy.toml | ||||
| LOG_DIR=/rpxy/log | ||||
| LOGGING=${LOG_TO_FILE:-false} | ||||
| 
 | ||||
| # debug level logging | ||||
| if [ -z $LOG_LEVEL ]; then | ||||
|  | @ -7,19 +9,11 @@ if [ -z $LOG_LEVEL ]; then | |||
| fi | ||||
| echo "rpxy: Logging with level ${LOG_LEVEL}" | ||||
| 
 | ||||
| # continuously watch and reload the config file | ||||
| if [ -z $WATCH ]; then | ||||
|   WATCH=false | ||||
| else | ||||
|   if [ "$WATCH" = "true" ]; then | ||||
|     WATCH=true | ||||
|   else | ||||
|     WATCH=false | ||||
|   fi | ||||
| fi | ||||
| 
 | ||||
| if  $WATCH ; then | ||||
|   RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE} -w | ||||
| if "${LOGGING}"; then | ||||
|   echo "rpxy: Start with writing log files" | ||||
|   RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE} --log-dir ${LOG_DIR} | ||||
| else | ||||
|   echo "rpxy: Start without writing log files" | ||||
|   RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE} | ||||
| fi | ||||
|  |  | |||
|  | @ -16,23 +16,23 @@ post-quantum = ["rustls-post-quantum"] | |||
| [dependencies] | ||||
| url = { version = "2.5.4" } | ||||
| ahash = "0.8.11" | ||||
| thiserror = "2.0.11" | ||||
| thiserror = "2.0.12" | ||||
| tracing = "0.1.41" | ||||
| async-trait = "0.1.85" | ||||
| async-trait = "0.1.88" | ||||
| base64 = "0.22.1" | ||||
| aws-lc-rs = { version = "1.12.2", default-features = false, features = [ | ||||
| aws-lc-rs = { version = "1.13.0", default-features = false, features = [ | ||||
|   "aws-lc-sys", | ||||
| ] } | ||||
| blocking = "1.6.1" | ||||
| rustls = { version = "0.23.22", default-features = false, features = [ | ||||
| rustls = { version = "0.23.26", default-features = false, features = [ | ||||
|   "std", | ||||
|   "aws_lc_rs", | ||||
| ] } | ||||
| rustls-platform-verifier = { version = "0.5.0" } | ||||
| rustls-platform-verifier = { version = "0.5.2" } | ||||
| rustls-acme = { path = "../submodules/rustls-acme/", default-features = false, features = [ | ||||
|   "aws-lc-rs", | ||||
| ] } | ||||
| rustls-post-quantum = { version = "0.2.2", optional = true } | ||||
| tokio = { version = "1.43.0", default-features = false } | ||||
| tokio-util = { version = "0.7.13", default-features = false } | ||||
| tokio = { version = "1.44.2", default-features = false } | ||||
| tokio-util = { version = "0.7.15", default-features = false } | ||||
| tokio-stream = { version = "0.1.17", default-features = false } | ||||
|  |  | |||
|  | @ -77,7 +77,7 @@ impl AcmeManager { | |||
|   /// Returns a Vec<JoinHandle<()>> as a tasks handles and a map of domain to ServerConfig for challenge.
 | ||||
|   pub fn spawn_manager_tasks( | ||||
|     &self, | ||||
|     cancel_token: Option<tokio_util::sync::CancellationToken>, | ||||
|     cancel_token: tokio_util::sync::CancellationToken, | ||||
|   ) -> (Vec<tokio::task::JoinHandle<()>>, HashMap<String, Arc<ServerConfig>>) { | ||||
|     let rustls_client_config = rustls::ClientConfig::builder() | ||||
|       .dangerous() // The `Verifier` we're using is actually safe
 | ||||
|  | @ -115,13 +115,10 @@ impl AcmeManager { | |||
|                 } | ||||
|               } | ||||
|             }; | ||||
|             if let Some(cancel_token) = cancel_token.as_ref() { | ||||
|               tokio::select! { | ||||
|                 _ = task => {}, | ||||
|                 _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } | ||||
|               } | ||||
|             } else { | ||||
|               task.await; | ||||
| 
 | ||||
|             tokio::select! { | ||||
|               _ = task => {}, | ||||
|               _ = cancel_token.cancelled() => { debug!("rpxy ACME manager task for {domain} terminated") } | ||||
|             } | ||||
|           } | ||||
|         }) | ||||
|  |  | |||
|  | @ -29,26 +29,28 @@ rpxy-lib = { path = "../rpxy-lib/", default-features = false, features = [ | |||
|   "sticky-cookie", | ||||
| ] } | ||||
| 
 | ||||
| mimalloc = { version = "*", default-features = false } | ||||
| anyhow = "1.0.95" | ||||
| # TODO: pin mimalloc due to compilation failure by musl | ||||
| mimalloc = { version = "=0.1.44", default-features = false } | ||||
| libmimalloc-sys = { version = "=0.1.40" } | ||||
| anyhow = "1.0.98" | ||||
| ahash = "0.8.11" | ||||
| serde = { version = "1.0.217", default-features = false, features = ["derive"] } | ||||
| tokio = { version = "1.43.0", default-features = false, features = [ | ||||
| serde = { version = "1.0.219", default-features = false, features = ["derive"] } | ||||
| tokio = { version = "1.44.2", default-features = false, features = [ | ||||
|   "net", | ||||
|   "rt-multi-thread", | ||||
|   "time", | ||||
|   "sync", | ||||
|   "macros", | ||||
| ] } | ||||
| tokio-util = { version = "0.7.13", default-features = false } | ||||
| async-trait = "0.1.85" | ||||
| tokio-util = { version = "0.7.15", default-features = false } | ||||
| async-trait = "0.1.88" | ||||
| futures-util = { version = "0.3.31", default-features = false } | ||||
| 
 | ||||
| # config | ||||
| clap = { version = "4.5.27", features = ["std", "cargo", "wrap_help"] } | ||||
| toml = { version = "0.8.19", default-features = false, features = ["parse"] } | ||||
| hot_reload = "0.1.8" | ||||
| serde_ignored = "0.1.10" | ||||
| clap = { version = "4.5.37", features = ["std", "cargo", "wrap_help"] } | ||||
| toml = { version = "0.8.22", default-features = false, features = ["parse"] } | ||||
| hot_reload = "0.1.9" | ||||
| serde_ignored = "0.1.11" | ||||
| 
 | ||||
| # logging | ||||
| tracing = { version = "0.1.41" } | ||||
|  |  | |||
|  | @ -1,18 +1,18 @@ | |||
| use super::toml::ConfigToml; | ||||
| use crate::error::{anyhow, ensure}; | ||||
| use ahash::HashMap; | ||||
| use clap::{Arg, ArgAction}; | ||||
| use clap::Arg; | ||||
| use hot_reload::{ReloaderReceiver, ReloaderService}; | ||||
| use rpxy_certs::{build_cert_reloader, CryptoFileSourceBuilder, CryptoReloader, ServerCryptoBase}; | ||||
| use rpxy_certs::{CryptoFileSourceBuilder, CryptoReloader, ServerCryptoBase, build_cert_reloader}; | ||||
| use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig}; | ||||
| 
 | ||||
| #[cfg(feature = "acme")] | ||||
| use rpxy_acme::{AcmeManager, ACME_DIR_URL, ACME_REGISTRY_PATH}; | ||||
| use rpxy_acme::{ACME_DIR_URL, ACME_REGISTRY_PATH, AcmeManager}; | ||||
| 
 | ||||
| /// Parsed options
 | ||||
| pub struct Opts { | ||||
|   pub config_file_path: String, | ||||
|   pub watch: bool, | ||||
|   pub log_dir_path: Option<String>, | ||||
| } | ||||
| 
 | ||||
| /// Parse arg values passed from cli
 | ||||
|  | @ -28,19 +28,22 @@ pub fn parse_opts() -> Result<Opts, anyhow::Error> { | |||
|         .help("Configuration file path like ./config.toml"), | ||||
|     ) | ||||
|     .arg( | ||||
|       Arg::new("watch") | ||||
|         .long("watch") | ||||
|         .short('w') | ||||
|         .action(ArgAction::SetTrue) | ||||
|         .help("Activate dynamic reloading of the config file via continuous monitoring"), | ||||
|       Arg::new("log_dir") | ||||
|         .long("log-dir") | ||||
|         .short('l') | ||||
|         .value_name("LOG_DIR") | ||||
|         .help("Directory for log files. If not specified, logs are printed to stdout."), | ||||
|     ); | ||||
|   let matches = options.get_matches(); | ||||
| 
 | ||||
|   ///////////////////////////////////
 | ||||
|   let config_file_path = matches.get_one::<String>("config_file").unwrap().to_owned(); | ||||
|   let watch = matches.get_one::<bool>("watch").unwrap().to_owned(); | ||||
|   let log_dir_path = matches.get_one::<String>("log_dir").map(|v| v.to_owned()); | ||||
| 
 | ||||
|   Ok(Opts { config_file_path, watch }) | ||||
|   Ok(Opts { | ||||
|     config_file_path, | ||||
|     log_dir_path, | ||||
|   }) | ||||
| } | ||||
| 
 | ||||
| pub fn build_settings(config: &ConfigToml) -> std::result::Result<(ProxyConfig, AppConfigList), anyhow::Error> { | ||||
|  |  | |||
|  | @ -4,7 +4,7 @@ use crate::{ | |||
|   log::warn, | ||||
| }; | ||||
| use ahash::HashMap; | ||||
| use rpxy_lib::{reexports::Uri, AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}; | ||||
| use rpxy_lib::{AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri, reexports::Uri}; | ||||
| use serde::Deserialize; | ||||
| use std::{fs, net::SocketAddr}; | ||||
| use tokio::time::Duration; | ||||
|  |  | |||
|  | @ -5,3 +5,6 @@ pub const CONFIG_WATCH_DELAY_SECS: u32 = 15; | |||
| #[cfg(feature = "cache")] | ||||
| // Cache directory
 | ||||
| pub const CACHE_DIR: &str = "./cache"; | ||||
| 
 | ||||
| pub(crate) const ACCESS_LOG_FILE: &str = "access.log"; | ||||
| pub(crate) const SYSTEM_LOG_FILE: &str = "rpxy.log"; | ||||
|  |  | |||
|  | @ -1,3 +1,5 @@ | |||
| use crate::constants::{ACCESS_LOG_FILE, SYSTEM_LOG_FILE}; | ||||
| use rpxy_lib::log_event_names; | ||||
| use std::str::FromStr; | ||||
| use tracing_subscriber::{fmt, prelude::*}; | ||||
| 
 | ||||
|  | @ -5,10 +7,77 @@ use tracing_subscriber::{fmt, prelude::*}; | |||
| pub use tracing::{debug, error, info, warn}; | ||||
| 
 | ||||
| /// Initialize the logger with the RUST_LOG environment variable.
 | ||||
| pub fn init_logger() { | ||||
| pub fn init_logger(log_dir_path: Option<&str>) { | ||||
|   let level_string = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string()); | ||||
|   let level = tracing::Level::from_str(level_string.as_str()).unwrap_or(tracing::Level::INFO); | ||||
| 
 | ||||
|   match log_dir_path { | ||||
|     None => { | ||||
|       // log to stdout
 | ||||
|       init_stdio_logger(level); | ||||
|     } | ||||
|     Some(log_dir_path) => { | ||||
|       // log to files
 | ||||
|       println!("Activate logging to files: {log_dir_path}"); | ||||
|       init_file_logger(level, log_dir_path); | ||||
|     } | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| /// file logging TODO:
 | ||||
| fn init_file_logger(level: tracing::Level, log_dir_path: &str) { | ||||
|   let log_dir_path = std::path::PathBuf::from(log_dir_path); | ||||
|   // create the directory if it does not exist
 | ||||
|   if !log_dir_path.exists() { | ||||
|     println!("Directory does not exist, creating: {}", log_dir_path.display()); | ||||
|     std::fs::create_dir_all(&log_dir_path).expect("Failed to create log directory"); | ||||
|   } | ||||
|   let access_log_path = log_dir_path.join(ACCESS_LOG_FILE); | ||||
|   let system_log_path = log_dir_path.join(SYSTEM_LOG_FILE); | ||||
|   println!("Access log: {}", access_log_path.display()); | ||||
|   println!("System and error log: {}", system_log_path.display()); | ||||
| 
 | ||||
|   let access_log = open_log_file(&access_log_path); | ||||
|   let system_log = open_log_file(&system_log_path); | ||||
| 
 | ||||
|   let reg = tracing_subscriber::registry(); | ||||
| 
 | ||||
|   let access_log_base = fmt::layer() | ||||
|     .with_line_number(false) | ||||
|     .with_thread_ids(false) | ||||
|     .with_thread_names(false) | ||||
|     .with_target(false) | ||||
|     .with_level(false) | ||||
|     .compact() | ||||
|     .with_ansi(false); | ||||
|   let reg = reg.with(access_log_base.with_writer(access_log).with_filter(AccessLogFilter)); | ||||
| 
 | ||||
|   let system_log_base = fmt::layer() | ||||
|     .with_line_number(false) | ||||
|     .with_thread_ids(false) | ||||
|     .with_thread_names(false) | ||||
|     .with_target(false) | ||||
|     .with_level(true) // with level for system log
 | ||||
|     .compact() | ||||
|     .with_ansi(false); | ||||
|   let reg = reg.with( | ||||
|     system_log_base | ||||
|       .with_writer(system_log) | ||||
|       .with_filter(tracing_subscriber::filter::filter_fn(move |metadata| { | ||||
|         (metadata | ||||
|           .target() | ||||
|           .starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str()) | ||||
|           && metadata.name() != log_event_names::ACCESS_LOG | ||||
|           && metadata.level() <= &level) | ||||
|           || metadata.level() <= &tracing::Level::WARN.min(level) | ||||
|       })), | ||||
|   ); | ||||
| 
 | ||||
|   reg.init(); | ||||
| } | ||||
| 
 | ||||
| /// stdio logging
 | ||||
| fn init_stdio_logger(level: tracing::Level) { | ||||
|   // This limits the logger to emits only this crate with any level above RUST_LOG, for included crates it will emit only ERROR (in prod)/INFO (in dev) or above level.
 | ||||
|   let stdio_layer = fmt::layer().with_level(true).with_thread_ids(false); | ||||
|   if level <= tracing::Level::INFO { | ||||
|  | @ -42,3 +111,29 @@ pub fn init_logger() { | |||
|     tracing_subscriber::registry().with(stdio_layer).init(); | ||||
|   }; | ||||
| } | ||||
| 
 | ||||
| /// Access log filter
 | ||||
| struct AccessLogFilter; | ||||
| impl<S> tracing_subscriber::layer::Filter<S> for AccessLogFilter { | ||||
|   fn enabled(&self, metadata: &tracing::Metadata<'_>, _: &tracing_subscriber::layer::Context<'_, S>) -> bool { | ||||
|     metadata | ||||
|       .target() | ||||
|       .starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str()) | ||||
|       && metadata.name().contains(log_event_names::ACCESS_LOG) | ||||
|       && metadata.level() <= &tracing::Level::INFO | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| #[inline] | ||||
| /// Create a file for logging
 | ||||
| fn open_log_file<P>(path: P) -> std::fs::File | ||||
| where | ||||
|   P: AsRef<std::path::Path>, | ||||
| { | ||||
|   // crate a file if it does not exist
 | ||||
|   std::fs::OpenOptions::new() | ||||
|     .create(true) | ||||
|     .append(true) | ||||
|     .open(path) | ||||
|     .expect("Failed to open the log file") | ||||
| } | ||||
|  |  | |||
|  | @ -9,19 +9,17 @@ mod log; | |||
| #[cfg(feature = "acme")] | ||||
| use crate::config::build_acme_manager; | ||||
| use crate::{ | ||||
|   config::{build_cert_manager, build_settings, parse_opts, ConfigToml, ConfigTomlReloader}, | ||||
|   config::{ConfigToml, ConfigTomlReloader, build_cert_manager, build_settings, parse_opts}, | ||||
|   constants::CONFIG_WATCH_DELAY_SECS, | ||||
|   error::*, | ||||
|   log::*, | ||||
| }; | ||||
| use hot_reload::{ReloaderReceiver, ReloaderService}; | ||||
| use rpxy_lib::{entrypoint, RpxyOptions, RpxyOptionsBuilder}; | ||||
| use rpxy_lib::{RpxyOptions, RpxyOptionsBuilder, entrypoint}; | ||||
| use std::sync::Arc; | ||||
| use tokio_util::sync::CancellationToken; | ||||
| 
 | ||||
| fn main() { | ||||
|   init_logger(); | ||||
| 
 | ||||
|   let mut runtime_builder = tokio::runtime::Builder::new_multi_thread(); | ||||
|   runtime_builder.enable_all(); | ||||
|   runtime_builder.thread_name("rpxy"); | ||||
|  | @ -30,40 +28,34 @@ fn main() { | |||
|   runtime.block_on(async { | ||||
|     // Initially load options
 | ||||
|     let Ok(parsed_opts) = parse_opts() else { | ||||
|       error!("Invalid toml file"); | ||||
|       std::process::exit(1); | ||||
|     }; | ||||
| 
 | ||||
|     if !parsed_opts.watch { | ||||
|       if let Err(e) = rpxy_service_without_watcher(&parsed_opts.config_file_path, runtime.handle().clone()).await { | ||||
|         error!("rpxy service existed: {e}"); | ||||
|         std::process::exit(1); | ||||
|       } | ||||
|     } else { | ||||
|       let (config_service, config_rx) = ReloaderService::<ConfigTomlReloader, ConfigToml, String>::new( | ||||
|         &parsed_opts.config_file_path, | ||||
|         CONFIG_WATCH_DELAY_SECS, | ||||
|         false, | ||||
|       ) | ||||
|       .await | ||||
|       .unwrap(); | ||||
|     init_logger(parsed_opts.log_dir_path.as_deref()); | ||||
| 
 | ||||
|       tokio::select! { | ||||
|         config_res = config_service.start() => { | ||||
|           if let Err(e) = config_res { | ||||
|             error!("config reloader service exited: {e}"); | ||||
|             std::process::exit(1); | ||||
|           } | ||||
|         } | ||||
|         rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => { | ||||
|           if let Err(e) = rpxy_res { | ||||
|             error!("rpxy service existed: {e}"); | ||||
|             std::process::exit(1); | ||||
|           } | ||||
|     let (config_service, config_rx) = ReloaderService::<ConfigTomlReloader, ConfigToml, String>::new( | ||||
|       &parsed_opts.config_file_path, | ||||
|       CONFIG_WATCH_DELAY_SECS, | ||||
|       false, | ||||
|     ) | ||||
|     .await | ||||
|     .unwrap(); | ||||
| 
 | ||||
|     tokio::select! { | ||||
|       config_res = config_service.start() => { | ||||
|         if let Err(e) = config_res { | ||||
|           error!("config reloader service exited: {e}"); | ||||
|           std::process::exit(1); | ||||
|         } | ||||
|       } | ||||
|       rpxy_res = rpxy_service(config_rx, runtime.handle().clone()) => { | ||||
|         if let Err(e) = rpxy_res { | ||||
|           error!("rpxy service existed: {e}"); | ||||
|           std::process::exit(1); | ||||
|         } | ||||
|       } | ||||
|       std::process::exit(0); | ||||
|     } | ||||
|     std::process::exit(0); | ||||
|   }); | ||||
| } | ||||
| 
 | ||||
|  | @ -99,7 +91,7 @@ impl RpxyService { | |||
|     }) | ||||
|   } | ||||
| 
 | ||||
|   async fn start(&self, cancel_token: Option<CancellationToken>) -> Result<(), anyhow::Error> { | ||||
|   async fn start(&self, cancel_token: CancellationToken) -> Result<(), anyhow::Error> { | ||||
|     let RpxyService { | ||||
|       runtime_handle, | ||||
|       proxy_conf, | ||||
|  | @ -114,17 +106,19 @@ impl RpxyService { | |||
|     { | ||||
|       let (acme_join_handles, server_config_acme_challenge) = acme_manager | ||||
|         .as_ref() | ||||
|         .map(|m| m.spawn_manager_tasks(cancel_token.as_ref().map(|t| t.child_token()))) | ||||
|         .map(|m| m.spawn_manager_tasks(cancel_token.child_token())) | ||||
|         .unwrap_or((vec![], Default::default())); | ||||
|       let rpxy_opts = RpxyOptionsBuilder::default() | ||||
|         .proxy_config(proxy_conf.clone()) | ||||
|         .app_config_list(app_conf.clone()) | ||||
|         .cert_rx(cert_rx.clone()) | ||||
|         .runtime_handle(runtime_handle.clone()) | ||||
|         .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) | ||||
|         .server_configs_acme_challenge(Arc::new(server_config_acme_challenge)) | ||||
|         .build()?; | ||||
|       self.start_inner(rpxy_opts, acme_join_handles).await.map_err(|e| anyhow!(e)) | ||||
|       self | ||||
|         .start_inner(rpxy_opts, cancel_token, acme_join_handles) | ||||
|         .await | ||||
|         .map_err(|e| anyhow!(e)) | ||||
|     } | ||||
| 
 | ||||
|     #[cfg(not(feature = "acme"))] | ||||
|  | @ -134,9 +128,8 @@ impl RpxyService { | |||
|         .app_config_list(app_conf.clone()) | ||||
|         .cert_rx(cert_rx.clone()) | ||||
|         .runtime_handle(runtime_handle.clone()) | ||||
|         .cancel_token(cancel_token.as_ref().map(|t| t.child_token())) | ||||
|         .build()?; | ||||
|       self.start_inner(rpxy_opts).await.map_err(|e| anyhow!(e)) | ||||
|       self.start_inner(rpxy_opts, cancel_token).await.map_err(|e| anyhow!(e)) | ||||
|     } | ||||
|   } | ||||
| 
 | ||||
|  | @ -144,19 +137,19 @@ impl RpxyService { | |||
|   async fn start_inner( | ||||
|     &self, | ||||
|     rpxy_opts: RpxyOptions, | ||||
|     cancel_token: CancellationToken, | ||||
|     #[cfg(feature = "acme")] acme_task_handles: Vec<tokio::task::JoinHandle<()>>, | ||||
|   ) -> Result<(), anyhow::Error> { | ||||
|     let cancel_token = rpxy_opts.cancel_token.clone(); | ||||
|     let cancel_token = cancel_token.clone(); | ||||
|     let runtime_handle = rpxy_opts.runtime_handle.clone(); | ||||
| 
 | ||||
|     // spawn rpxy entrypoint, where cancellation token is possibly contained inside the service
 | ||||
|     let cancel_token_clone = cancel_token.clone(); | ||||
|     let child_cancel_token = cancel_token.child_token(); | ||||
|     let rpxy_handle = runtime_handle.spawn(async move { | ||||
|       if let Err(e) = entrypoint(&rpxy_opts).await { | ||||
|       if let Err(e) = entrypoint(&rpxy_opts, child_cancel_token).await { | ||||
|         error!("rpxy entrypoint exited on error: {e}"); | ||||
|         if let Some(cancel_token) = cancel_token_clone { | ||||
|           cancel_token.cancel(); | ||||
|         } | ||||
|         cancel_token_clone.cancel(); | ||||
|         return Err(anyhow!(e)); | ||||
|       } | ||||
|       Ok(()) | ||||
|  | @ -169,24 +162,20 @@ impl RpxyService { | |||
|     // spawn certificate reloader service, where cert service does not have cancellation token inside the service
 | ||||
|     let cert_service = self.cert_service.as_ref().unwrap().clone(); | ||||
|     let cancel_token_clone = cancel_token.clone(); | ||||
|     let child_cancel_token = cancel_token.as_ref().map(|c| c.child_token()); | ||||
|     let child_cancel_token = cancel_token.child_token(); | ||||
|     let cert_handle = runtime_handle.spawn(async move { | ||||
|       if let Some(child_cancel_token) = child_cancel_token { | ||||
|         tokio::select! { | ||||
|           cert_res = cert_service.start() => { | ||||
|             if let Err(ref e) = cert_res { | ||||
|               error!("cert reloader service exited on error: {e}"); | ||||
|             } | ||||
|             cancel_token_clone.unwrap().cancel(); | ||||
|             cert_res.map_err(|e| anyhow!(e)) | ||||
|           } | ||||
|           _ = child_cancel_token.cancelled() => { | ||||
|             debug!("cert reloader service terminated"); | ||||
|             Ok(()) | ||||
|       tokio::select! { | ||||
|         cert_res = cert_service.start() => { | ||||
|           if let Err(ref e) = cert_res { | ||||
|             error!("cert reloader service exited on error: {e}"); | ||||
|           } | ||||
|           cancel_token_clone.cancel(); | ||||
|           cert_res.map_err(|e| anyhow!(e)) | ||||
|         } | ||||
|         _ = child_cancel_token.cancelled() => { | ||||
|           debug!("cert reloader service terminated"); | ||||
|           Ok(()) | ||||
|         } | ||||
|       } else { | ||||
|         cert_service.start().await.map_err(|e| anyhow!(e)) | ||||
|       } | ||||
|     }); | ||||
| 
 | ||||
|  | @ -221,9 +210,7 @@ impl RpxyService { | |||
|         if let Err(ref e) = acme_res { | ||||
|           error!("acme manager exited on error: {e}"); | ||||
|         } | ||||
|         if let Some(cancel_token) = cancel_token_clone { | ||||
|           cancel_token.cancel(); | ||||
|         } | ||||
|         cancel_token_clone.cancel(); | ||||
|         acme_res.map_err(|e| anyhow!(e)) | ||||
|       }); | ||||
|       let (rpxy_res, cert_res, acme_res) = tokio::join!(rpxy_handle, cert_handle, acme_handle); | ||||
|  | @ -238,17 +225,7 @@ impl RpxyService { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| async fn rpxy_service_without_watcher( | ||||
|   config_file_path: &str, | ||||
|   runtime_handle: tokio::runtime::Handle, | ||||
| ) -> Result<(), anyhow::Error> { | ||||
|   info!("Start rpxy service"); | ||||
|   let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?; | ||||
|   let service = RpxyService::new(&config_toml, runtime_handle).await?; | ||||
|   service.start(None).await | ||||
| } | ||||
| 
 | ||||
| async fn rpxy_service_with_watcher( | ||||
| async fn rpxy_service( | ||||
|   mut config_rx: ReloaderReceiver<ConfigToml, String>, | ||||
|   runtime_handle: tokio::runtime::Handle, | ||||
| ) -> Result<(), anyhow::Error> { | ||||
|  | @ -268,7 +245,7 @@ async fn rpxy_service_with_watcher( | |||
| 
 | ||||
|     tokio::select! { | ||||
|       /* ---------- */ | ||||
|       rpxy_res = service.start(Some(cancel_token.clone())) => { | ||||
|       rpxy_res = service.start(cancel_token.clone()) => { | ||||
|         if let Err(ref e) = rpxy_res { | ||||
|           error!("rpxy service exited on error: {e}"); | ||||
|         } else { | ||||
|  |  | |||
|  | @ -19,23 +19,23 @@ http3 = [] | |||
| ahash = { version = "0.8.11" } | ||||
| tracing = { version = "0.1.41" } | ||||
| derive_builder = { version = "0.20.2" } | ||||
| thiserror = { version = "2.0.11" } | ||||
| hot_reload = { version = "0.1.8" } | ||||
| async-trait = { version = "0.1.85" } | ||||
| rustls = { version = "0.23.22", default-features = false, features = [ | ||||
| thiserror = { version = "2.0.12" } | ||||
| hot_reload = { version = "0.1.9" } | ||||
| async-trait = { version = "0.1.88" } | ||||
| rustls = { version = "0.23.26", default-features = false, features = [ | ||||
|   "std", | ||||
|   "aws_lc_rs", | ||||
| ] } | ||||
| rustls-pemfile = { version = "2.2.0" } | ||||
| rustls-webpki = { version = "0.102.8", default-features = false, features = [ | ||||
| rustls-webpki = { version = "0.103.1", default-features = false, features = [ | ||||
|   "std", | ||||
|   "aws_lc_rs", | ||||
|   "aws-lc-rs", | ||||
| ] } | ||||
| rustls-post-quantum = { version = "0.2.2", optional = true } | ||||
| x509-parser = { version = "0.17.0" } | ||||
| 
 | ||||
| [dev-dependencies] | ||||
| tokio = { version = "1.43.0", default-features = false, features = [ | ||||
| tokio = { version = "1.44.2", default-features = false, features = [ | ||||
|   "rt-multi-thread", | ||||
|   "macros", | ||||
| ] } | ||||
|  |  | |||
|  | @ -65,7 +65,7 @@ impl SingleServerCertsKeys { | |||
|       .cert_keys | ||||
|       .clone() | ||||
|       .iter() | ||||
|       .find_map(|k| if let Ok(sk) = any_supported_type(k) { Some(sk) } else { None }) | ||||
|       .find_map(|k| any_supported_type(k).ok()) | ||||
|       .ok_or_else(|| RpxyCertError::InvalidCertificateAndKey)?; | ||||
| 
 | ||||
|     let cert = self.certs.iter().map(|c| Certificate::from(c.to_vec())).collect::<Vec<_>>(); | ||||
|  |  | |||
|  | @ -36,12 +36,12 @@ post-quantum = [ | |||
| ] | ||||
| 
 | ||||
| [dependencies] | ||||
| rand = "0.9.0" | ||||
| rand = "0.9.1" | ||||
| ahash = "0.8.11" | ||||
| bytes = "1.9.0" | ||||
| bytes = "1.10.1" | ||||
| derive_builder = "0.20.2" | ||||
| futures = { version = "0.3.31", features = ["alloc", "async-await"] } | ||||
| tokio = { version = "1.43.0", default-features = false, features = [ | ||||
| tokio = { version = "1.44.2", default-features = false, features = [ | ||||
|   "net", | ||||
|   "rt-multi-thread", | ||||
|   "time", | ||||
|  | @ -49,19 +49,19 @@ tokio = { version = "1.43.0", default-features = false, features = [ | |||
|   "macros", | ||||
|   "fs", | ||||
| ] } | ||||
| tokio-util = { version = "0.7.13", default-features = false } | ||||
| tokio-util = { version = "0.7.15", default-features = false } | ||||
| pin-project-lite = "0.2.16" | ||||
| async-trait = "0.1.85" | ||||
| async-trait = "0.1.88" | ||||
| 
 | ||||
| # Error handling | ||||
| anyhow = "1.0.95" | ||||
| thiserror = "2.0.11" | ||||
| anyhow = "1.0.98" | ||||
| thiserror = "2.0.12" | ||||
| 
 | ||||
| # http for both server and client | ||||
| http = "1.2.0" | ||||
| http-body-util = "0.1.2" | ||||
| http = "1.3.1" | ||||
| http-body-util = "0.1.3" | ||||
| hyper = { version = "1.6.0", default-features = false } | ||||
| hyper-util = { version = "0.1.10", features = ["full"] } | ||||
| hyper-util = { version = "0.1.11", features = ["full"] } | ||||
| futures-util = { version = "0.3.31", default-features = false } | ||||
| futures-channel = { version = "0.3.31", default-features = false } | ||||
| 
 | ||||
|  | @ -79,10 +79,10 @@ hyper-rustls = { version = "0.27.5", default-features = false, features = [ | |||
| 
 | ||||
| # tls and cert management for server | ||||
| rpxy-certs = { path = "../rpxy-certs/", default-features = false } | ||||
| hot_reload = "0.1.8" | ||||
| rustls = { version = "0.23.22", default-features = false } | ||||
| hot_reload = "0.1.9" | ||||
| rustls = { version = "0.23.26", default-features = false } | ||||
| rustls-post-quantum = { version = "0.2.2", optional = true } | ||||
| tokio-rustls = { version = "0.26.1", features = ["early-data"] } | ||||
| tokio-rustls = { version = "0.26.2", features = ["early-data"] } | ||||
| 
 | ||||
| # acme | ||||
| rpxy-acme = { path = "../rpxy-acme/", default-features = false, optional = true } | ||||
|  | @ -91,28 +91,28 @@ rpxy-acme = { path = "../rpxy-acme/", default-features = false, optional = true | |||
| tracing = { version = "0.1.41" } | ||||
| 
 | ||||
| # http/3 | ||||
| quinn = { version = "0.11.6", optional = true } | ||||
| h3 = { version = "0.0.6", features = ["tracing"], optional = true } | ||||
| h3-quinn = { version = "0.0.7", optional = true } | ||||
| s2n-quic = { version = "1.52.1", path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ | ||||
| quinn = { version = "0.11.7", optional = true } | ||||
| h3 = { version = "0.0.7", features = ["tracing"], optional = true } | ||||
| h3-quinn = { version = "0.0.9", optional = true } | ||||
| s2n-quic = { version = "1.57.0", path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ | ||||
|   "provider-tls-rustls", | ||||
| ], optional = true } | ||||
| s2n-quic-core = { version = "0.52.1", path = "../submodules/s2n-quic/quic/s2n-quic-core", default-features = false, optional = true } | ||||
| s2n-quic-rustls = { version = "0.52.1", path = "../submodules/s2n-quic/quic/s2n-quic-rustls", optional = true } | ||||
| s2n-quic-core = { version = "0.57.0", path = "../submodules/s2n-quic/quic/s2n-quic-core", default-features = false, optional = true } | ||||
| s2n-quic-rustls = { version = "0.57.0", path = "../submodules/s2n-quic/quic/s2n-quic-rustls", optional = true } | ||||
| s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", features = [ | ||||
|   "tracing", | ||||
| ], optional = true } | ||||
| ########## | ||||
| # for UDP socket wit SO_REUSEADDR when h3 with quinn | ||||
| socket2 = { version = "0.5.8", features = ["all"], optional = true } | ||||
| socket2 = { version = "0.5.9", features = ["all"], optional = true } | ||||
| 
 | ||||
| # cache | ||||
| http-cache-semantics = { path = "../submodules/rusty-http-cache-semantics", default-features = false, optional = true } | ||||
| lru = { version = "0.13.0", optional = true } | ||||
| lru = { version = "0.14.0", optional = true } | ||||
| sha2 = { version = "0.10.8", default-features = false, optional = true } | ||||
| 
 | ||||
| # cookie handling for sticky cookie | ||||
| chrono = { version = "0.4.39", default-features = false, features = [ | ||||
| chrono = { version = "0.4.41", default-features = false, features = [ | ||||
|   "unstable-locales", | ||||
|   "alloc", | ||||
|   "clock", | ||||
|  |  | |||
|  | @ -32,3 +32,9 @@ pub const MAX_CACHE_EACH_SIZE: usize = 65_535; | |||
| pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; | ||||
| 
 | ||||
| // TODO: max cache size in total
 | ||||
| 
 | ||||
| /// Logging event name TODO: Other separated logs?
 | ||||
| pub mod log_event_names { | ||||
|   /// access log
 | ||||
|   pub const ACCESS_LOG: &str = "rpxy::access"; | ||||
| } | ||||
|  |  | |||
							
								
								
									
										36
									
								
								rpxy-lib/src/forwarder/cache/cache_main.rs
									
										
									
									
										vendored
									
									
								
							
							
						
						
									
										36
									
								
								rpxy-lib/src/forwarder/cache/cache_main.rs
									
										
									
									
										vendored
									
									
								
							|  | @ -1,10 +1,10 @@ | |||
| use super::cache_error::*; | ||||
| use crate::{ | ||||
|   globals::Globals, | ||||
|   hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody}, | ||||
|   hyper_ext::body::{BoxBody, ResponseBody, UnboundedStreamBody, full}, | ||||
|   log::*, | ||||
| }; | ||||
| use base64::{engine::general_purpose, Engine as _}; | ||||
| use base64::{Engine as _, engine::general_purpose}; | ||||
| use bytes::{Buf, Bytes, BytesMut}; | ||||
| use futures::channel::mpsc; | ||||
| use http::{Request, Response, Uri}; | ||||
|  | @ -16,8 +16,8 @@ use sha2::{Digest, Sha256}; | |||
| use std::{ | ||||
|   path::{Path, PathBuf}, | ||||
|   sync::{ | ||||
|     atomic::{AtomicUsize, Ordering}, | ||||
|     Arc, Mutex, | ||||
|     atomic::{AtomicUsize, Ordering}, | ||||
|   }, | ||||
|   time::SystemTime, | ||||
| }; | ||||
|  | @ -59,9 +59,7 @@ impl RpxyCache { | |||
|     let max_each_size = globals.proxy_config.cache_max_each_size; | ||||
|     let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; | ||||
|     if max_each_size < max_each_size_on_memory { | ||||
|       warn!( | ||||
|         "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" | ||||
|       ); | ||||
|       warn!("Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"); | ||||
|       max_each_size_on_memory = max_each_size; | ||||
|     } | ||||
| 
 | ||||
|  | @ -89,12 +87,7 @@ impl RpxyCache { | |||
|   } | ||||
| 
 | ||||
|   /// Put response into the cache
 | ||||
|   pub(crate) async fn put( | ||||
|     &self, | ||||
|     uri: &hyper::Uri, | ||||
|     mut body: Incoming, | ||||
|     policy: &CachePolicy, | ||||
|   ) -> CacheResult<UnboundedStreamBody> { | ||||
|   pub(crate) async fn put(&self, uri: &hyper::Uri, mut body: Incoming, policy: &CachePolicy) -> CacheResult<UnboundedStreamBody> { | ||||
|     let cache_manager = self.inner.clone(); | ||||
|     let mut file_store = self.file_store.clone(); | ||||
|     let uri = uri.clone(); | ||||
|  | @ -155,7 +148,7 @@ impl RpxyCache { | |||
|       let mut hasher = Sha256::new(); | ||||
|       hasher.update(buf.as_ref()); | ||||
|       let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); | ||||
|       debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); | ||||
|       trace!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); | ||||
| 
 | ||||
|       // Create cache object
 | ||||
|       let cache_key = derive_cache_key_from_uri(&uri); | ||||
|  | @ -188,10 +181,7 @@ impl RpxyCache { | |||
| 
 | ||||
|   /// Get cached response
 | ||||
|   pub(crate) async fn get<R>(&self, req: &Request<R>) -> Option<Response<ResponseBody>> { | ||||
|     debug!( | ||||
|       "Current cache status: (total, on-memory, file) = {:?}", | ||||
|       self.count().await | ||||
|     ); | ||||
|     trace!("Current cache status: (total, on-memory, file) = {:?}", self.count().await); | ||||
|     let cache_key = derive_cache_key_from_uri(req.uri()); | ||||
| 
 | ||||
|     // First check cache chance
 | ||||
|  | @ -282,11 +272,7 @@ impl FileStore { | |||
|     }; | ||||
|   } | ||||
|   /// Read a temporary file cache
 | ||||
|   async fn read( | ||||
|     &self, | ||||
|     path: impl AsRef<Path> + Send + Sync + 'static, | ||||
|     hash: &Bytes, | ||||
|   ) -> CacheResult<UnboundedStreamBody> { | ||||
|   async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> { | ||||
|     let inner = self.inner.read().await; | ||||
|     inner.read(path, hash).await | ||||
|   } | ||||
|  | @ -336,11 +322,7 @@ impl FileStoreInner { | |||
|   } | ||||
| 
 | ||||
|   /// Retrieve a stored temporary file cache
 | ||||
|   async fn read( | ||||
|     &self, | ||||
|     path: impl AsRef<Path> + Send + Sync + 'static, | ||||
|     hash: &Bytes, | ||||
|   ) -> CacheResult<UnboundedStreamBody> { | ||||
|   async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> { | ||||
|     let Ok(mut file) = File::open(&path).await else { | ||||
|       warn!("Cache file object cannot be opened"); | ||||
|       return Err(CacheError::FailedToOpenCacheFile); | ||||
|  |  | |||
|  | @ -2,7 +2,6 @@ use crate::{constants::*, count::RequestCount}; | |||
| use hot_reload::ReloaderReceiver; | ||||
| use rpxy_certs::ServerCryptoBase; | ||||
| use std::{net::SocketAddr, time::Duration}; | ||||
| use tokio_util::sync::CancellationToken; | ||||
| 
 | ||||
| /// Global object containing proxy configurations and shared object like counters.
 | ||||
| /// But note that in Globals, we do not have Mutex and RwLock. It is indeed, the context shared among async tasks.
 | ||||
|  | @ -13,8 +12,6 @@ pub struct Globals { | |||
|   pub request_count: RequestCount, | ||||
|   /// Shared context - Async task runtime handler
 | ||||
|   pub runtime_handle: tokio::runtime::Handle, | ||||
|   /// Shared context - Notify object to stop async tasks
 | ||||
|   pub cancel_token: Option<CancellationToken>, | ||||
|   /// Shared context - Certificate reloader service receiver // TODO: newer one
 | ||||
|   pub cert_reloader_rx: Option<ReloaderReceiver<ServerCryptoBase>>, | ||||
| 
 | ||||
|  |  | |||
|  | @ -27,6 +27,7 @@ use std::sync::Arc; | |||
| use tokio_util::sync::CancellationToken; | ||||
| 
 | ||||
| /* ------------------------------------------------ */ | ||||
| pub use crate::constants::log_event_names; | ||||
| pub use crate::globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}; | ||||
| pub mod reexports { | ||||
|   pub use hyper::Uri; | ||||
|  | @ -43,8 +44,6 @@ pub struct RpxyOptions { | |||
|   pub cert_rx: Option<ReloaderReceiver<ServerCryptoBase>>, // TODO:
 | ||||
|   /// Async task runtime handler
 | ||||
|   pub runtime_handle: tokio::runtime::Handle, | ||||
|   /// Notify object to stop async tasks
 | ||||
|   pub cancel_token: Option<CancellationToken>, | ||||
| 
 | ||||
|   #[cfg(feature = "acme")] | ||||
|   /// ServerConfig used for only ACME challenge for ACME domains
 | ||||
|  | @ -58,10 +57,10 @@ pub async fn entrypoint( | |||
|     app_config_list, | ||||
|     cert_rx, // TODO:
 | ||||
|     runtime_handle, | ||||
|     cancel_token, | ||||
|     #[cfg(feature = "acme")] | ||||
|     server_configs_acme_challenge, | ||||
|   }: &RpxyOptions, | ||||
|   cancel_token: CancellationToken, | ||||
| ) -> RpxyResult<()> { | ||||
|   #[cfg(all(feature = "http3-quinn", feature = "http3-s2n"))] | ||||
|   warn!("Both \"http3-quinn\" and \"http3-s2n\" features are enabled. \"http3-quinn\" will be used"); | ||||
|  | @ -117,7 +116,6 @@ pub async fn entrypoint( | |||
|     proxy_config: proxy_config.clone(), | ||||
|     request_count: Default::default(), | ||||
|     runtime_handle: runtime_handle.clone(), | ||||
|     cancel_token: cancel_token.clone(), | ||||
|     cert_reloader_rx: cert_rx.clone(), | ||||
| 
 | ||||
|     #[cfg(feature = "acme")] | ||||
|  | @ -153,25 +151,21 @@ pub async fn entrypoint( | |||
|       message_handler: message_handler.clone(), | ||||
|     }; | ||||
| 
 | ||||
|     let cancel_token = globals.cancel_token.as_ref().map(|t| t.child_token()); | ||||
|     let parent_cancel_token_clone = globals.cancel_token.clone(); | ||||
|     let cancel_token = cancel_token.clone(); | ||||
|     globals.runtime_handle.spawn(async move { | ||||
|       info!("rpxy proxy service for {listening_on} started"); | ||||
|       if let Some(cancel_token) = cancel_token { | ||||
|         tokio::select! { | ||||
|           _ = cancel_token.cancelled() => { | ||||
|             debug!("rpxy proxy service for {listening_on} terminated"); | ||||
|             Ok(()) | ||||
|           }, | ||||
|           proxy_res = proxy.start() => { | ||||
|             info!("rpxy proxy service for {listening_on} exited"); | ||||
|             // cancel other proxy tasks
 | ||||
|             parent_cancel_token_clone.unwrap().cancel(); | ||||
|             proxy_res | ||||
|           } | ||||
| 
 | ||||
|       tokio::select! { | ||||
|         _ = cancel_token.cancelled() => { | ||||
|           debug!("rpxy proxy service for {listening_on} terminated"); | ||||
|           Ok(()) | ||||
|         }, | ||||
|         proxy_res = proxy.start(cancel_token.child_token()) => { | ||||
|           info!("rpxy proxy service for {listening_on} exited"); | ||||
|           // cancel other proxy tasks
 | ||||
|           cancel_token.cancel(); | ||||
|           proxy_res | ||||
|         } | ||||
|       } else { | ||||
|         proxy.start().await | ||||
|       } | ||||
|     }) | ||||
|   }); | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| pub use tracing::{debug, error, info, warn}; | ||||
| pub use tracing::{debug, error, info, trace, warn}; | ||||
|  |  | |||
|  | @ -71,7 +71,7 @@ where | |||
|         Ok(v) | ||||
|       } | ||||
|       Err(e) => { | ||||
|         error!("{e}"); | ||||
|         error!("{e}: {log_data}"); | ||||
|         let code = StatusCode::from(e); | ||||
|         log_data.status_code(&code).output(); | ||||
|         synthetic_error_response(code) | ||||
|  |  | |||
|  | @ -1,11 +1,11 @@ | |||
| use super::{handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line, HttpMessageHandler}; | ||||
| use super::{HttpMessageHandler, handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line}; | ||||
| use crate::{ | ||||
|   backend::{BackendApp, UpstreamCandidates}, | ||||
|   constants::RESPONSE_HEADER_SERVER, | ||||
|   log::*, | ||||
| }; | ||||
| use anyhow::{anyhow, ensure, Result}; | ||||
| use http::{header, HeaderValue, Request, Response, Uri}; | ||||
| use anyhow::{Result, anyhow, ensure}; | ||||
| use http::{HeaderValue, Request, Response, Uri, header}; | ||||
| use hyper_util::client::legacy::connect::Connect; | ||||
| use std::net::SocketAddr; | ||||
| 
 | ||||
|  | @ -66,7 +66,7 @@ where | |||
|     upstream_candidates: &UpstreamCandidates, | ||||
|     tls_enabled: bool, | ||||
|   ) -> Result<HandlerContext> { | ||||
|     debug!("Generate request to be forwarded"); | ||||
|     trace!("Generate request to be forwarded"); | ||||
| 
 | ||||
|     // Add te: trailer if contained in original request
 | ||||
|     let contains_te_trailers = { | ||||
|  |  | |||
|  | @ -34,11 +34,7 @@ impl<T> From<&http::Request<T>> for HttpMessageLog { | |||
|       client_addr: "".to_string(), | ||||
|       method: req.method().to_string(), | ||||
|       host: header_mapper(header::HOST), | ||||
|       p_and_q: req | ||||
|         .uri() | ||||
|         .path_and_query() | ||||
|         .map_or_else(|| "", |v| v.as_str()) | ||||
|         .to_string(), | ||||
|       p_and_q: req.uri().path_and_query().map_or_else(|| "", |v| v.as_str()).to_string(), | ||||
|       version: req.version(), | ||||
|       uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), | ||||
|       uri_host: req.uri().host().unwrap_or("").to_string(), | ||||
|  | @ -50,6 +46,33 @@ impl<T> From<&http::Request<T>> for HttpMessageLog { | |||
|   } | ||||
| } | ||||
| 
 | ||||
| impl std::fmt::Display for HttpMessageLog { | ||||
|   fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||
|     write!( | ||||
|       f, | ||||
|       "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", | ||||
|       if !self.host.is_empty() { | ||||
|         self.host.as_str() | ||||
|       } else { | ||||
|         self.uri_host.as_str() | ||||
|       }, | ||||
|       self.client_addr, | ||||
|       self.method, | ||||
|       self.p_and_q, | ||||
|       self.version, | ||||
|       self.status, | ||||
|       if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { | ||||
|         format!("{}://{}", self.uri_scheme, self.uri_host) | ||||
|       } else { | ||||
|         "".to_string() | ||||
|       }, | ||||
|       self.ua, | ||||
|       self.xff, | ||||
|       self.upstream | ||||
|     ) | ||||
|   } | ||||
| } | ||||
| 
 | ||||
| impl HttpMessageLog { | ||||
|   pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self { | ||||
|     self.client_addr = client_addr.to_canonical().to_string(); | ||||
|  | @ -74,26 +97,8 @@ impl HttpMessageLog { | |||
| 
 | ||||
|   pub fn output(&self) { | ||||
|     info!( | ||||
|       "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", | ||||
|       if !self.host.is_empty() { | ||||
|         self.host.as_str() | ||||
|       } else { | ||||
|         self.uri_host.as_str() | ||||
|       }, | ||||
|       self.client_addr, | ||||
|       self.method, | ||||
|       self.p_and_q, | ||||
|       self.version, | ||||
|       self.status, | ||||
|       if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { | ||||
|         format!("{}://{}", self.uri_scheme, self.uri_host) | ||||
|       } else { | ||||
|         "".to_string() | ||||
|       }, | ||||
|       self.ua, | ||||
|       self.xff, | ||||
|       self.upstream, | ||||
|       // self.tls_server_name
 | ||||
|       name: crate::constants::log_event_names::ACCESS_LOG, | ||||
|       "{}", self | ||||
|     ); | ||||
|   } | ||||
| } | ||||
|  |  | |||
|  | @ -33,7 +33,7 @@ where | |||
|     <<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send, | ||||
|   { | ||||
|     let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?; | ||||
|     info!( | ||||
|     debug!( | ||||
|       "QUIC/HTTP3 connection established from {:?} {}", | ||||
|       client_addr, | ||||
|       <&ServerName as TryInto<String>>::try_into(&tls_server_name).unwrap_or_default() | ||||
|  | @ -115,7 +115,7 @@ where | |||
|       let mut sender = body_sender; | ||||
|       let mut size = 0usize; | ||||
|       while let Some(mut body) = recv_stream.recv_data().await? { | ||||
|         debug!("HTTP/3 incoming request body: remaining {}", body.remaining()); | ||||
|         trace!("HTTP/3 incoming request body: remaining {}", body.remaining()); | ||||
|         size += body.remaining(); | ||||
|         if size > max_body_size { | ||||
|           error!( | ||||
|  | @ -129,9 +129,9 @@ where | |||
|       } | ||||
| 
 | ||||
|       // trailers: use inner for work around. (directly get trailer)
 | ||||
|       let trailers = recv_stream.as_mut().recv_trailers().await?; | ||||
|       let trailers = futures_util::future::poll_fn(|cx| recv_stream.as_mut().poll_recv_trailers(cx)).await?; | ||||
|       if trailers.is_some() { | ||||
|         debug!("HTTP/3 incoming request trailers"); | ||||
|         trace!("HTTP/3 incoming request trailers"); | ||||
|         sender.send_trailers(trailers.unwrap()).await?; | ||||
|       } | ||||
|       Ok(()) as RpxyResult<()> | ||||
|  | @ -154,13 +154,13 @@ where | |||
| 
 | ||||
|     match send_stream.send_response(new_res).await { | ||||
|       Ok(_) => { | ||||
|         debug!("HTTP/3 response to connection successful"); | ||||
|         trace!("HTTP/3 response to connection successful"); | ||||
|         // on-demand body streaming to downstream without expanding the object onto memory.
 | ||||
|         loop { | ||||
|           let frame = match new_body.frame().await { | ||||
|             Some(frame) => frame, | ||||
|             None => { | ||||
|               debug!("Response body finished"); | ||||
|               trace!("Response body finished"); | ||||
|               break; | ||||
|             } | ||||
|           } | ||||
|  |  | |||
|  | @ -22,6 +22,7 @@ use hyper_util::{client::legacy::connect::Connect, rt::TokioIo, server::conn::au | |||
| use rpxy_certs::ServerCrypto; | ||||
| use std::{net::SocketAddr, sync::Arc, time::Duration}; | ||||
| use tokio::time::timeout; | ||||
| use tokio_util::sync::CancellationToken; | ||||
| 
 | ||||
| /// Wrapper function to handle request for HTTP/1.1 and HTTP/2
 | ||||
| /// HTTP/3 is handled in proxy_h3.rs which directly calls the message handler
 | ||||
|  | @ -129,7 +130,7 @@ where | |||
|   } | ||||
| 
 | ||||
|   /// Start with TLS (HTTPS)
 | ||||
|   pub(super) async fn start_with_tls(&self) -> RpxyResult<()> { | ||||
|   pub(super) async fn start_with_tls(&self, cancel_token: CancellationToken) -> RpxyResult<()> { | ||||
|     #[cfg(not(any(feature = "http3-quinn", feature = "http3-s2n")))] | ||||
|     { | ||||
|       self.tls_listener_service().await?; | ||||
|  | @ -139,14 +140,37 @@ where | |||
|     #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] | ||||
|     { | ||||
|       if self.globals.proxy_config.http3 { | ||||
|         select! { | ||||
|           _ = self.tls_listener_service().fuse() => { | ||||
|             error!("TCP proxy service for TLS exited"); | ||||
|           }, | ||||
|           _ = self.h3_listener_service().fuse() => { | ||||
|             error!("UDP proxy service for QUIC exited"); | ||||
|         let jh_tls = self.globals.runtime_handle.spawn({ | ||||
|           let self_clone = self.clone(); | ||||
|           let cancel_token = cancel_token.clone(); | ||||
|           async move { | ||||
|             select! { | ||||
|               _ = self_clone.tls_listener_service().fuse() => { | ||||
|                 error!("TCP proxy service for TLS exited"); | ||||
|                 cancel_token.cancel(); | ||||
|               }, | ||||
|               _ = cancel_token.cancelled().fuse() => { | ||||
|                 debug!("Cancel token is called for TLS listener"); | ||||
|               } | ||||
|             } | ||||
|           } | ||||
|         }; | ||||
|         }); | ||||
|         let jh_h3 = self.globals.runtime_handle.spawn({ | ||||
|           let self_clone = self.clone(); | ||||
|           async move { | ||||
|             select! { | ||||
|               _ = self_clone.h3_listener_service().fuse() => { | ||||
|                 error!("UDP proxy service for QUIC exited"); | ||||
|                 cancel_token.cancel(); | ||||
|               }, | ||||
|               _ = cancel_token.cancelled().fuse() => { | ||||
|                 debug!("Cancel token is called for QUIC listener"); | ||||
|               } | ||||
|             } | ||||
|           } | ||||
|         }); | ||||
|         let _ = futures::future::join(jh_tls, jh_h3).await; | ||||
| 
 | ||||
|         Ok(()) | ||||
|       } else { | ||||
|         self.tls_listener_service().await?; | ||||
|  | @ -303,10 +327,10 @@ where | |||
|   } | ||||
| 
 | ||||
|   /// Entrypoint for HTTP/1.1, 2 and 3 servers
 | ||||
|   pub async fn start(&self) -> RpxyResult<()> { | ||||
|   pub async fn start(&self, cancel_token: CancellationToken) -> RpxyResult<()> { | ||||
|     let proxy_service = async { | ||||
|       if self.tls_enabled { | ||||
|         self.start_with_tls().await | ||||
|         self.start_with_tls(cancel_token).await | ||||
|       } else { | ||||
|         self.start_without_tls().await | ||||
|       } | ||||
|  |  | |||
|  | @ -2,8 +2,8 @@ use super::{proxy_main::Proxy, socket::bind_udp_socket}; | |||
| use crate::{error::*, log::*, name_exp::ByteName}; | ||||
| use hyper_util::client::legacy::connect::Connect; | ||||
| use quinn::{ | ||||
|   crypto::rustls::{HandshakeData, QuicServerConfig}, | ||||
|   Endpoint, TransportConfig, | ||||
|   crypto::rustls::{HandshakeData, QuicServerConfig}, | ||||
| }; | ||||
| use rpxy_certs::ServerCrypto; | ||||
| use rustls::ServerConfig; | ||||
|  | @ -82,7 +82,7 @@ where | |||
|             let client_addr = incoming.remote_address(); | ||||
|             let quic_connection = match incoming.await { | ||||
|               Ok(new_conn) => { | ||||
|                 info!("New connection established"); | ||||
|                 trace!("New connection established"); | ||||
|                 h3_quinn::Connection::new(new_conn) | ||||
|               }, | ||||
|               Err(e) => { | ||||
|  |  | |||
|  | @ -110,7 +110,7 @@ where | |||
| 
 | ||||
|     // quic event loop. this immediately cancels when crypto is updated by tokio::select!
 | ||||
|     while let Some(new_conn) = server.accept().await { | ||||
|       debug!("New QUIC connection established"); | ||||
|       trace!("New QUIC connection established"); | ||||
|       let Ok(Some(new_server_name)) = new_conn.server_name() else { | ||||
|         warn!("HTTP/3 no SNI is given"); | ||||
|         continue; | ||||
|  |  | |||
|  | @ -1 +1 @@ | |||
| Subproject commit a65d7e7000b49e6e1e14daf32baee094f4d8dacd | ||||
| Subproject commit cc7aeb870a62cd8d4b962de35927a241525ea30d | ||||
|  | @ -1 +1 @@ | |||
| Subproject commit 78524172f54af5e3d5a0404b230d265c82eaf446 | ||||
| Subproject commit f9d0c4feb83160b6fe66fe34da76c443fc2b381c | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Kurihara
				Jun Kurihara