diff --git a/.gitmodules b/.gitmodules index c07680b..0d6a404 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,6 +1,3 @@ [submodule "submodules/rusty-http-cache-semantics"] path = submodules/rusty-http-cache-semantics url = git@github.com:junkurihara/rusty-http-cache-semantics.git -[submodule "submodules/s2n-quic"] - path = submodules/s2n-quic - url = git@github.com:junkurihara/s2n-quic.git diff --git a/rpxy-bin/Cargo.toml b/rpxy-bin/Cargo.toml index 9ea1935..ad604dc 100644 --- a/rpxy-bin/Cargo.toml +++ b/rpxy-bin/Cargo.toml @@ -13,8 +13,8 @@ publish.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = ["http3-quinn", "cache", "rustls-backend"] -# default = ["http3-s2n", "cache", "rustls-backend"] +# default = ["http3-quinn", "cache", "rustls-backend"] +default = ["http3-s2n", "cache", "rustls-backend"] http3-quinn = ["rpxy-lib/http3-quinn"] http3-s2n = ["rpxy-lib/http3-s2n"] native-tls-backend = ["rpxy-lib/native-tls-backend"] diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index f279a4e..a66b638 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -13,8 +13,8 @@ publish.workspace = true # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -# default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"] -default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"] +default = ["http3-s2n", "sticky-cookie", "cache", "rustls-backend"] +# default = ["http3-quinn", "sticky-cookie", "cache", "rustls-backend"] http3-quinn = ["socket2", "quinn", "h3", "h3-quinn", "rpxy-certs/http3"] http3-s2n = [ "h3", @@ -84,19 +84,12 @@ tracing = { version = "0.1.40" } quinn = { version = "0.11.1", optional = true } h3 = { version = "0.0.5", optional = true } h3-quinn = { version = "0.0.6", optional = true } -### TODO: workaround for s2n-quic, waiting for release of s2n-quic-0.38.0 -s2n-quic = { path = "../submodules/s2n-quic/quic/s2n-quic", optional = true, default-features = false, features = [ +s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } # pin to 1.38.1 +s2n-quic = { version = "1.38.1", default-features = false, features = [ "provider-tls-rustls", -] } -s2n-quic-core = { path = "../submodules/s2n-quic/quic/s2n-quic-core", optional = true, default-features = false } -s2n-quic-rustls = { path = "../submodules/s2n-quic/quic/s2n-quic-rustls", optional = true } -s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3", optional = true } -# s2n-quic = { version = "1.37.0", default-features = false, features = [ -# "provider-tls-rustls", -# ], optional = true } -# s2n-quic-core = { version = "0.37.0", default-features = false, optional = true } -# s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", optional = true } -# s2n-quic-rustls = { version = "0.37.0", optional = true } +], optional = true } +s2n-quic-core = { version = "0.38.1", default-features = false, optional = true } +s2n-quic-rustls = { version = "0.38.1", optional = true } ########## # for UDP socket wit SO_REUSEADDR when h3 with quinn socket2 = { version = "0.5.7", features = ["all"], optional = true } diff --git a/submodules/rusty-http-cache-semantics b/submodules/rusty-http-cache-semantics index 88d23c2..08a6b5a 160000 --- a/submodules/rusty-http-cache-semantics +++ b/submodules/rusty-http-cache-semantics @@ -1 +1 @@ -Subproject commit 88d23c2f5a3ac36295dff4a804968c43932ba46b +Subproject commit 08a6b5a9dcb6f7d960007ae9c4265fe67851abfb diff --git a/submodules/s2n-quic b/submodules/s2n-quic deleted file mode 160000 index d90729d..0000000 --- a/submodules/s2n-quic +++ /dev/null @@ -1 +0,0 @@ -Subproject commit d90729de3f6d1fdc76ddff734591cfc2d8e61e80 diff --git a/submodules/s2n-quic-h3/Cargo.toml b/submodules/s2n-quic-h3/Cargo.toml new file mode 100644 index 0000000..44e3046 --- /dev/null +++ b/submodules/s2n-quic-h3/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "s2n-quic-h3" +# this in an unpublished internal crate so the version should not be changed +version = "0.1.0" +authors = ["AWS s2n"] +edition = "2021" +rust-version = "1.71" +license = "Apache-2.0" +# this contains an http3 implementation for testing purposes and should not be published +publish = false + +[dependencies] +bytes = { version = "1", default-features = false } +futures = { version = "0.3", default-features = false } +h3 = "0.0.5" +# s2n-quic = { path = "../s2n-quic" } +# s2n-quic-core = { path = "../s2n-quic-core" } +s2n-quic = { version = "1.38.1" } +s2n-quic-core = { version = "0.38.1" } diff --git a/submodules/s2n-quic-h3/README.md b/submodules/s2n-quic-h3/README.md new file mode 100644 index 0000000..aed9475 --- /dev/null +++ b/submodules/s2n-quic-h3/README.md @@ -0,0 +1,10 @@ +# s2n-quic-h3 + +This is an internal crate used by [s2n-quic](https://github.com/aws/s2n-quic) written as a proof of concept for implementing HTTP3 on top of s2n-quic. The API is not currently stable and should not be used directly. + +## License + +This project is licensed under the [Apache-2.0 License][license-url]. + +[license-badge]: https://img.shields.io/badge/license-apache-blue.svg +[license-url]: https://aws.amazon.com/apache-2-0/ diff --git a/submodules/s2n-quic-h3/src/lib.rs b/submodules/s2n-quic-h3/src/lib.rs new file mode 100644 index 0000000..c85f197 --- /dev/null +++ b/submodules/s2n-quic-h3/src/lib.rs @@ -0,0 +1,7 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +mod s2n_quic; + +pub use self::s2n_quic::*; +pub use h3; diff --git a/submodules/s2n-quic-h3/src/s2n_quic.rs b/submodules/s2n-quic-h3/src/s2n_quic.rs new file mode 100644 index 0000000..e94a710 --- /dev/null +++ b/submodules/s2n-quic-h3/src/s2n_quic.rs @@ -0,0 +1,506 @@ +// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use bytes::{Buf, Bytes}; +use core::task::ready; +use h3::quic::{self, Error, StreamId, WriteBuf}; +use s2n_quic::stream::{BidirectionalStream, ReceiveStream}; +use s2n_quic_core::varint::VarInt; +use std::{ + convert::TryInto, + fmt::{self, Display}, + sync::Arc, + task::{self, Poll}, +}; + +pub struct Connection { + conn: s2n_quic::connection::Handle, + bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor, + recv_acceptor: s2n_quic::connection::ReceiveStreamAcceptor, +} + +impl Connection { + pub fn new(new_conn: s2n_quic::Connection) -> Self { + let (handle, acceptor) = new_conn.split(); + let (bidi, recv) = acceptor.split(); + + Self { + conn: handle, + bidi_acceptor: bidi, + recv_acceptor: recv, + } + } +} + +#[derive(Debug)] +pub struct ConnectionError(s2n_quic::connection::Error); + +impl std::error::Error for ConnectionError {} + +impl fmt::Display for ConnectionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Error for ConnectionError { + fn is_timeout(&self) -> bool { + matches!(self.0, s2n_quic::connection::Error::IdleTimerExpired { .. }) + } + + fn err_code(&self) -> Option { + match self.0 { + s2n_quic::connection::Error::Application { error, .. } => Some(error.into()), + _ => None, + } + } +} + +impl From for ConnectionError { + fn from(e: s2n_quic::connection::Error) -> Self { + Self(e) + } +} + +impl quic::Connection for Connection +where + B: Buf, +{ + type BidiStream = BidiStream; + type SendStream = SendStream; + type RecvStream = RecvStream; + type OpenStreams = OpenStreams; + type Error = ConnectionError; + + fn poll_accept_recv( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? { + Some(x) => x, + None => return Poll::Ready(Ok(None)), + }; + Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) + } + + fn poll_accept_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? { + Some(x) => x.split(), + None => return Poll::Ready(Ok(None)), + }; + Poll::Ready(Ok(Some(Self::BidiStream { + send: Self::SendStream::new(send), + recv: Self::RecvStream::new(recv), + }))) + } + + fn poll_open_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; + Ok(stream.into()).into() + } + + fn poll_open_send( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_send_stream(cx))?; + Ok(stream.into()).into() + } + + fn opener(&self) -> Self::OpenStreams { + OpenStreams { + conn: self.conn.clone(), + } + } + + fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { + self.conn.close( + code.value() + .try_into() + .expect("s2n-quic supports error codes up to 2^62-1"), + ); + } +} + +pub struct OpenStreams { + conn: s2n_quic::connection::Handle, +} + +impl quic::OpenStreams for OpenStreams +where + B: Buf, +{ + type BidiStream = BidiStream; + type SendStream = SendStream; + type RecvStream = RecvStream; + type Error = ConnectionError; + + fn poll_open_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; + Ok(stream.into()).into() + } + + fn poll_open_send( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + let stream = ready!(self.conn.poll_open_send_stream(cx))?; + Ok(stream.into()).into() + } + + fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { + self.conn.close( + code.value() + .try_into() + .unwrap_or_else(|_| VarInt::MAX.into()), + ); + } +} + +impl Clone for OpenStreams { + fn clone(&self) -> Self { + Self { + conn: self.conn.clone(), + } + } +} + +pub struct BidiStream +where + B: Buf, +{ + send: SendStream, + recv: RecvStream, +} + +impl quic::BidiStream for BidiStream +where + B: Buf, +{ + type SendStream = SendStream; + type RecvStream = RecvStream; + + fn split(self) -> (Self::SendStream, Self::RecvStream) { + (self.send, self.recv) + } +} + +impl quic::RecvStream for BidiStream +where + B: Buf, +{ + type Buf = Bytes; + type Error = ReadError; + + fn poll_data( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + self.recv.poll_data(cx) + } + + fn stop_sending(&mut self, error_code: u64) { + self.recv.stop_sending(error_code) + } + + fn recv_id(&self) -> StreamId { + self.recv.recv_id() + } +} + +impl quic::SendStream for BidiStream +where + B: Buf, +{ + type Error = SendStreamError; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.send.poll_ready(cx) + } + + fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.send.poll_finish(cx) + } + + fn reset(&mut self, reset_code: u64) { + self.send.reset(reset_code) + } + + fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { + self.send.send_data(data) + } + + fn send_id(&self) -> StreamId { + self.send.send_id() + } +} + +impl From for BidiStream +where + B: Buf, +{ + fn from(bidi: BidirectionalStream) -> Self { + let (recv, send) = bidi.split(); + BidiStream { + send: send.into(), + recv: recv.into(), + } + } +} + +pub struct RecvStream { + stream: s2n_quic::stream::ReceiveStream, +} + +impl RecvStream { + fn new(stream: s2n_quic::stream::ReceiveStream) -> Self { + Self { stream } + } +} + +impl quic::RecvStream for RecvStream { + type Buf = Bytes; + type Error = ReadError; + + fn poll_data( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + let buf = ready!(self.stream.poll_receive(cx))?; + Ok(buf).into() + } + + fn stop_sending(&mut self, error_code: u64) { + let _ = self.stream.stop_sending( + s2n_quic::application::Error::new(error_code) + .expect("s2n-quic supports error codes up to 2^62-1"), + ); + } + + fn recv_id(&self) -> StreamId { + self.stream.id().try_into().expect("invalid stream id") + } +} + +impl From for RecvStream { + fn from(recv: ReceiveStream) -> Self { + RecvStream::new(recv) + } +} + +#[derive(Debug)] +pub struct ReadError(s2n_quic::stream::Error); + +impl std::error::Error for ReadError {} + +impl fmt::Display for ReadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl From for Arc { + fn from(e: ReadError) -> Self { + Arc::new(e) + } +} + +impl From for ReadError { + fn from(e: s2n_quic::stream::Error) -> Self { + Self(e) + } +} + +impl Error for ReadError { + fn is_timeout(&self) -> bool { + matches!( + self.0, + s2n_quic::stream::Error::ConnectionError { + error: s2n_quic::connection::Error::IdleTimerExpired { .. }, + .. + } + ) + } + + fn err_code(&self) -> Option { + match self.0 { + s2n_quic::stream::Error::ConnectionError { + error: s2n_quic::connection::Error::Application { error, .. }, + .. + } => Some(error.into()), + s2n_quic::stream::Error::StreamReset { error, .. } => Some(error.into()), + _ => None, + } + } +} + +pub struct SendStream { + stream: s2n_quic::stream::SendStream, + chunk: Option, + buf: Option>, // TODO: Replace with buf: PhantomData + // after https://github.com/hyperium/h3/issues/78 is resolved +} + +impl SendStream +where + B: Buf, +{ + fn new(stream: s2n_quic::stream::SendStream) -> SendStream { + Self { + stream, + chunk: None, + buf: Default::default(), + } + } +} + +impl quic::SendStream for SendStream +where + B: Buf, +{ + type Error = SendStreamError; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + loop { + // try to flush the current chunk if we have one + if let Some(chunk) = self.chunk.as_mut() { + ready!(self.stream.poll_send(chunk, cx))?; + + // s2n-quic will take the whole chunk on send, even if it exceeds the limits + debug_assert!(chunk.is_empty()); + self.chunk = None; + } + + // try to take the next chunk from the WriteBuf + if let Some(ref mut data) = self.buf { + let len = data.chunk().len(); + + // if the write buf is empty, then clear it and break + if len == 0 { + self.buf = None; + break; + } + + // copy the first chunk from WriteBuf and prepare it to flush + let chunk = data.copy_to_bytes(len); + self.chunk = Some(chunk); + + // loop back around to flush the chunk + continue; + } + + // if we didn't have either a chunk or WriteBuf, then we're ready + break; + } + + Poll::Ready(Ok(())) + + // TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved + // self.available_bytes = ready!(self.stream.poll_send_ready(cx))?; + // Poll::Ready(Ok(())) + } + + fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { + if self.buf.is_some() { + return Err(Self::Error::NotReady); + } + self.buf = Some(data.into()); + Ok(()) + + // TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved + // let mut data = data.into(); + // while self.available_bytes > 0 && data.has_remaining() { + // let len = data.chunk().len(); + // let chunk = data.copy_to_bytes(len); + // self.stream.send_data(chunk)?; + // self.available_bytes = self.available_bytes.saturating_sub(len); + // } + // Ok(()) + } + + fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { + // ensure all chunks are flushed to the QUIC stream before finishing + ready!(self.poll_ready(cx))?; + self.stream.finish()?; + Ok(()).into() + } + + fn reset(&mut self, reset_code: u64) { + let _ = self + .stream + .reset(reset_code.try_into().unwrap_or_else(|_| VarInt::MAX.into())); + } + + fn send_id(&self) -> StreamId { + self.stream.id().try_into().expect("invalid stream id") + } +} + +impl From for SendStream +where + B: Buf, +{ + fn from(send: s2n_quic::stream::SendStream) -> Self { + SendStream::new(send) + } +} + +#[derive(Debug)] +pub enum SendStreamError { + Write(s2n_quic::stream::Error), + NotReady, +} + +impl std::error::Error for SendStreamError {} + +impl Display for SendStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{self:?}") + } +} + +impl From for SendStreamError { + fn from(e: s2n_quic::stream::Error) -> Self { + Self::Write(e) + } +} + +impl Error for SendStreamError { + fn is_timeout(&self) -> bool { + matches!( + self, + Self::Write(s2n_quic::stream::Error::ConnectionError { + error: s2n_quic::connection::Error::IdleTimerExpired { .. }, + .. + }) + ) + } + + fn err_code(&self) -> Option { + match self { + Self::Write(s2n_quic::stream::Error::StreamReset { error, .. }) => { + Some((*error).into()) + } + Self::Write(s2n_quic::stream::Error::ConnectionError { + error: s2n_quic::connection::Error::Application { error, .. }, + .. + }) => Some((*error).into()), + _ => None, + } + } +} + +impl From for Arc { + fn from(e: SendStreamError) -> Self { + Arc::new(e) + } +}