Merge pull request #271 from junkurihara/feat/log-file
Feat: separated log files for access and error (+system) logs
This commit is contained in:
commit
6598f8a342
27 changed files with 265 additions and 168 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -3,6 +3,7 @@
|
|||
docker/log
|
||||
docker/cache
|
||||
docker/config
|
||||
docker/acme_registry
|
||||
|
||||
# Generated by Cargo
|
||||
# will have compiled files and executables
|
||||
|
|
|
|||
14
CHANGELOG.md
14
CHANGELOG.md
|
|
@ -1,6 +1,18 @@
|
|||
# CHANGELOG
|
||||
|
||||
## 0.9.8 or 0.10.0 (Unreleased)
|
||||
## 0.10.1 or 0.11.0 (Unreleased)
|
||||
|
||||
## 0.10.0
|
||||
|
||||
### Important Changes
|
||||
|
||||
- [Breaking] We removed non-`watch` execute option and enabled the dynamic reloading of the config file by default.
|
||||
- We newly added `log-dir` execute option to specify the directory for `access.log`,`error.log` and `rpxy.log`. This is optional, and if not specified, the logs are written to the standard output by default.
|
||||
|
||||
### Improvement
|
||||
|
||||
- Refactor: lots of minor improvements
|
||||
- Deps
|
||||
|
||||
## 0.9.7
|
||||
|
||||
|
|
|
|||
8
Cargo.lock
generated
8
Cargo.lock
generated
|
|
@ -2084,7 +2084,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rpxy"
|
||||
version = "0.9.7"
|
||||
version = "0.10.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
|
|
@ -2108,7 +2108,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rpxy-acme"
|
||||
version = "0.9.7"
|
||||
version = "0.10.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"async-trait",
|
||||
|
|
@ -2129,7 +2129,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rpxy-certs"
|
||||
version = "0.9.7"
|
||||
version = "0.10.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"async-trait",
|
||||
|
|
@ -2147,7 +2147,7 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "rpxy-lib"
|
||||
version = "0.9.7"
|
||||
version = "0.10.0"
|
||||
dependencies = [
|
||||
"ahash",
|
||||
"anyhow",
|
||||
|
|
|
|||
|
|
@ -1,5 +1,5 @@
|
|||
[workspace.package]
|
||||
version = "0.9.7"
|
||||
version = "0.10.0"
|
||||
authors = ["Jun Kurihara"]
|
||||
homepage = "https://github.com/junkurihara/rust-rpxy"
|
||||
repository = "https://github.com/junkurihara/rust-rpxy"
|
||||
|
|
|
|||
16
README.md
16
README.md
|
|
@ -80,7 +80,7 @@ You can run `rpxy` with a configuration file like
|
|||
% ./target/release/rpxy --config config.toml
|
||||
```
|
||||
|
||||
If you specify `-w` option along with the config file path, `rpxy` tracks the change of `config.toml` in the real-time manner and apply the change immediately without restarting the process.
|
||||
`rpxy` tracks the change of `config.toml` in the real-time manner and apply the change immediately without restarting the process.
|
||||
|
||||
The full help messages are given follows.
|
||||
|
||||
|
|
@ -88,12 +88,18 @@ The full help messages are given follows.
|
|||
usage: rpxy [OPTIONS] --config <FILE>
|
||||
|
||||
Options:
|
||||
-c, --config <FILE> Configuration file path like ./config.toml
|
||||
-w, --watch Activate dynamic reloading of the config file via continuous monitoring
|
||||
-h, --help Print help
|
||||
-V, --version Print version
|
||||
-c, --config <FILE> Configuration file path like ./config.toml
|
||||
-l, --log-dir <LOG_DIR> Directory for log files. If not specified, logs are printed to stdout.
|
||||
-h, --help Print help
|
||||
-V, --version Print version
|
||||
```
|
||||
|
||||
If you set `--log-dir=<log_dir>`, the log files are created in the specified directory. Otherwise, the log is printed to stdout.
|
||||
|
||||
- `${log_dir}/access.log` for access log
|
||||
<!-- - `${log_dir}/error.log` for error log -->
|
||||
- `${log_dir}/rpxy.log` for system and error log
|
||||
|
||||
That's all!
|
||||
|
||||
## Basic Configuration
|
||||
|
|
|
|||
|
|
@ -2,13 +2,13 @@ FROM ubuntu:24.04 AS base
|
|||
LABEL maintainer="Jun Kurihara"
|
||||
|
||||
SHELL ["/bin/sh", "-x", "-c"]
|
||||
ENV SERIAL 2
|
||||
ENV SERIAL=2
|
||||
|
||||
########################################
|
||||
FROM --platform=$BUILDPLATFORM base AS builder
|
||||
|
||||
ENV CFLAGS=-Ofast
|
||||
ENV BUILD_DEPS curl make ca-certificates build-essential
|
||||
ENV BUILD_DEPS="curl make ca-certificates build-essential"
|
||||
ENV TARGET_SUFFIX=unknown-linux-gnu
|
||||
|
||||
WORKDIR /tmp
|
||||
|
|
@ -17,9 +17,9 @@ COPY . /tmp/
|
|||
|
||||
ARG TARGETARCH
|
||||
ARG CARGO_FEATURES
|
||||
ENV CARGO_FEATURES ${CARGO_FEATURES}
|
||||
ENV CARGO_FEATURES="${CARGO_FEATURES}"
|
||||
ARG ADDITIONAL_DEPS
|
||||
ENV ADDITIONAL_DEPS ${ADDITIONAL_DEPS}
|
||||
ENV ADDITIONAL_DEPS="${ADDITIONAL_DEPS}"
|
||||
|
||||
RUN if [ $TARGETARCH = "amd64" ]; then \
|
||||
echo "x86_64" > /arch; \
|
||||
|
|
@ -30,7 +30,7 @@ RUN if [ $TARGETARCH = "amd64" ]; then \
|
|||
exit 1; \
|
||||
fi
|
||||
|
||||
ENV RUSTFLAGS "-C link-arg=-s"
|
||||
ENV RUSTFLAGS="-C link-arg=-s"
|
||||
|
||||
RUN update-ca-certificates 2> /dev/null || true
|
||||
|
||||
|
|
@ -48,7 +48,7 @@ RUN apt-get update && apt-get install -qy --no-install-recommends $BUILD_DEPS ${
|
|||
########################################
|
||||
FROM --platform=$TARGETPLATFORM base AS runner
|
||||
|
||||
ENV RUNTIME_DEPS logrotate ca-certificates gosu
|
||||
ENV RUNTIME_DEPS="logrotate ca-certificates gosu"
|
||||
|
||||
RUN apt-get update && \
|
||||
apt-get install -qy --no-install-recommends $RUNTIME_DEPS && \
|
||||
|
|
|
|||
|
|
@ -9,11 +9,10 @@ There are several docker-specific environment variables.
|
|||
- `HOST_USER` (default: `user`): User name executing `rpxy` inside the container.
|
||||
- `HOST_UID` (default: `900`): `UID` of `HOST_USER`.
|
||||
- `HOST_GID` (default: `900`): `GID` of `HOST_USER`
|
||||
- `LOG_LEVEL=debug|info|warn|error`: Log level
|
||||
- `LOG_TO_FILE=true|false`: Enable logging to the log file `/rpxy/log/rpxy.log` using `logrotate`. You should mount `/rpxy/log` via docker volume option if enabled. The log dir and file will be owned by the `HOST_USER` with `HOST_UID:HOST_GID` on the host machine. Hence, `HOST_USER`, `HOST_UID` and `HOST_GID` should be the same as ones of the user who executes the `rpxy` docker container on the host.
|
||||
- `WATCH=true|false` (default: `false`): Activate continuous watching of the config file if true.
|
||||
- `LOG_LEVEL=trace|debug|info|warn|error`: Log level
|
||||
- `LOG_TO_FILE=true|false`: Enable logging to the log files using `logrotate` (locations: system/error log = `/rpxy/log/rpxy.log`, and access log = `/rpxy/log/access.log`). You should mount `/rpxy/log` via docker volume option if enabled. The log dir and file will be owned by the `HOST_USER` with `HOST_UID:HOST_GID` on the host machine. Hence, `HOST_USER`, `HOST_UID` and `HOST_GID` should be the same as ones of the user who executes the `rpxy` docker container on the host.
|
||||
|
||||
Then, all you need is to mount your `config.toml` as `/etc/rpxy.toml` and certificates/private keys as you like through the docker volume option. **If `WATCH=true`, You need to mount a directory, e.g., `./rpxy-config/`, including `rpxy.toml` on `/rpxy/config` instead of a file to correctly track file changes**. This is a docker limitation. Even if `WATCH=false`, you can mount the dir onto `/rpxy/config` rather than `/etc/rpxy.toml`. A file mounted on `/etc/rpxy` is prioritized over a dir mounted on `/rpxy/config`.
|
||||
Then, all you need is to mount your `config.toml` as `/etc/rpxy.toml` and certificates/private keys as you like through the docker volume option. **You need to mount a directory, e.g., `./rpxy-config/`, including `rpxy.toml` on `/rpxy/config` instead of a file to dynamically track file changes**. This is a docker limitation. You can mount the dir onto `/rpxy/config` rather than `/etc/rpxy.toml`. A file mounted on `/etc/rpxy` is prioritized over a dir mounted on `/rpxy/config`.
|
||||
|
||||
See [`docker-compose.yml`](./docker-compose.yml) for the detailed configuration. Note that the file path of keys and certificates must be ones in your docker container.
|
||||
|
||||
|
|
|
|||
|
|
@ -20,12 +20,11 @@ services:
|
|||
# - "linux/amd64"
|
||||
- "linux/arm64"
|
||||
environment:
|
||||
- LOG_LEVEL=debug
|
||||
- LOG_LEVEL=trace
|
||||
- LOG_TO_FILE=true
|
||||
- HOST_USER=jun
|
||||
- HOST_UID=501
|
||||
- HOST_GID=501
|
||||
# - WATCH=true
|
||||
tty: false
|
||||
privileged: true
|
||||
volumes:
|
||||
|
|
|
|||
|
|
@ -20,12 +20,11 @@ services:
|
|||
# - "linux/amd64"
|
||||
- "linux/arm64"
|
||||
environment:
|
||||
- LOG_LEVEL=debug
|
||||
- LOG_LEVEL=trace
|
||||
- LOG_TO_FILE=true
|
||||
- HOST_USER=jun
|
||||
- HOST_UID=501
|
||||
- HOST_GID=501
|
||||
# - WATCH=true
|
||||
tty: false
|
||||
privileged: true
|
||||
volumes:
|
||||
|
|
|
|||
|
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/env sh
|
||||
LOG_DIR=/rpxy/log
|
||||
LOG_FILE=${LOG_DIR}/rpxy.log
|
||||
SYSTEM_LOG_FILE=${LOG_DIR}/rpxy.log
|
||||
ACCESS_LOG_FILE=${LOG_DIR}/access.log
|
||||
LOG_SIZE=10M
|
||||
LOG_NUM=10
|
||||
|
||||
|
|
@ -43,8 +44,24 @@ include /etc/logrotate.d
|
|||
# system-specific logs may be also be configured here.
|
||||
EOF
|
||||
|
||||
cat > /etc/logrotate.d/rpxy.conf << EOF
|
||||
${LOG_FILE} {
|
||||
cat > /etc/logrotate.d/rpxy-system.conf << EOF
|
||||
${SYSTEM_LOG_FILE} {
|
||||
dateext
|
||||
daily
|
||||
missingok
|
||||
rotate ${LOG_NUM}
|
||||
notifempty
|
||||
compress
|
||||
delaycompress
|
||||
dateformat -%Y-%m-%d-%s
|
||||
size ${LOG_SIZE}
|
||||
copytruncate
|
||||
su ${USER} ${USER}
|
||||
}
|
||||
EOF
|
||||
|
||||
cat > /etc/logrotate.d/rpxy-access.conf << EOF
|
||||
${ACCESS_LOG_FILE} {
|
||||
dateext
|
||||
daily
|
||||
missingok
|
||||
|
|
@ -157,10 +174,4 @@ fi
|
|||
# Run rpxy
|
||||
cd /rpxy
|
||||
echo "rpxy: Start with user: ${USER} (${USER_ID}:${GROUP_ID})"
|
||||
if "${LOGGING}"; then
|
||||
echo "rpxy: Start with writing log file"
|
||||
gosu ${USER} sh -c "/rpxy/run.sh 2>&1 | tee ${LOG_FILE}"
|
||||
else
|
||||
echo "rpxy: Start without writing log file"
|
||||
gosu ${USER} sh -c "/rpxy/run.sh 2>&1"
|
||||
fi
|
||||
gosu ${USER} sh -c "/rpxy/run.sh 2>&1"
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
#!/usr/bin/env sh
|
||||
CONFIG_FILE=/etc/rpxy.toml
|
||||
LOG_DIR=/rpxy/log
|
||||
LOGGING=${LOG_TO_FILE:-false}
|
||||
|
||||
# debug level logging
|
||||
if [ -z $LOG_LEVEL ]; then
|
||||
|
|
@ -7,19 +9,11 @@ if [ -z $LOG_LEVEL ]; then
|
|||
fi
|
||||
echo "rpxy: Logging with level ${LOG_LEVEL}"
|
||||
|
||||
# continuously watch and reload the config file
|
||||
if [ -z $WATCH ]; then
|
||||
WATCH=false
|
||||
else
|
||||
if [ "$WATCH" = "true" ]; then
|
||||
WATCH=true
|
||||
else
|
||||
WATCH=false
|
||||
fi
|
||||
fi
|
||||
|
||||
if $WATCH ; then
|
||||
RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE} -w
|
||||
if "${LOGGING}"; then
|
||||
echo "rpxy: Start with writing log files"
|
||||
RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE} --log-dir ${LOG_DIR}
|
||||
else
|
||||
echo "rpxy: Start without writing log files"
|
||||
RUST_LOG=${LOG_LEVEL} /rpxy/bin/rpxy --config ${CONFIG_FILE}
|
||||
fi
|
||||
|
|
|
|||
|
|
@ -1,18 +1,18 @@
|
|||
use super::toml::ConfigToml;
|
||||
use crate::error::{anyhow, ensure};
|
||||
use ahash::HashMap;
|
||||
use clap::{Arg, ArgAction};
|
||||
use clap::Arg;
|
||||
use hot_reload::{ReloaderReceiver, ReloaderService};
|
||||
use rpxy_certs::{build_cert_reloader, CryptoFileSourceBuilder, CryptoReloader, ServerCryptoBase};
|
||||
use rpxy_certs::{CryptoFileSourceBuilder, CryptoReloader, ServerCryptoBase, build_cert_reloader};
|
||||
use rpxy_lib::{AppConfig, AppConfigList, ProxyConfig};
|
||||
|
||||
#[cfg(feature = "acme")]
|
||||
use rpxy_acme::{AcmeManager, ACME_DIR_URL, ACME_REGISTRY_PATH};
|
||||
use rpxy_acme::{ACME_DIR_URL, ACME_REGISTRY_PATH, AcmeManager};
|
||||
|
||||
/// Parsed options
|
||||
pub struct Opts {
|
||||
pub config_file_path: String,
|
||||
pub watch: bool,
|
||||
pub log_dir_path: Option<String>,
|
||||
}
|
||||
|
||||
/// Parse arg values passed from cli
|
||||
|
|
@ -28,19 +28,22 @@ pub fn parse_opts() -> Result<Opts, anyhow::Error> {
|
|||
.help("Configuration file path like ./config.toml"),
|
||||
)
|
||||
.arg(
|
||||
Arg::new("watch")
|
||||
.long("watch")
|
||||
.short('w')
|
||||
.action(ArgAction::SetTrue)
|
||||
.help("Activate dynamic reloading of the config file via continuous monitoring"),
|
||||
Arg::new("log_dir")
|
||||
.long("log-dir")
|
||||
.short('l')
|
||||
.value_name("LOG_DIR")
|
||||
.help("Directory for log files. If not specified, logs are printed to stdout."),
|
||||
);
|
||||
let matches = options.get_matches();
|
||||
|
||||
///////////////////////////////////
|
||||
let config_file_path = matches.get_one::<String>("config_file").unwrap().to_owned();
|
||||
let watch = matches.get_one::<bool>("watch").unwrap().to_owned();
|
||||
let log_dir_path = matches.get_one::<String>("log_dir").map(|v| v.to_owned());
|
||||
|
||||
Ok(Opts { config_file_path, watch })
|
||||
Ok(Opts {
|
||||
config_file_path,
|
||||
log_dir_path,
|
||||
})
|
||||
}
|
||||
|
||||
pub fn build_settings(config: &ConfigToml) -> std::result::Result<(ProxyConfig, AppConfigList), anyhow::Error> {
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ use crate::{
|
|||
log::warn,
|
||||
};
|
||||
use ahash::HashMap;
|
||||
use rpxy_lib::{reexports::Uri, AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri};
|
||||
use rpxy_lib::{AppConfig, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri, reexports::Uri};
|
||||
use serde::Deserialize;
|
||||
use std::{fs, net::SocketAddr};
|
||||
use tokio::time::Duration;
|
||||
|
|
|
|||
|
|
@ -5,3 +5,6 @@ pub const CONFIG_WATCH_DELAY_SECS: u32 = 15;
|
|||
#[cfg(feature = "cache")]
|
||||
// Cache directory
|
||||
pub const CACHE_DIR: &str = "./cache";
|
||||
|
||||
pub(crate) const ACCESS_LOG_FILE: &str = "access.log";
|
||||
pub(crate) const SYSTEM_LOG_FILE: &str = "rpxy.log";
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
use crate::constants::{ACCESS_LOG_FILE, SYSTEM_LOG_FILE};
|
||||
use rpxy_lib::log_event_names;
|
||||
use std::str::FromStr;
|
||||
use tracing_subscriber::{fmt, prelude::*};
|
||||
|
||||
|
|
@ -5,10 +7,77 @@ use tracing_subscriber::{fmt, prelude::*};
|
|||
pub use tracing::{debug, error, info, warn};
|
||||
|
||||
/// Initialize the logger with the RUST_LOG environment variable.
|
||||
pub fn init_logger() {
|
||||
pub fn init_logger(log_dir_path: Option<&str>) {
|
||||
let level_string = std::env::var("RUST_LOG").unwrap_or_else(|_| "info".to_string());
|
||||
let level = tracing::Level::from_str(level_string.as_str()).unwrap_or(tracing::Level::INFO);
|
||||
|
||||
match log_dir_path {
|
||||
None => {
|
||||
// log to stdout
|
||||
init_stdio_logger(level);
|
||||
}
|
||||
Some(log_dir_path) => {
|
||||
// log to files
|
||||
println!("Activate logging to files: {log_dir_path}");
|
||||
init_file_logger(level, log_dir_path);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// file logging TODO:
|
||||
fn init_file_logger(level: tracing::Level, log_dir_path: &str) {
|
||||
let log_dir_path = std::path::PathBuf::from(log_dir_path);
|
||||
// create the directory if it does not exist
|
||||
if !log_dir_path.exists() {
|
||||
println!("Directory does not exist, creating: {}", log_dir_path.display());
|
||||
std::fs::create_dir_all(&log_dir_path).expect("Failed to create log directory");
|
||||
}
|
||||
let access_log_path = log_dir_path.join(ACCESS_LOG_FILE);
|
||||
let system_log_path = log_dir_path.join(SYSTEM_LOG_FILE);
|
||||
println!("Access log: {}", access_log_path.display());
|
||||
println!("System and error log: {}", system_log_path.display());
|
||||
|
||||
let access_log = open_log_file(&access_log_path);
|
||||
let system_log = open_log_file(&system_log_path);
|
||||
|
||||
let reg = tracing_subscriber::registry();
|
||||
|
||||
let access_log_base = fmt::layer()
|
||||
.with_line_number(false)
|
||||
.with_thread_ids(false)
|
||||
.with_thread_names(false)
|
||||
.with_target(false)
|
||||
.with_level(false)
|
||||
.compact()
|
||||
.with_ansi(false);
|
||||
let reg = reg.with(access_log_base.with_writer(access_log).with_filter(AccessLogFilter));
|
||||
|
||||
let system_log_base = fmt::layer()
|
||||
.with_line_number(false)
|
||||
.with_thread_ids(false)
|
||||
.with_thread_names(false)
|
||||
.with_target(false)
|
||||
.with_level(true) // with level for system log
|
||||
.compact()
|
||||
.with_ansi(false);
|
||||
let reg = reg.with(
|
||||
system_log_base
|
||||
.with_writer(system_log)
|
||||
.with_filter(tracing_subscriber::filter::filter_fn(move |metadata| {
|
||||
(metadata
|
||||
.target()
|
||||
.starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str())
|
||||
&& metadata.name() != log_event_names::ACCESS_LOG
|
||||
&& metadata.level() <= &level)
|
||||
|| metadata.level() <= &tracing::Level::WARN.min(level)
|
||||
})),
|
||||
);
|
||||
|
||||
reg.init();
|
||||
}
|
||||
|
||||
/// stdio logging
|
||||
fn init_stdio_logger(level: tracing::Level) {
|
||||
// This limits the logger to emits only this crate with any level above RUST_LOG, for included crates it will emit only ERROR (in prod)/INFO (in dev) or above level.
|
||||
let stdio_layer = fmt::layer().with_level(true).with_thread_ids(false);
|
||||
if level <= tracing::Level::INFO {
|
||||
|
|
@ -42,3 +111,29 @@ pub fn init_logger() {
|
|||
tracing_subscriber::registry().with(stdio_layer).init();
|
||||
};
|
||||
}
|
||||
|
||||
/// Access log filter
|
||||
struct AccessLogFilter;
|
||||
impl<S> tracing_subscriber::layer::Filter<S> for AccessLogFilter {
|
||||
fn enabled(&self, metadata: &tracing::Metadata<'_>, _: &tracing_subscriber::layer::Context<'_, S>) -> bool {
|
||||
metadata
|
||||
.target()
|
||||
.starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str())
|
||||
&& metadata.name().contains(log_event_names::ACCESS_LOG)
|
||||
&& metadata.level() <= &tracing::Level::INFO
|
||||
}
|
||||
}
|
||||
|
||||
#[inline]
|
||||
/// Create a file for logging
|
||||
fn open_log_file<P>(path: P) -> std::fs::File
|
||||
where
|
||||
P: AsRef<std::path::Path>,
|
||||
{
|
||||
// crate a file if it does not exist
|
||||
std::fs::OpenOptions::new()
|
||||
.create(true)
|
||||
.append(true)
|
||||
.open(path)
|
||||
.expect("Failed to open the log file")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -20,8 +20,6 @@ use std::sync::Arc;
|
|||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
fn main() {
|
||||
init_logger();
|
||||
|
||||
let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
|
||||
runtime_builder.enable_all();
|
||||
runtime_builder.thread_name("rpxy");
|
||||
|
|
@ -30,40 +28,34 @@ fn main() {
|
|||
runtime.block_on(async {
|
||||
// Initially load options
|
||||
let Ok(parsed_opts) = parse_opts() else {
|
||||
error!("Invalid toml file");
|
||||
std::process::exit(1);
|
||||
};
|
||||
|
||||
if !parsed_opts.watch {
|
||||
if let Err(e) = rpxy_service_without_watcher(&parsed_opts.config_file_path, runtime.handle().clone()).await {
|
||||
error!("rpxy service existed: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
} else {
|
||||
let (config_service, config_rx) = ReloaderService::<ConfigTomlReloader, ConfigToml, String>::new(
|
||||
&parsed_opts.config_file_path,
|
||||
CONFIG_WATCH_DELAY_SECS,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
init_logger(parsed_opts.log_dir_path.as_deref());
|
||||
|
||||
tokio::select! {
|
||||
config_res = config_service.start() => {
|
||||
if let Err(e) = config_res {
|
||||
error!("config reloader service exited: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
rpxy_res = rpxy_service_with_watcher(config_rx, runtime.handle().clone()) => {
|
||||
if let Err(e) = rpxy_res {
|
||||
error!("rpxy service existed: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
let (config_service, config_rx) = ReloaderService::<ConfigTomlReloader, ConfigToml, String>::new(
|
||||
&parsed_opts.config_file_path,
|
||||
CONFIG_WATCH_DELAY_SECS,
|
||||
false,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
tokio::select! {
|
||||
config_res = config_service.start() => {
|
||||
if let Err(e) = config_res {
|
||||
error!("config reloader service exited: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
rpxy_res = rpxy_service(config_rx, runtime.handle().clone()) => {
|
||||
if let Err(e) = rpxy_res {
|
||||
error!("rpxy service existed: {e}");
|
||||
std::process::exit(1);
|
||||
}
|
||||
}
|
||||
std::process::exit(0);
|
||||
}
|
||||
std::process::exit(0);
|
||||
});
|
||||
}
|
||||
|
||||
|
|
@ -233,18 +225,7 @@ impl RpxyService {
|
|||
}
|
||||
}
|
||||
|
||||
async fn rpxy_service_without_watcher(
|
||||
config_file_path: &str,
|
||||
runtime_handle: tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
info!("Start rpxy service");
|
||||
let config_toml = ConfigToml::new(config_file_path).map_err(|e| anyhow!("Invalid toml file: {e}"))?;
|
||||
let service = RpxyService::new(&config_toml, runtime_handle).await?;
|
||||
// Create cancel token that is never be called as dummy
|
||||
service.start(tokio_util::sync::CancellationToken::new()).await
|
||||
}
|
||||
|
||||
async fn rpxy_service_with_watcher(
|
||||
async fn rpxy_service(
|
||||
mut config_rx: ReloaderReceiver<ConfigToml, String>,
|
||||
runtime_handle: tokio::runtime::Handle,
|
||||
) -> Result<(), anyhow::Error> {
|
||||
|
|
|
|||
|
|
@ -32,3 +32,9 @@ pub const MAX_CACHE_EACH_SIZE: usize = 65_535;
|
|||
pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096;
|
||||
|
||||
// TODO: max cache size in total
|
||||
|
||||
/// Logging event name TODO: Other separated logs?
|
||||
pub mod log_event_names {
|
||||
/// access log
|
||||
pub const ACCESS_LOG: &str = "rpxy::access";
|
||||
}
|
||||
|
|
|
|||
36
rpxy-lib/src/forwarder/cache/cache_main.rs
vendored
36
rpxy-lib/src/forwarder/cache/cache_main.rs
vendored
|
|
@ -1,10 +1,10 @@
|
|||
use super::cache_error::*;
|
||||
use crate::{
|
||||
globals::Globals,
|
||||
hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody},
|
||||
hyper_ext::body::{BoxBody, ResponseBody, UnboundedStreamBody, full},
|
||||
log::*,
|
||||
};
|
||||
use base64::{engine::general_purpose, Engine as _};
|
||||
use base64::{Engine as _, engine::general_purpose};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use futures::channel::mpsc;
|
||||
use http::{Request, Response, Uri};
|
||||
|
|
@ -16,8 +16,8 @@ use sha2::{Digest, Sha256};
|
|||
use std::{
|
||||
path::{Path, PathBuf},
|
||||
sync::{
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
Arc, Mutex,
|
||||
atomic::{AtomicUsize, Ordering},
|
||||
},
|
||||
time::SystemTime,
|
||||
};
|
||||
|
|
@ -59,9 +59,7 @@ impl RpxyCache {
|
|||
let max_each_size = globals.proxy_config.cache_max_each_size;
|
||||
let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory;
|
||||
if max_each_size < max_each_size_on_memory {
|
||||
warn!(
|
||||
"Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"
|
||||
);
|
||||
warn!("Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache");
|
||||
max_each_size_on_memory = max_each_size;
|
||||
}
|
||||
|
||||
|
|
@ -89,12 +87,7 @@ impl RpxyCache {
|
|||
}
|
||||
|
||||
/// Put response into the cache
|
||||
pub(crate) async fn put(
|
||||
&self,
|
||||
uri: &hyper::Uri,
|
||||
mut body: Incoming,
|
||||
policy: &CachePolicy,
|
||||
) -> CacheResult<UnboundedStreamBody> {
|
||||
pub(crate) async fn put(&self, uri: &hyper::Uri, mut body: Incoming, policy: &CachePolicy) -> CacheResult<UnboundedStreamBody> {
|
||||
let cache_manager = self.inner.clone();
|
||||
let mut file_store = self.file_store.clone();
|
||||
let uri = uri.clone();
|
||||
|
|
@ -155,7 +148,7 @@ impl RpxyCache {
|
|||
let mut hasher = Sha256::new();
|
||||
hasher.update(buf.as_ref());
|
||||
let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref());
|
||||
debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes);
|
||||
trace!("Cached data: {} bytes, hash = {:?}", size, hash_bytes);
|
||||
|
||||
// Create cache object
|
||||
let cache_key = derive_cache_key_from_uri(&uri);
|
||||
|
|
@ -188,10 +181,7 @@ impl RpxyCache {
|
|||
|
||||
/// Get cached response
|
||||
pub(crate) async fn get<R>(&self, req: &Request<R>) -> Option<Response<ResponseBody>> {
|
||||
debug!(
|
||||
"Current cache status: (total, on-memory, file) = {:?}",
|
||||
self.count().await
|
||||
);
|
||||
trace!("Current cache status: (total, on-memory, file) = {:?}", self.count().await);
|
||||
let cache_key = derive_cache_key_from_uri(req.uri());
|
||||
|
||||
// First check cache chance
|
||||
|
|
@ -282,11 +272,7 @@ impl FileStore {
|
|||
};
|
||||
}
|
||||
/// Read a temporary file cache
|
||||
async fn read(
|
||||
&self,
|
||||
path: impl AsRef<Path> + Send + Sync + 'static,
|
||||
hash: &Bytes,
|
||||
) -> CacheResult<UnboundedStreamBody> {
|
||||
async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> {
|
||||
let inner = self.inner.read().await;
|
||||
inner.read(path, hash).await
|
||||
}
|
||||
|
|
@ -336,11 +322,7 @@ impl FileStoreInner {
|
|||
}
|
||||
|
||||
/// Retrieve a stored temporary file cache
|
||||
async fn read(
|
||||
&self,
|
||||
path: impl AsRef<Path> + Send + Sync + 'static,
|
||||
hash: &Bytes,
|
||||
) -> CacheResult<UnboundedStreamBody> {
|
||||
async fn read(&self, path: impl AsRef<Path> + Send + Sync + 'static, hash: &Bytes) -> CacheResult<UnboundedStreamBody> {
|
||||
let Ok(mut file) = File::open(&path).await else {
|
||||
warn!("Cache file object cannot be opened");
|
||||
return Err(CacheError::FailedToOpenCacheFile);
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@ use std::sync::Arc;
|
|||
use tokio_util::sync::CancellationToken;
|
||||
|
||||
/* ------------------------------------------------ */
|
||||
pub use crate::constants::log_event_names;
|
||||
pub use crate::globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri};
|
||||
pub mod reexports {
|
||||
pub use hyper::Uri;
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
pub use tracing::{debug, error, info, warn};
|
||||
pub use tracing::{debug, error, info, trace, warn};
|
||||
|
|
|
|||
|
|
@ -71,7 +71,7 @@ where
|
|||
Ok(v)
|
||||
}
|
||||
Err(e) => {
|
||||
error!("{e}");
|
||||
error!("{e}: {log_data}");
|
||||
let code = StatusCode::from(e);
|
||||
log_data.status_code(&code).output();
|
||||
synthetic_error_response(code)
|
||||
|
|
|
|||
|
|
@ -1,11 +1,11 @@
|
|||
use super::{handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line, HttpMessageHandler};
|
||||
use super::{HttpMessageHandler, handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line};
|
||||
use crate::{
|
||||
backend::{BackendApp, UpstreamCandidates},
|
||||
constants::RESPONSE_HEADER_SERVER,
|
||||
log::*,
|
||||
};
|
||||
use anyhow::{anyhow, ensure, Result};
|
||||
use http::{header, HeaderValue, Request, Response, Uri};
|
||||
use anyhow::{Result, anyhow, ensure};
|
||||
use http::{HeaderValue, Request, Response, Uri, header};
|
||||
use hyper_util::client::legacy::connect::Connect;
|
||||
use std::net::SocketAddr;
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ where
|
|||
upstream_candidates: &UpstreamCandidates,
|
||||
tls_enabled: bool,
|
||||
) -> Result<HandlerContext> {
|
||||
debug!("Generate request to be forwarded");
|
||||
trace!("Generate request to be forwarded");
|
||||
|
||||
// Add te: trailer if contained in original request
|
||||
let contains_te_trailers = {
|
||||
|
|
|
|||
|
|
@ -34,11 +34,7 @@ impl<T> From<&http::Request<T>> for HttpMessageLog {
|
|||
client_addr: "".to_string(),
|
||||
method: req.method().to_string(),
|
||||
host: header_mapper(header::HOST),
|
||||
p_and_q: req
|
||||
.uri()
|
||||
.path_and_query()
|
||||
.map_or_else(|| "", |v| v.as_str())
|
||||
.to_string(),
|
||||
p_and_q: req.uri().path_and_query().map_or_else(|| "", |v| v.as_str()).to_string(),
|
||||
version: req.version(),
|
||||
uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(),
|
||||
uri_host: req.uri().host().unwrap_or("").to_string(),
|
||||
|
|
@ -50,6 +46,33 @@ impl<T> From<&http::Request<T>> for HttpMessageLog {
|
|||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for HttpMessageLog {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
write!(
|
||||
f,
|
||||
"{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"",
|
||||
if !self.host.is_empty() {
|
||||
self.host.as_str()
|
||||
} else {
|
||||
self.uri_host.as_str()
|
||||
},
|
||||
self.client_addr,
|
||||
self.method,
|
||||
self.p_and_q,
|
||||
self.version,
|
||||
self.status,
|
||||
if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() {
|
||||
format!("{}://{}", self.uri_scheme, self.uri_host)
|
||||
} else {
|
||||
"".to_string()
|
||||
},
|
||||
self.ua,
|
||||
self.xff,
|
||||
self.upstream
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
impl HttpMessageLog {
|
||||
pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self {
|
||||
self.client_addr = client_addr.to_canonical().to_string();
|
||||
|
|
@ -74,26 +97,8 @@ impl HttpMessageLog {
|
|||
|
||||
pub fn output(&self) {
|
||||
info!(
|
||||
"{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"",
|
||||
if !self.host.is_empty() {
|
||||
self.host.as_str()
|
||||
} else {
|
||||
self.uri_host.as_str()
|
||||
},
|
||||
self.client_addr,
|
||||
self.method,
|
||||
self.p_and_q,
|
||||
self.version,
|
||||
self.status,
|
||||
if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() {
|
||||
format!("{}://{}", self.uri_scheme, self.uri_host)
|
||||
} else {
|
||||
"".to_string()
|
||||
},
|
||||
self.ua,
|
||||
self.xff,
|
||||
self.upstream,
|
||||
// self.tls_server_name
|
||||
name: crate::constants::log_event_names::ACCESS_LOG,
|
||||
"{}", self
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ where
|
|||
<<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
|
||||
{
|
||||
let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?;
|
||||
info!(
|
||||
debug!(
|
||||
"QUIC/HTTP3 connection established from {:?} {}",
|
||||
client_addr,
|
||||
<&ServerName as TryInto<String>>::try_into(&tls_server_name).unwrap_or_default()
|
||||
|
|
@ -115,7 +115,7 @@ where
|
|||
let mut sender = body_sender;
|
||||
let mut size = 0usize;
|
||||
while let Some(mut body) = recv_stream.recv_data().await? {
|
||||
debug!("HTTP/3 incoming request body: remaining {}", body.remaining());
|
||||
trace!("HTTP/3 incoming request body: remaining {}", body.remaining());
|
||||
size += body.remaining();
|
||||
if size > max_body_size {
|
||||
error!(
|
||||
|
|
@ -131,7 +131,7 @@ where
|
|||
// trailers: use inner for work around. (directly get trailer)
|
||||
let trailers = futures_util::future::poll_fn(|cx| recv_stream.as_mut().poll_recv_trailers(cx)).await?;
|
||||
if trailers.is_some() {
|
||||
debug!("HTTP/3 incoming request trailers");
|
||||
trace!("HTTP/3 incoming request trailers");
|
||||
sender.send_trailers(trailers.unwrap()).await?;
|
||||
}
|
||||
Ok(()) as RpxyResult<()>
|
||||
|
|
@ -154,13 +154,13 @@ where
|
|||
|
||||
match send_stream.send_response(new_res).await {
|
||||
Ok(_) => {
|
||||
debug!("HTTP/3 response to connection successful");
|
||||
trace!("HTTP/3 response to connection successful");
|
||||
// on-demand body streaming to downstream without expanding the object onto memory.
|
||||
loop {
|
||||
let frame = match new_body.frame().await {
|
||||
Some(frame) => frame,
|
||||
None => {
|
||||
debug!("Response body finished");
|
||||
trace!("Response body finished");
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,8 +2,8 @@ use super::{proxy_main::Proxy, socket::bind_udp_socket};
|
|||
use crate::{error::*, log::*, name_exp::ByteName};
|
||||
use hyper_util::client::legacy::connect::Connect;
|
||||
use quinn::{
|
||||
crypto::rustls::{HandshakeData, QuicServerConfig},
|
||||
Endpoint, TransportConfig,
|
||||
crypto::rustls::{HandshakeData, QuicServerConfig},
|
||||
};
|
||||
use rpxy_certs::ServerCrypto;
|
||||
use rustls::ServerConfig;
|
||||
|
|
@ -82,7 +82,7 @@ where
|
|||
let client_addr = incoming.remote_address();
|
||||
let quic_connection = match incoming.await {
|
||||
Ok(new_conn) => {
|
||||
info!("New connection established");
|
||||
trace!("New connection established");
|
||||
h3_quinn::Connection::new(new_conn)
|
||||
},
|
||||
Err(e) => {
|
||||
|
|
|
|||
|
|
@ -110,7 +110,7 @@ where
|
|||
|
||||
// quic event loop. this immediately cancels when crypto is updated by tokio::select!
|
||||
while let Some(new_conn) = server.accept().await {
|
||||
debug!("New QUIC connection established");
|
||||
trace!("New QUIC connection established");
|
||||
let Ok(Some(new_server_name)) = new_conn.server_name() else {
|
||||
warn!("HTTP/3 no SNI is given");
|
||||
continue;
|
||||
|
|
|
|||
|
|
@ -1 +1 @@
|
|||
Subproject commit 3dd7d1ff0d311acd1c1abcc86fd9f428a90a0f78
|
||||
Subproject commit f9d0c4feb83160b6fe66fe34da76c443fc2b381c
|
||||
Loading…
Add table
Add a link
Reference in a new issue