From d5b020dcfb9088e9d0147d999dc2f472361de3f1 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Wed, 30 Apr 2025 17:36:14 +0900 Subject: [PATCH] feat: add log-to-file --- rpxy-bin/src/constants.rs | 3 + rpxy-bin/src/log.rs | 71 +++++++++++++++++-- rpxy-lib/src/constants.rs | 6 ++ rpxy-lib/src/forwarder/cache/cache_main.rs | 36 +++------- rpxy-lib/src/lib.rs | 1 + rpxy-lib/src/log.rs | 2 +- rpxy-lib/src/message_handler/handler_main.rs | 2 +- .../handler_manipulate_messages.rs | 8 +-- rpxy-lib/src/message_handler/http_log.rs | 55 +++++++------- rpxy-lib/src/proxy/proxy_h3.rs | 10 +-- rpxy-lib/src/proxy/proxy_quic_quinn.rs | 4 +- rpxy-lib/src/proxy/proxy_quic_s2n.rs | 2 +- 12 files changed, 130 insertions(+), 70 deletions(-) diff --git a/rpxy-bin/src/constants.rs b/rpxy-bin/src/constants.rs index 2f27735..889f7d5 100644 --- a/rpxy-bin/src/constants.rs +++ b/rpxy-bin/src/constants.rs @@ -5,3 +5,6 @@ pub const CONFIG_WATCH_DELAY_SECS: u32 = 15; #[cfg(feature = "cache")] // Cache directory pub const CACHE_DIR: &str = "./cache"; + +pub(crate) const ACCESS_LOG_FILE: &str = "access.log"; +pub(crate) const SYSTEM_LOG_FILE: &str = "rpxy.log"; diff --git a/rpxy-bin/src/log.rs b/rpxy-bin/src/log.rs index 16a6b60..151c94b 100644 --- a/rpxy-bin/src/log.rs +++ b/rpxy-bin/src/log.rs @@ -1,3 +1,5 @@ +use crate::constants::{ACCESS_LOG_FILE, SYSTEM_LOG_FILE}; +use rpxy_lib::log_event_names; use std::str::FromStr; use tracing_subscriber::{fmt, prelude::*}; @@ -22,10 +24,56 @@ pub fn init_logger(log_dir_path: Option<&str>) { } } -/// file logging +/// file logging TODO: fn init_file_logger(level: tracing::Level, log_dir_path: &str) { - // TODO: implement - init_stdio_logger(level); + let log_dir_path = std::path::PathBuf::from(log_dir_path); + // create the directory if it does not exist + if !log_dir_path.exists() { + println!("Directory does not exist, creating: {}", log_dir_path.display()); + std::fs::create_dir_all(&log_dir_path).expect("Failed to create log directory"); + } + let access_log_path = log_dir_path.join(ACCESS_LOG_FILE); + let system_log_path = log_dir_path.join(SYSTEM_LOG_FILE); + println!("Access log: {}", access_log_path.display()); + println!("System and error log: {}", system_log_path.display()); + + let access_log = open_log_file(&access_log_path); + let system_log = open_log_file(&system_log_path); + + let reg = tracing_subscriber::registry(); + + let access_log_base = fmt::layer() + .with_line_number(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_target(false) + .with_level(false) + .compact() + .with_ansi(false); + let reg = reg.with(access_log_base.with_writer(access_log).with_filter(AccessLogFilter)); + + let system_log_base = fmt::layer() + .with_line_number(false) + .with_thread_ids(false) + .with_thread_names(false) + .with_target(false) + .with_level(true) // with level for system log + .compact() + .with_ansi(false); + let reg = reg.with( + system_log_base + .with_writer(system_log) + .with_filter(tracing_subscriber::filter::filter_fn(move |metadata| { + (metadata + .target() + .starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str()) + && metadata.name() != log_event_names::ACCESS_LOG + && metadata.level() <= &level) + || metadata.level() <= &tracing::Level::WARN.min(level) + })), + ); + + reg.init(); } /// stdio logging @@ -64,9 +112,24 @@ fn init_stdio_logger(level: tracing::Level) { }; } +/// Access log filter +struct AccessLogFilter; +impl tracing_subscriber::layer::Filter for AccessLogFilter { + fn enabled(&self, metadata: &tracing::Metadata<'_>, _: &tracing_subscriber::layer::Context<'_, S>) -> bool { + metadata + .target() + .starts_with(env!("CARGO_PKG_NAME").replace('-', "_").as_str()) + && metadata.name().contains(log_event_names::ACCESS_LOG) + && metadata.level() <= &tracing::Level::INFO + } +} + #[inline] /// Create a file for logging -fn open_log_file(path: &str) -> std::fs::File { +fn open_log_file

(path: P) -> std::fs::File +where + P: AsRef, +{ // crate a file if it does not exist std::fs::OpenOptions::new() .create(true) diff --git a/rpxy-lib/src/constants.rs b/rpxy-lib/src/constants.rs index 9e2bc6a..3f21d87 100644 --- a/rpxy-lib/src/constants.rs +++ b/rpxy-lib/src/constants.rs @@ -32,3 +32,9 @@ pub const MAX_CACHE_EACH_SIZE: usize = 65_535; pub const MAX_CACHE_EACH_SIZE_ON_MEMORY: usize = 4_096; // TODO: max cache size in total + +/// Logging event name TODO: Other separated logs? +pub mod log_event_names { + /// access log + pub const ACCESS_LOG: &str = "rpxy::access"; +} diff --git a/rpxy-lib/src/forwarder/cache/cache_main.rs b/rpxy-lib/src/forwarder/cache/cache_main.rs index edb1ec5..37e1fa9 100644 --- a/rpxy-lib/src/forwarder/cache/cache_main.rs +++ b/rpxy-lib/src/forwarder/cache/cache_main.rs @@ -1,10 +1,10 @@ use super::cache_error::*; use crate::{ globals::Globals, - hyper_ext::body::{full, BoxBody, ResponseBody, UnboundedStreamBody}, + hyper_ext::body::{BoxBody, ResponseBody, UnboundedStreamBody, full}, log::*, }; -use base64::{engine::general_purpose, Engine as _}; +use base64::{Engine as _, engine::general_purpose}; use bytes::{Buf, Bytes, BytesMut}; use futures::channel::mpsc; use http::{Request, Response, Uri}; @@ -16,8 +16,8 @@ use sha2::{Digest, Sha256}; use std::{ path::{Path, PathBuf}, sync::{ - atomic::{AtomicUsize, Ordering}, Arc, Mutex, + atomic::{AtomicUsize, Ordering}, }, time::SystemTime, }; @@ -59,9 +59,7 @@ impl RpxyCache { let max_each_size = globals.proxy_config.cache_max_each_size; let mut max_each_size_on_memory = globals.proxy_config.cache_max_each_size_on_memory; if max_each_size < max_each_size_on_memory { - warn!( - "Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache" - ); + warn!("Maximum size of on memory cache per entry must be smaller than or equal to the maximum of each file cache"); max_each_size_on_memory = max_each_size; } @@ -89,12 +87,7 @@ impl RpxyCache { } /// Put response into the cache - pub(crate) async fn put( - &self, - uri: &hyper::Uri, - mut body: Incoming, - policy: &CachePolicy, - ) -> CacheResult { + pub(crate) async fn put(&self, uri: &hyper::Uri, mut body: Incoming, policy: &CachePolicy) -> CacheResult { let cache_manager = self.inner.clone(); let mut file_store = self.file_store.clone(); let uri = uri.clone(); @@ -155,7 +148,7 @@ impl RpxyCache { let mut hasher = Sha256::new(); hasher.update(buf.as_ref()); let hash_bytes = Bytes::copy_from_slice(hasher.finalize().as_ref()); - debug!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); + trace!("Cached data: {} bytes, hash = {:?}", size, hash_bytes); // Create cache object let cache_key = derive_cache_key_from_uri(&uri); @@ -188,10 +181,7 @@ impl RpxyCache { /// Get cached response pub(crate) async fn get(&self, req: &Request) -> Option> { - debug!( - "Current cache status: (total, on-memory, file) = {:?}", - self.count().await - ); + trace!("Current cache status: (total, on-memory, file) = {:?}", self.count().await); let cache_key = derive_cache_key_from_uri(req.uri()); // First check cache chance @@ -282,11 +272,7 @@ impl FileStore { }; } /// Read a temporary file cache - async fn read( - &self, - path: impl AsRef + Send + Sync + 'static, - hash: &Bytes, - ) -> CacheResult { + async fn read(&self, path: impl AsRef + Send + Sync + 'static, hash: &Bytes) -> CacheResult { let inner = self.inner.read().await; inner.read(path, hash).await } @@ -336,11 +322,7 @@ impl FileStoreInner { } /// Retrieve a stored temporary file cache - async fn read( - &self, - path: impl AsRef + Send + Sync + 'static, - hash: &Bytes, - ) -> CacheResult { + async fn read(&self, path: impl AsRef + Send + Sync + 'static, hash: &Bytes) -> CacheResult { let Ok(mut file) = File::open(&path).await else { warn!("Cache file object cannot be opened"); return Err(CacheError::FailedToOpenCacheFile); diff --git a/rpxy-lib/src/lib.rs b/rpxy-lib/src/lib.rs index 4ec60e0..72ddbd3 100644 --- a/rpxy-lib/src/lib.rs +++ b/rpxy-lib/src/lib.rs @@ -27,6 +27,7 @@ use std::sync::Arc; use tokio_util::sync::CancellationToken; /* ------------------------------------------------ */ +pub use crate::constants::log_event_names; pub use crate::globals::{AppConfig, AppConfigList, ProxyConfig, ReverseProxyConfig, TlsConfig, UpstreamUri}; pub mod reexports { pub use hyper::Uri; diff --git a/rpxy-lib/src/log.rs b/rpxy-lib/src/log.rs index c55b5c2..f4af411 100644 --- a/rpxy-lib/src/log.rs +++ b/rpxy-lib/src/log.rs @@ -1 +1 @@ -pub use tracing::{debug, error, info, warn}; +pub use tracing::{debug, error, info, trace, warn}; diff --git a/rpxy-lib/src/message_handler/handler_main.rs b/rpxy-lib/src/message_handler/handler_main.rs index 23133a2..9496e6a 100644 --- a/rpxy-lib/src/message_handler/handler_main.rs +++ b/rpxy-lib/src/message_handler/handler_main.rs @@ -71,7 +71,7 @@ where Ok(v) } Err(e) => { - error!("{e}"); + error!("{e}: {log_data}"); let code = StatusCode::from(e); log_data.status_code(&code).output(); synthetic_error_response(code) diff --git a/rpxy-lib/src/message_handler/handler_manipulate_messages.rs b/rpxy-lib/src/message_handler/handler_manipulate_messages.rs index e7cecc4..680c6ee 100644 --- a/rpxy-lib/src/message_handler/handler_manipulate_messages.rs +++ b/rpxy-lib/src/message_handler/handler_manipulate_messages.rs @@ -1,11 +1,11 @@ -use super::{handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line, HttpMessageHandler}; +use super::{HttpMessageHandler, handler_main::HandlerContext, utils_headers::*, utils_request::update_request_line}; use crate::{ backend::{BackendApp, UpstreamCandidates}, constants::RESPONSE_HEADER_SERVER, log::*, }; -use anyhow::{anyhow, ensure, Result}; -use http::{header, HeaderValue, Request, Response, Uri}; +use anyhow::{Result, anyhow, ensure}; +use http::{HeaderValue, Request, Response, Uri, header}; use hyper_util::client::legacy::connect::Connect; use std::net::SocketAddr; @@ -66,7 +66,7 @@ where upstream_candidates: &UpstreamCandidates, tls_enabled: bool, ) -> Result { - debug!("Generate request to be forwarded"); + trace!("Generate request to be forwarded"); // Add te: trailer if contained in original request let contains_te_trailers = { diff --git a/rpxy-lib/src/message_handler/http_log.rs b/rpxy-lib/src/message_handler/http_log.rs index acda9f0..40d13cc 100644 --- a/rpxy-lib/src/message_handler/http_log.rs +++ b/rpxy-lib/src/message_handler/http_log.rs @@ -34,11 +34,7 @@ impl From<&http::Request> for HttpMessageLog { client_addr: "".to_string(), method: req.method().to_string(), host: header_mapper(header::HOST), - p_and_q: req - .uri() - .path_and_query() - .map_or_else(|| "", |v| v.as_str()) - .to_string(), + p_and_q: req.uri().path_and_query().map_or_else(|| "", |v| v.as_str()).to_string(), version: req.version(), uri_scheme: req.uri().scheme_str().unwrap_or("").to_string(), uri_host: req.uri().host().unwrap_or("").to_string(), @@ -50,6 +46,33 @@ impl From<&http::Request> for HttpMessageLog { } } +impl std::fmt::Display for HttpMessageLog { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!( + f, + "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", + if !self.host.is_empty() { + self.host.as_str() + } else { + self.uri_host.as_str() + }, + self.client_addr, + self.method, + self.p_and_q, + self.version, + self.status, + if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { + format!("{}://{}", self.uri_scheme, self.uri_host) + } else { + "".to_string() + }, + self.ua, + self.xff, + self.upstream + ) + } +} + impl HttpMessageLog { pub fn client_addr(&mut self, client_addr: &SocketAddr) -> &mut Self { self.client_addr = client_addr.to_canonical().to_string(); @@ -74,26 +97,8 @@ impl HttpMessageLog { pub fn output(&self) { info!( - "{} <- {} -- {} {} {:?} -- {} -- {} \"{}\", \"{}\" \"{}\"", - if !self.host.is_empty() { - self.host.as_str() - } else { - self.uri_host.as_str() - }, - self.client_addr, - self.method, - self.p_and_q, - self.version, - self.status, - if !self.uri_scheme.is_empty() && !self.uri_host.is_empty() { - format!("{}://{}", self.uri_scheme, self.uri_host) - } else { - "".to_string() - }, - self.ua, - self.xff, - self.upstream, - // self.tls_server_name + name: crate::constants::log_event_names::ACCESS_LOG, + "{}", self ); } } diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index c1c293f..3d5143f 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -33,7 +33,7 @@ where <>::BidiStream as BidiStream>::SendStream: Send, { let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?; - info!( + debug!( "QUIC/HTTP3 connection established from {:?} {}", client_addr, <&ServerName as TryInto>::try_into(&tls_server_name).unwrap_or_default() @@ -115,7 +115,7 @@ where let mut sender = body_sender; let mut size = 0usize; while let Some(mut body) = recv_stream.recv_data().await? { - debug!("HTTP/3 incoming request body: remaining {}", body.remaining()); + trace!("HTTP/3 incoming request body: remaining {}", body.remaining()); size += body.remaining(); if size > max_body_size { error!( @@ -131,7 +131,7 @@ where // trailers: use inner for work around. (directly get trailer) let trailers = futures_util::future::poll_fn(|cx| recv_stream.as_mut().poll_recv_trailers(cx)).await?; if trailers.is_some() { - debug!("HTTP/3 incoming request trailers"); + trace!("HTTP/3 incoming request trailers"); sender.send_trailers(trailers.unwrap()).await?; } Ok(()) as RpxyResult<()> @@ -154,13 +154,13 @@ where match send_stream.send_response(new_res).await { Ok(_) => { - debug!("HTTP/3 response to connection successful"); + trace!("HTTP/3 response to connection successful"); // on-demand body streaming to downstream without expanding the object onto memory. loop { let frame = match new_body.frame().await { Some(frame) => frame, None => { - debug!("Response body finished"); + trace!("Response body finished"); break; } } diff --git a/rpxy-lib/src/proxy/proxy_quic_quinn.rs b/rpxy-lib/src/proxy/proxy_quic_quinn.rs index c316ed9..4fd4c38 100644 --- a/rpxy-lib/src/proxy/proxy_quic_quinn.rs +++ b/rpxy-lib/src/proxy/proxy_quic_quinn.rs @@ -2,8 +2,8 @@ use super::{proxy_main::Proxy, socket::bind_udp_socket}; use crate::{error::*, log::*, name_exp::ByteName}; use hyper_util::client::legacy::connect::Connect; use quinn::{ - crypto::rustls::{HandshakeData, QuicServerConfig}, Endpoint, TransportConfig, + crypto::rustls::{HandshakeData, QuicServerConfig}, }; use rpxy_certs::ServerCrypto; use rustls::ServerConfig; @@ -82,7 +82,7 @@ where let client_addr = incoming.remote_address(); let quic_connection = match incoming.await { Ok(new_conn) => { - info!("New connection established"); + trace!("New connection established"); h3_quinn::Connection::new(new_conn) }, Err(e) => { diff --git a/rpxy-lib/src/proxy/proxy_quic_s2n.rs b/rpxy-lib/src/proxy/proxy_quic_s2n.rs index 0fd1c7c..70659dd 100644 --- a/rpxy-lib/src/proxy/proxy_quic_s2n.rs +++ b/rpxy-lib/src/proxy/proxy_quic_s2n.rs @@ -110,7 +110,7 @@ where // quic event loop. this immediately cancels when crypto is updated by tokio::select! while let Some(new_conn) = server.accept().await { - debug!("New QUIC connection established"); + trace!("New QUIC connection established"); let Ok(Some(new_server_name)) = new_conn.server_name() else { warn!("HTTP/3 no SNI is given"); continue;