diff --git a/Cargo.toml b/Cargo.toml index f51c4db..355e5b1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,5 @@ [workspace.package] -version = "0.8.0" +version = "0.8.1" authors = ["Jun Kurihara"] homepage = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy" diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 1905692..24451f5 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -13,8 +13,8 @@ publish.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn", "cache", "rustls-backend"] -# default = ["http3-s2n", "cache", "rustls-backend"] +# default = ["http3-quinn", "cache", "rustls-backend"] +default = ["http3-s2n", "cache", "rustls-backend"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] native-tls-backend = ["rpxy-lib/native-tls-backend"] @@ -42,7 +42,7 @@ async-trait = "0.1.80" # config -clap = { version = "4.5.7", features = ["std", "cargo", "wrap_help"] } +clap = { version = "4.5.8", features = ["std", "cargo", "wrap_help"] } toml = { version = "0.8.14", default-features = false, features = ["parse"] } hot_reload = "0.1.5" diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index ea8feb0..1c8a657 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -13,11 +13,10 @@ publish.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -# default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"] -default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"] +default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"] +# default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"] http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"] http3-s2n = [ - "h3", "s2n-quic", "s2n-quic-core", "s2n-quic-rustls", @@ -54,8 +53,8 @@ thiserror = "1.0.61" # http for both server and client http = "1.1.0" http-body-util = "0.1.2" -hyper = { version = "1.3.1", default-features = false } -hyper-util = { version = "0.1.5", features = ["full"] } +hyper = { version = "1.4.0", default-features = false } +hyper-util = { version = "0.1.6", features = ["full"] } futures-util = { version = "0.3.30", default-features = false } futures-channel = { version = "0.3.30", default-features = false } @@ -84,9 +83,11 @@ tracing = { version = "0.1.40" } # http/3 quinn = { version = "0.11.2", optional = true } -h3 = { version = "0.0.5", optional = true } -h3-quinn = { version = "0.0.6", optional = true } -s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } +h3 = { version = "0.0.6", features = ["tracing"], optional = true } +h3-quinn = { version = "0.0.7", optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", features = [ + "tracing", +], optional = true } s2n-quic = { version = "1.41.0", default-features = false, features = [ "provider-tls-rustls", ], optional = true } diff --git a/rpxy-lib/src/error.rs b/rpxy-lib/src/error.rs index 0b7741f..3eb09e4 100644 --- a/rpxy-lib/src/error.rs +++ b/rpxy-lib/src/error.rs @@ -36,9 +36,12 @@ pub enum RpxyError { HyperBodyError(#[from] hyper::Error), // http/3 errors - #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] + #[cfg(feature = "http3-quinn")] #[error("H3 error: {0}")] H3Error(#[from] h3::Error), + #[cfg(feature = "http3-s2n")] + #[error("H3 error: {0}")] + H3Error(#[from] s2n_quic_h3::h3::Error), #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] #[error("Exceeds max request body size for HTTP/3")] H3TooLargeBody, diff --git a/rpxy-lib/src/proxy/proxy_h3.rs b/rpxy-lib/src/proxy/proxy_h3.rs index 998b6ee..358dd9e 100644 --- a/rpxy-lib/src/proxy/proxy_h3.rs +++ b/rpxy-lib/src/proxy/proxy_h3.rs @@ -12,9 +12,9 @@ use hyper_util::client::legacy::connect::Connect; use std::net::SocketAddr; #[cfg(feature = "http3-quinn")] -use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; +use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream}; #[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))] -use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; +use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream}; impl Proxy where @@ -28,9 +28,9 @@ where ) -> RpxyResult<()> where C: ConnectionQuic, - >::BidiStream: BidiStream + Send + 'static, - <>::BidiStream as BidiStream>::RecvStream: Send, - <>::BidiStream as BidiStream>::SendStream: Send, + >::BidiStream: BidiStream + Send + 'static, + <>::BidiStream as BidiStream>::RecvStream: Send, + <>::BidiStream as BidiStream>::SendStream: Send, { let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?; info!( diff --git a/submodules/s2n-quic-h3/Cargo.toml b/submodules/s2n-quic-h3/Cargo.toml index 23f9951..f2a92de 100644 --- a/submodules/s2n-quic-h3/Cargo.toml +++ b/submodules/s2n-quic-h3/Cargo.toml @@ -12,8 +12,12 @@ publish = false [dependencies] bytes = { version = "1", default-features = false } futures = { version = "0.3", default-features = false } -h3 = "0.0.5" +h3 = { version = "0.0.6", features = ["tracing"] } # s2n-quic = { path = "../s2n-quic" } # s2n-quic-core = { path = "../s2n-quic-core" } s2n-quic = { version = "1.41.0" } s2n-quic-core = { version = "0.41.0" } +tracing = { version = "0.1.40", optional = true } + +[features] +tracing = ["dep:tracing"] diff --git a/submodules/s2n-quic-h3/src/s2n_quic.rs b/submodules/s2n-quic-h3/src/s2n_quic.rs index e94a710..abe0a33 100644 --- a/submodules/s2n-quic-h3/src/s2n_quic.rs +++ b/submodules/s2n-quic-h3/src/s2n_quic.rs @@ -13,6 +13,9 @@ use std::{ task::{self, Poll}, }; +#[cfg(feature = "tracing")] +use tracing::instrument; + pub struct Connection { conn: s2n_quic::connection::Handle, bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor, @@ -66,16 +69,15 @@ impl quic::Connection for Connection where B: Buf, { - type BidiStream = BidiStream; - type SendStream = SendStream; type RecvStream = RecvStream; type OpenStreams = OpenStreams; - type Error = ConnectionError; + type AcceptError = ConnectionError; + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_accept_recv( &mut self, cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll, Self::AcceptError>> { let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? { Some(x) => x, None => return Poll::Ready(Ok(None)), @@ -83,10 +85,11 @@ where Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_accept_bidi( &mut self, cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>> { + ) -> Poll, Self::AcceptError>> { let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? { Some(x) => x.split(), None => return Poll::Ready(Ok(None)), @@ -97,28 +100,41 @@ where }))) } - fn poll_open_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; - Ok(stream.into()).into() - } - - fn poll_open_send( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_send_stream(cx))?; - Ok(stream.into()).into() - } - fn opener(&self) -> Self::OpenStreams { OpenStreams { conn: self.conn.clone(), } } +} +impl quic::OpenStreams for Connection +where + B: Buf, +{ + type BidiStream = BidiStream; + type SendStream = SendStream; + type OpenError = ConnectionError; + + + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] + fn poll_open_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; + Ok(stream.into()).into() + } + + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] + fn poll_open_send( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_send_stream(cx))?; + Ok(stream.into()).into() + } + + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { self.conn.close( code.value() @@ -138,25 +154,27 @@ where { type BidiStream = BidiStream; type SendStream = SendStream; - type RecvStream = RecvStream; - type Error = ConnectionError; + type OpenError = ConnectionError; + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_open_bidi( &mut self, cx: &mut task::Context<'_>, - ) -> Poll> { + ) -> Poll> { let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; Ok(stream.into()).into() } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_open_send( &mut self, cx: &mut task::Context<'_>, - ) -> Poll> { + ) -> Poll> { let stream = ready!(self.conn.poll_open_send_stream(cx))?; Ok(stream.into()).into() } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { self.conn.close( code.value() @@ -271,6 +289,7 @@ impl quic::RecvStream for RecvStream { type Buf = Bytes; type Error = ReadError; + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_data( &mut self, cx: &mut task::Context<'_>, @@ -279,6 +298,7 @@ impl quic::RecvStream for RecvStream { Ok(buf).into() } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn stop_sending(&mut self, error_code: u64) { let _ = self.stream.stop_sending( s2n_quic::application::Error::new(error_code) @@ -286,6 +306,7 @@ impl quic::RecvStream for RecvStream { ); } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn recv_id(&self) -> StreamId { self.stream.id().try_into().expect("invalid stream id") } @@ -369,6 +390,7 @@ where { type Error = SendStreamError; + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { loop { // try to flush the current chunk if we have one @@ -409,6 +431,7 @@ where // Poll::Ready(Ok(())) } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { if self.buf.is_some() { return Err(Self::Error::NotReady); @@ -427,6 +450,7 @@ where // Ok(()) } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { // ensure all chunks are flushed to the QUIC stream before finishing ready!(self.poll_ready(cx))?; @@ -434,12 +458,14 @@ where Ok(()).into() } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn reset(&mut self, reset_code: u64) { let _ = self .stream .reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into())); } + #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] fn send_id(&self) -> StreamId { self.stream.id().try_into().expect("invalid stream id") }