deps: h3-0.0.6

This commit is contained in:
Jun Kurihara 2024-07-02 17:03:05 +09:00
commit 9f88bfcafa
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
7 changed files with 78 additions and 44 deletions

View file

@ -1,5 +1,5 @@
[workspace.package] [workspace.package]
version = "0.8.0" version = "0.8.1"
authors = ["Jun Kurihara"] authors = ["Jun Kurihara"]
homepage = "https://github.com/junkurihara/rust-rpxy" homepage = "https://github.com/junkurihara/rust-rpxy"
repository = "https://github.com/junkurihara/rust-rpxy" repository = "https://github.com/junkurihara/rust-rpxy"

View file

@ -13,8 +13,8 @@ publish.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
default = ["http3-quinn", "cache", "rustls-backend"] # default = ["http3-quinn", "cache", "rustls-backend"]
# default = ["http3-s2n", "cache", "rustls-backend"] default = ["http3-s2n", "cache", "rustls-backend"]
http3-quinn = ["rpxy-lib/http3-quinn"] http3-quinn = ["rpxy-lib/http3-quinn"]
http3-s2n = ["rpxy-lib/http3-s2n"] http3-s2n = ["rpxy-lib/http3-s2n"]
native-tls-backend = ["rpxy-lib/native-tls-backend"] native-tls-backend = ["rpxy-lib/native-tls-backend"]
@ -42,7 +42,7 @@ async-trait = "0.1.80"
# config # config
clap = { version = "4.5.7", features = ["std", "cargo", "wrap_help"] } clap = { version = "4.5.8", features = ["std", "cargo", "wrap_help"] }
toml = { version = "0.8.14", default-features = false, features = ["parse"] } toml = { version = "0.8.14", default-features = false, features = ["parse"] }
hot_reload = "0.1.5" hot_reload = "0.1.5"

View file

@ -13,11 +13,10 @@ publish.workspace = true
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[features] [features]
# default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"] default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"]
default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"] # default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"]
http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"] http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"]
http3-s2n = [ http3-s2n = [
"h3",
"s2n-quic", "s2n-quic",
"s2n-quic-core", "s2n-quic-core",
"s2n-quic-rustls", "s2n-quic-rustls",
@ -54,8 +53,8 @@ thiserror = "1.0.61"
# http for both server and client # http for both server and client
http = "1.1.0" http = "1.1.0"
http-body-util = "0.1.2" http-body-util = "0.1.2"
hyper = { version = "1.3.1", default-features = false } hyper = { version = "1.4.0", default-features = false }
hyper-util = { version = "0.1.5", features = ["full"] } hyper-util = { version = "0.1.6", features = ["full"] }
futures-util = { version = "0.3.30", default-features = false } futures-util = { version = "0.3.30", default-features = false }
futures-channel = { version = "0.3.30", default-features = false } futures-channel = { version = "0.3.30", default-features = false }
@ -84,9 +83,11 @@ tracing = { version = "0.1.40" }
# http/3 # http/3
quinn = { version = "0.11.2", optional = true } quinn = { version = "0.11.2", optional = true }
h3 = { version = "0.0.5", optional = true } h3 = { version = "0.0.6", features = ["tracing"], optional = true }
h3-quinn = { version = "0.0.6", optional = true } h3-quinn = { version = "0.0.7", optional = true }
s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", features = [
"tracing",
], optional = true }
s2n-quic = { version = "1.41.0", default-features = false, features = [ s2n-quic = { version = "1.41.0", default-features = false, features = [
"provider-tls-rustls", "provider-tls-rustls",
], optional = true } ], optional = true }

View file

@ -36,9 +36,12 @@ pub enum RpxyError {
HyperBodyError(#[from] hyper::Error), HyperBodyError(#[from] hyper::Error),
// http/3 errors // http/3 errors
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] #[cfg(feature = "http3-quinn")]
#[error("H3 error: {0}")] #[error("H3 error: {0}")]
H3Error(#[from] h3::Error), H3Error(#[from] h3::Error),
#[cfg(feature = "http3-s2n")]
#[error("H3 error: {0}")]
H3Error(#[from] s2n_quic_h3::h3::Error),
#[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))] #[cfg(any(feature = "http3-quinn", feature = "http3-s2n"))]
#[error("Exceeds max request body size for HTTP/3")] #[error("Exceeds max request body size for HTTP/3")]
H3TooLargeBody, H3TooLargeBody,

View file

@ -12,9 +12,9 @@ use hyper_util::client::legacy::connect::Connect;
use std::net::SocketAddr; use std::net::SocketAddr;
#[cfg(feature = "http3-quinn")] #[cfg(feature = "http3-quinn")]
use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; use h3::{quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream};
#[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))] #[cfg(all(feature = "http3-s2n", not(feature = "http3-quinn")))]
use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, server::RequestStream}; use s2n_quic_h3::h3::{self, quic::BidiStream, quic::Connection as ConnectionQuic, quic::OpenStreams, server::RequestStream};
impl<T> Proxy<T> impl<T> Proxy<T>
where where
@ -28,9 +28,9 @@ where
) -> RpxyResult<()> ) -> RpxyResult<()>
where where
C: ConnectionQuic<Bytes>, C: ConnectionQuic<Bytes>,
<C as ConnectionQuic<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static, <C as OpenStreams<Bytes>>::BidiStream: BidiStream<Bytes> + Send + 'static,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send, <<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::RecvStream: Send,
<<C as ConnectionQuic<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send, <<C as OpenStreams<Bytes>>::BidiStream as BidiStream<Bytes>>::SendStream: Send,
{ {
let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?; let mut h3_conn = h3::server::Connection::<_, Bytes>::new(quic_connection).await?;
info!( info!(

View file

@ -12,8 +12,12 @@ publish = false
[dependencies] [dependencies]
bytes = { version = "1", default-features = false } bytes = { version = "1", default-features = false }
futures = { version = "0.3", default-features = false } futures = { version = "0.3", default-features = false }
h3 = "0.0.5" h3 = { version = "0.0.6", features = ["tracing"] }
# s2n-quic = { path = "../s2n-quic" } # s2n-quic = { path = "../s2n-quic" }
# s2n-quic-core = { path = "../s2n-quic-core" } # s2n-quic-core = { path = "../s2n-quic-core" }
s2n-quic = { version = "1.41.0" } s2n-quic = { version = "1.41.0" }
s2n-quic-core = { version = "0.41.0" } s2n-quic-core = { version = "0.41.0" }
tracing = { version = "0.1.40", optional = true }
[features]
tracing = ["dep:tracing"]

View file

@ -13,6 +13,9 @@ use std::{
task::{self, Poll}, task::{self, Poll},
}; };
#[cfg(feature = "tracing")]
use tracing::instrument;
pub struct Connection { pub struct Connection {
conn: s2n_quic::connection::Handle, conn: s2n_quic::connection::Handle,
bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor, bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor,
@ -66,16 +69,15 @@ impl<B> quic::Connection<B> for Connection
where where
B: Buf, B: Buf,
{ {
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream; type RecvStream = RecvStream;
type OpenStreams = OpenStreams; type OpenStreams = OpenStreams;
type Error = ConnectionError; type AcceptError = ConnectionError;
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_recv( fn poll_accept_recv(
&mut self, &mut self,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> { ) -> Poll<Result<Option<Self::RecvStream>, Self::AcceptError>> {
let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? { let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? {
Some(x) => x, Some(x) => x,
None => return Poll::Ready(Ok(None)), None => return Poll::Ready(Ok(None)),
@ -83,10 +85,11 @@ where
Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_accept_bidi( fn poll_accept_bidi(
&mut self, &mut self,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> { ) -> Poll<Result<Option<Self::BidiStream>, Self::AcceptError>> {
let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? { let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? {
Some(x) => x.split(), Some(x) => x.split(),
None => return Poll::Ready(Ok(None)), None => return Poll::Ready(Ok(None)),
@ -97,28 +100,41 @@ where
}))) })))
} }
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}
fn opener(&self) -> Self::OpenStreams { fn opener(&self) -> Self::OpenStreams {
OpenStreams { OpenStreams {
conn: self.conn.clone(), conn: self.conn.clone(),
} }
} }
}
impl<B> quic::OpenStreams<B> for Connection
where
B: Buf,
{
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type OpenError = ConnectionError;
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into()
}
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send(
&mut self,
cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into()
}
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close( self.conn.close(
code.value() code.value()
@ -138,25 +154,27 @@ where
{ {
type BidiStream = BidiStream<B>; type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>; type SendStream = SendStream<B>;
type RecvStream = RecvStream; type OpenError = ConnectionError;
type Error = ConnectionError;
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_bidi( fn poll_open_bidi(
&mut self, &mut self,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Result<Self::BidiStream, Self::Error>> { ) -> Poll<Result<Self::BidiStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?;
Ok(stream.into()).into() Ok(stream.into()).into()
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_open_send( fn poll_open_send(
&mut self, &mut self,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
) -> Poll<Result<Self::SendStream, Self::Error>> { ) -> Poll<Result<Self::SendStream, Self::OpenError>> {
let stream = ready!(self.conn.poll_open_send_stream(cx))?; let stream = ready!(self.conn.poll_open_send_stream(cx))?;
Ok(stream.into()).into() Ok(stream.into()).into()
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { fn close(&mut self, code: h3::error::Code, _reason: &[u8]) {
self.conn.close( self.conn.close(
code.value() code.value()
@ -271,6 +289,7 @@ impl quic::RecvStream for RecvStream {
type Buf = Bytes; type Buf = Bytes;
type Error = ReadError; type Error = ReadError;
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_data( fn poll_data(
&mut self, &mut self,
cx: &mut task::Context<'_>, cx: &mut task::Context<'_>,
@ -279,6 +298,7 @@ impl quic::RecvStream for RecvStream {
Ok(buf).into() Ok(buf).into()
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn stop_sending(&mut self, error_code: u64) { fn stop_sending(&mut self, error_code: u64) {
let _ = self.stream.stop_sending( let _ = self.stream.stop_sending(
s2n_quic::application::Error::new(error_code) s2n_quic::application::Error::new(error_code)
@ -286,6 +306,7 @@ impl quic::RecvStream for RecvStream {
); );
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn recv_id(&self) -> StreamId { fn recv_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id") self.stream.id().try_into().expect("invalid stream id")
} }
@ -369,6 +390,7 @@ where
{ {
type Error = SendStreamError; type Error = SendStreamError;
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
loop { loop {
// try to flush the current chunk if we have one // try to flush the current chunk if we have one
@ -409,6 +431,7 @@ where
// Poll::Ready(Ok(())) // Poll::Ready(Ok(()))
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> { fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
if self.buf.is_some() { if self.buf.is_some() {
return Err(Self::Error::NotReady); return Err(Self::Error::NotReady);
@ -427,6 +450,7 @@ where
// Ok(()) // Ok(())
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> { fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
// ensure all chunks are flushed to the QUIC stream before finishing // ensure all chunks are flushed to the QUIC stream before finishing
ready!(self.poll_ready(cx))?; ready!(self.poll_ready(cx))?;
@ -434,12 +458,14 @@ where
Ok(()).into() Ok(()).into()
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn reset(&mut self, reset_code: u64) { fn reset(&mut self, reset_code: u64) {
let _ = self let _ = self
.stream .stream
.reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into())); .reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into()));
} }
#[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))]
fn send_id(&self) -> StreamId { fn send_id(&self) -> StreamId {
self.stream.id().try_into().expect("invalid stream id") self.stream.id().try_into().expect("invalid stream id")
} }