diff --git a/.gitmodules b/.gitmodules index 7ff65fe..a8c5d14 100644 --- a/.gitmodules +++ b/.gitmodules @@ -4,3 +4,7 @@ [submodule "submodules/rustls-acme"] path = submodules/rustls-acme url = git@github.com:junkurihara/rustls-acme.git +[submodule "submodules/s2n-quic"] + path = submodules/s2n-quic + url = git@github.com:junkurihara/s2n-quic.git + branch = rustls-pq diff --git a/rpxy-lib/Cargo.toml b/rpxy-lib/Cargo.toml index 2d46293..fb21a4f 100644 --- a/rpxy-lib/Cargo.toml +++ b/rpxy-lib/Cargo.toml @@ -32,6 +32,7 @@ post-quantum = [ "rustls-post-quantum", "rpxy-acme/post-quantum", "rpxy-certs/post-quantum", + "s2n-quic-rustls/post-quantum", ] [dependencies] @@ -93,12 +94,12 @@ tracing = { version = "0.1.40" } quinn = { version = "0.11.5", optional = true } h3 = { version = "0.0.6", features = ["tracing"], optional = true } h3-quinn = { version = "0.0.7", optional = true } -s2n-quic = { version = "1.48.0", default-features = false, features = [ +s2n-quic = { version = "1.48.0", path = "../submodules/s2n-quic/quic/s2n-quic/", default-features = false, features = [ "provider-tls-rustls", ], optional = true } -s2n-quic-core = { version = "0.48.0", default-features = false, optional = true } -s2n-quic-rustls = { version = "0.48.0", optional = true } -s2n-quic-h3 = { path = "../submodules/s2n-quic-h3/", features = [ +s2n-quic-core = { version = "0.48.0", path = "../submodules/s2n-quic/quic/s2n-quic-core", default-features = false, optional = true } +s2n-quic-rustls = { version = "0.48.0", path = "../submodules/s2n-quic/quic/s2n-quic-rustls", optional = true } +s2n-quic-h3 = { path = "../submodules/s2n-quic/quic/s2n-quic-h3/", features = [ "tracing", ], optional = true } ########## diff --git a/submodules/s2n-quic b/submodules/s2n-quic new file mode 160000 index 0000000..ffeaac1 --- /dev/null +++ b/submodules/s2n-quic @@ -0,0 +1 @@ +Subproject commit ffeaac1eb32589599c9be357f2273a2824741c7d diff --git a/submodules/s2n-quic-h3/Cargo.toml b/submodules/s2n-quic-h3/Cargo.toml deleted file mode 100644 index 0058e45..0000000 --- a/submodules/s2n-quic-h3/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "s2n-quic-h3" -# this in an unpublished internal crate so the version should not be changed -version = "0.1.0" -authors = ["AWS s2n"] -edition = "2021" -rust-version = "1.71" -license = "Apache-2.0" -# this contains an http3 implementation for testing purposes and should not be published -publish = false - -[dependencies] -bytes = { version = "1", default-features = false } -futures = { version = "0.3", default-features = false } -h3 = { version = "0.0.6", features = ["tracing"] } -# s2n-quic = { path = "../s2n-quic" } -s2n-quic = { version = "1.47.0" } -tracing = { version = "0.1.40", optional = true } - -[features] -tracing = ["dep:tracing"] diff --git a/submodules/s2n-quic-h3/README.md b/submodules/s2n-quic-h3/README.md deleted file mode 100644 index aed9475..0000000 --- a/submodules/s2n-quic-h3/README.md +++ /dev/null @@ -1,10 +0,0 @@ -# s2n-quic-h3 - -This is an internal crate used by [s2n-quic](https://github.com/aws/s2n-quic) written as a proof of concept for implementing HTTP3 on top of s2n-quic. The API is not currently stable and should not be used directly. - -## License - -This project is licensed under the [Apache-2.0 License][license-url]. - -[license-badge]: https://img.shields.io/badge/license-apache-blue.svg -[license-url]: https://aws.amazon.com/apache-2-0/ diff --git a/submodules/s2n-quic-h3/src/lib.rs b/submodules/s2n-quic-h3/src/lib.rs deleted file mode 100644 index c85f197..0000000 --- a/submodules/s2n-quic-h3/src/lib.rs +++ /dev/null @@ -1,7 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -mod s2n_quic; - -pub use self::s2n_quic::*; -pub use h3; diff --git a/submodules/s2n-quic-h3/src/s2n_quic.rs b/submodules/s2n-quic-h3/src/s2n_quic.rs deleted file mode 100644 index 622f965..0000000 --- a/submodules/s2n-quic-h3/src/s2n_quic.rs +++ /dev/null @@ -1,534 +0,0 @@ -// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. -// SPDX-License-Identifier: Apache-2.0 - -use bytes::{Buf, Bytes}; -use core::task::ready; -use h3::quic::{self, Error, StreamId, WriteBuf}; -use s2n_quic::{ - application, - stream::{BidirectionalStream, ReceiveStream}, -}; -use std::{ - convert::TryInto, - fmt::{self, Display}, - sync::Arc, - task::{self, Poll}, -}; - -#[cfg(feature = "tracing")] -use tracing::instrument; - -pub struct Connection { - conn: s2n_quic::connection::Handle, - bidi_acceptor: s2n_quic::connection::BidirectionalStreamAcceptor, - recv_acceptor: s2n_quic::connection::ReceiveStreamAcceptor, -} - -impl Connection { - pub fn new(new_conn: s2n_quic::Connection) -> Self { - let (handle, acceptor) = new_conn.split(); - let (bidi, recv) = acceptor.split(); - - Self { - conn: handle, - bidi_acceptor: bidi, - recv_acceptor: recv, - } - } -} - -#[derive(Debug)] -pub struct ConnectionError(s2n_quic::connection::Error); - -impl std::error::Error for ConnectionError {} - -impl fmt::Display for ConnectionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl Error for ConnectionError { - fn is_timeout(&self) -> bool { - matches!(self.0, s2n_quic::connection::Error::IdleTimerExpired { .. }) - } - - fn err_code(&self) -> Option { - match self.0 { - s2n_quic::connection::Error::Application { error, .. } => Some(error.into()), - _ => None, - } - } -} - -impl From for ConnectionError { - fn from(e: s2n_quic::connection::Error) -> Self { - Self(e) - } -} - -impl quic::Connection for Connection -where - B: Buf, -{ - type RecvStream = RecvStream; - type OpenStreams = OpenStreams; - type AcceptError = ConnectionError; - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_accept_recv( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let recv = match ready!(self.recv_acceptor.poll_accept_receive_stream(cx))? { - Some(x) => x, - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_accept_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::AcceptError>> { - let (recv, send) = match ready!(self.bidi_acceptor.poll_accept_bidirectional_stream(cx))? { - Some(x) => x.split(), - None => return Poll::Ready(Ok(None)), - }; - Poll::Ready(Ok(Some(Self::BidiStream { - send: Self::SendStream::new(send), - recv: Self::RecvStream::new(recv), - }))) - } - - fn opener(&self) -> Self::OpenStreams { - OpenStreams { - conn: self.conn.clone(), - } - } -} - -impl quic::OpenStreams for Connection -where - B: Buf, -{ - type BidiStream = BidiStream; - type SendStream = SendStream; - type OpenError = ConnectionError; - - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_open_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; - Ok(stream.into()).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_open_send( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_send_stream(cx))?; - Ok(stream.into()).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { - self.conn.close( - code.value() - .try_into() - .expect("s2n-quic supports error codes up to 2^62-1"), - ); - } -} - -pub struct OpenStreams { - conn: s2n_quic::connection::Handle, -} - -impl quic::OpenStreams for OpenStreams -where - B: Buf, -{ - type BidiStream = BidiStream; - type SendStream = SendStream; - type OpenError = ConnectionError; - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_open_bidi( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_bidirectional_stream(cx))?; - Ok(stream.into()).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_open_send( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll> { - let stream = ready!(self.conn.poll_open_send_stream(cx))?; - Ok(stream.into()).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn close(&mut self, code: h3::error::Code, _reason: &[u8]) { - self.conn.close( - code.value() - .try_into() - .unwrap_or(application::Error::UNKNOWN), - ); - } -} - -impl Clone for OpenStreams { - fn clone(&self) -> Self { - Self { - conn: self.conn.clone(), - } - } -} - -pub struct BidiStream -where - B: Buf, -{ - send: SendStream, - recv: RecvStream, -} - -impl quic::BidiStream for BidiStream -where - B: Buf, -{ - type SendStream = SendStream; - type RecvStream = RecvStream; - - fn split(self) -> (Self::SendStream, Self::RecvStream) { - (self.send, self.recv) - } -} - -impl quic::RecvStream for BidiStream -where - B: Buf, -{ - type Buf = Bytes; - type Error = ReadError; - - fn poll_data( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>> { - self.recv.poll_data(cx) - } - - fn stop_sending(&mut self, error_code: u64) { - self.recv.stop_sending(error_code) - } - - fn recv_id(&self) -> StreamId { - self.recv.recv_id() - } -} - -impl quic::SendStream for BidiStream -where - B: Buf, -{ - type Error = SendStreamError; - - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.send.poll_ready(cx) - } - - fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { - self.send.poll_finish(cx) - } - - fn reset(&mut self, reset_code: u64) { - self.send.reset(reset_code) - } - - fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - self.send.send_data(data) - } - - fn send_id(&self) -> StreamId { - self.send.send_id() - } -} - -impl From for BidiStream -where - B: Buf, -{ - fn from(bidi: BidirectionalStream) -> Self { - let (recv, send) = bidi.split(); - BidiStream { - send: send.into(), - recv: recv.into(), - } - } -} - -pub struct RecvStream { - stream: s2n_quic::stream::ReceiveStream, -} - -impl RecvStream { - fn new(stream: s2n_quic::stream::ReceiveStream) -> Self { - Self { stream } - } -} - -impl quic::RecvStream for RecvStream { - type Buf = Bytes; - type Error = ReadError; - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_data( - &mut self, - cx: &mut task::Context<'_>, - ) -> Poll, Self::Error>> { - let buf = ready!(self.stream.poll_receive(cx))?; - Ok(buf).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn stop_sending(&mut self, error_code: u64) { - let _ = self.stream.stop_sending( - s2n_quic::application::Error::new(error_code) - .expect("s2n-quic supports error codes up to 2^62-1"), - ); - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn recv_id(&self) -> StreamId { - self.stream.id().try_into().expect("invalid stream id") - } -} - -impl From for RecvStream { - fn from(recv: ReceiveStream) -> Self { - RecvStream::new(recv) - } -} - -#[derive(Debug)] -pub struct ReadError(s2n_quic::stream::Error); - -impl std::error::Error for ReadError {} - -impl fmt::Display for ReadError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl From for Arc { - fn from(e: ReadError) -> Self { - Arc::new(e) - } -} - -impl From for ReadError { - fn from(e: s2n_quic::stream::Error) -> Self { - Self(e) - } -} - -impl Error for ReadError { - fn is_timeout(&self) -> bool { - matches!( - self.0, - s2n_quic::stream::Error::ConnectionError { - error: s2n_quic::connection::Error::IdleTimerExpired { .. }, - .. - } - ) - } - - fn err_code(&self) -> Option { - match self.0 { - s2n_quic::stream::Error::ConnectionError { - error: s2n_quic::connection::Error::Application { error, .. }, - .. - } => Some(error.into()), - s2n_quic::stream::Error::StreamReset { error, .. } => Some(error.into()), - _ => None, - } - } -} - -pub struct SendStream { - stream: s2n_quic::stream::SendStream, - chunk: Option, - buf: Option>, // TODO: Replace with buf: PhantomData - // after https://github.com/hyperium/h3/issues/78 is resolved -} - -impl SendStream -where - B: Buf, -{ - fn new(stream: s2n_quic::stream::SendStream) -> SendStream { - Self { - stream, - chunk: None, - buf: Default::default(), - } - } -} - -impl quic::SendStream for SendStream -where - B: Buf, -{ - type Error = SendStreamError; - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { - loop { - // try to flush the current chunk if we have one - if let Some(chunk) = self.chunk.as_mut() { - ready!(self.stream.poll_send(chunk, cx))?; - - // s2n-quic will take the whole chunk on send, even if it exceeds the limits - debug_assert!(chunk.is_empty()); - self.chunk = None; - } - - // try to take the next chunk from the WriteBuf - if let Some(ref mut data) = self.buf { - let len = data.chunk().len(); - - // if the write buf is empty, then clear it and break - if len == 0 { - self.buf = None; - break; - } - - // copy the first chunk from WriteBuf and prepare it to flush - let chunk = data.copy_to_bytes(len); - self.chunk = Some(chunk); - - // loop back around to flush the chunk - continue; - } - - // if we didn't have either a chunk or WriteBuf, then we're ready - break; - } - - Poll::Ready(Ok(())) - - // TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved - // self.available_bytes = ready!(self.stream.poll_send_ready(cx))?; - // Poll::Ready(Ok(())) - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { - if self.buf.is_some() { - return Err(Self::Error::NotReady); - } - self.buf = Some(data.into()); - Ok(()) - - // TODO: Replace with following after https://github.com/hyperium/h3/issues/78 is resolved - // let mut data = data.into(); - // while self.available_bytes > 0 && data.has_remaining() { - // let len = data.chunk().len(); - // let chunk = data.copy_to_bytes(len); - // self.stream.send_data(chunk)?; - // self.available_bytes = self.available_bytes.saturating_sub(len); - // } - // Ok(()) - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { - // ensure all chunks are flushed to the QUIC stream before finishing - ready!(self.poll_ready(cx))?; - self.stream.finish()?; - Ok(()).into() - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn reset(&mut self, reset_code: u64) { - let _ = self - .stream - .reset(reset_code.try_into().unwrap_or(application::Error::UNKNOWN)); - } - - #[cfg_attr(feature = "tracing", instrument(skip_all, level = "trace"))] - fn send_id(&self) -> StreamId { - self.stream.id().try_into().expect("invalid stream id") - } -} - -impl From for SendStream -where - B: Buf, -{ - fn from(send: s2n_quic::stream::SendStream) -> Self { - SendStream::new(send) - } -} - -#[derive(Debug)] -pub enum SendStreamError { - Write(s2n_quic::stream::Error), - NotReady, -} - -impl std::error::Error for SendStreamError {} - -impl Display for SendStreamError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - write!(f, "{self:?}") - } -} - -impl From for SendStreamError { - fn from(e: s2n_quic::stream::Error) -> Self { - Self::Write(e) - } -} - -impl Error for SendStreamError { - fn is_timeout(&self) -> bool { - matches!( - self, - Self::Write(s2n_quic::stream::Error::ConnectionError { - error: s2n_quic::connection::Error::IdleTimerExpired { .. }, - .. - }) - ) - } - - fn err_code(&self) -> Option { - match self { - Self::Write(s2n_quic::stream::Error::StreamReset { error, .. }) => { - Some((*error).into()) - } - Self::Write(s2n_quic::stream::Error::ConnectionError { - error: s2n_quic::connection::Error::Application { error, .. }, - .. - }) => Some((*error).into()), - _ => None, - } - } -} - -impl From for Arc { - fn from(e: SendStreamError) -> Self { - Arc::new(e) - } -}