diff --git a/.gitmodules b/.gitmodules index 0a7fc93..b9069a0 100644 --- a/.gitmodules +++ b/.gitmodules @@ -1,3 +1,6 @@ [submodule "h3"] path = h3 url = git@github.com:junkurihara/h3.git +[submodule "quinn"] + path = quinn + url = git@github.com:junkurihara/quinn.git diff --git a/Cargo.toml b/Cargo.toml index 166b06c..8da0a8f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -43,24 +43,27 @@ hyper = { version = "0.14.25", default-features = false, features = [ "http2", "stream", ] } -hyper-rustls = { version = "0.23.2", default-features = false, features = [ +hyper-rustls = { version = "0.24.0", default-features = false, features = [ "tokio-runtime", "webpki-tokio", "http1", "http2", ] } -tokio-rustls = { version = "0.23.4", features = ["early-data"] } +tokio-rustls = { version = "0.24.0", features = ["early-data"] } rustls-pemfile = "1.0.2" -rustls = { version = "0.20.8", default-features = false } +rustls = { version = "0.21.0", default-features = false } +webpki = "0.22.0" # logging tracing = { version = "0.1.37" } tracing-subscriber = { version = "0.3.16", features = ["env-filter"] } # http/3 -quinn = { version = "0.9.3", optional = true } +# quinn = { version = "0.9.3", optional = true } +quinn = { path = "./quinn/quinn", optional = true } # Tentative to support rustls-0.21 h3 = { path = "./h3/h3/", optional = true } -h3-quinn = { path = "./h3/h3-quinn/", optional = true } +# h3-quinn = { path = "./h3/h3-quinn/", optional = true } +h3-quinn = { path = "./h3-quinn/", optional = true } # Tentative to support rustls-0.21 [target.'cfg(not(target_env = "msvc"))'.dependencies] diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml new file mode 100644 index 0000000..5f7661a --- /dev/null +++ b/h3-quinn/Cargo.toml @@ -0,0 +1,21 @@ +[package] +name = "h3-quinn" +version = "0.0.1" +rust-version = "1.59" +authors = ["Jean-Christophe BEGUE "] +edition = "2018" +documentation = "https://docs.rs/h3-quinn" +repository = "https://github.com/hyperium/h3" +readme = "../README.md" +description = "QUIC transport implementation based on Quinn." +keywords = ["http3", "quic", "h3"] +categories = ["network-programming", "web-programming"] +license = "MIT" + +[dependencies] +h3 = { version = "0.0.1", path = "../h3/h3" } +bytes = "1" +quinn = { path = "../quinn/quinn/", default-features = false } +quinn-proto = { path = "../quinn/quinn-proto/", default-features = false } +tokio-util = { version = "0.7.7" } +futures = { version = "0.3.27" } diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs new file mode 100644 index 0000000..62d84d8 --- /dev/null +++ b/h3-quinn/src/lib.rs @@ -0,0 +1,559 @@ +//! QUIC Transport implementation with Quinn +//! +//! This module implements QUIC traits with Quinn. +#![deny(missing_docs)] + +use std::{ + convert::TryInto, + fmt::{self, Display}, + future::Future, + sync::Arc, + task::{self, Poll}, +}; + +use bytes::{Buf, Bytes}; + +use futures::{ + ready, + stream::{self, BoxStream}, + StreamExt, +}; +pub use quinn::{ + self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError, +}; + +use h3::quic::{self, Error, StreamId, WriteBuf}; +use tokio_util::sync::ReusableBoxFuture; + +/// A QUIC connection backed by Quinn +/// +/// Implements a [`quic::Connection`] backed by a [`quinn::Connection`]. +pub struct Connection { + conn: quinn::Connection, + incoming_bi: BoxStream<'static, as Future>::Output>, + opening_bi: Option as Future>::Output>>, + incoming_uni: BoxStream<'static, as Future>::Output>, + opening_uni: Option as Future>::Output>>, +} + +impl Connection { + /// Create a [`Connection`] from a [`quinn::NewConnection`] + pub fn new(conn: quinn::Connection) -> Self { + Self { + conn: conn.clone(), + incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async { + Some((conn.accept_bi().await, conn)) + })), + opening_bi: None, + incoming_uni: Box::pin(stream::unfold(conn, |conn| async { + Some((conn.accept_uni().await, conn)) + })), + opening_uni: None, + } + } +} + +/// The error type for [`Connection`] +/// +/// Wraps reasons a Quinn connection might be lost. +#[derive(Debug)] +pub struct ConnectionError(quinn::ConnectionError); + +impl std::error::Error for ConnectionError {} + +impl fmt::Display for ConnectionError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl Error for ConnectionError { + fn is_timeout(&self) -> bool { + matches!(self.0, quinn::ConnectionError::TimedOut) + } + + fn err_code(&self) -> Option { + match self.0 { + quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose { + error_code, + .. + }) => Some(error_code.into_inner()), + _ => None, + } + } +} + +impl From for ConnectionError { + fn from(e: quinn::ConnectionError) -> Self { + Self(e) + } +} + +impl quic::Connection for Connection +where + B: Buf, +{ + type SendStream = SendStream; + type RecvStream = RecvStream; + type BidiStream = BidiStream; + type OpenStreams = OpenStreams; + type Error = ConnectionError; + + fn poll_accept_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) { + Some(x) => x?, + None => return Poll::Ready(Ok(None)), + }; + Poll::Ready(Ok(Some(Self::BidiStream { + send: Self::SendStream::new(send), + recv: Self::RecvStream::new(recv), + }))) + } + + fn poll_accept_recv( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) { + Some(x) => x?, + None => return Poll::Ready(Ok(None)), + }; + Poll::Ready(Ok(Some(Self::RecvStream::new(recv)))) + } + + fn poll_open_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + if self.opening_bi.is_none() { + self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async { + Some((conn.clone().open_bi().await, conn)) + }))); + } + + let (send, recv) = + ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?; + Poll::Ready(Ok(Self::BidiStream { + send: Self::SendStream::new(send), + recv: Self::RecvStream::new(recv), + })) + } + + fn poll_open_send( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + if self.opening_uni.is_none() { + self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async { + Some((conn.open_uni().await, conn)) + }))); + } + + let send = ready!(self.opening_uni.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?; + Poll::Ready(Ok(Self::SendStream::new(send))) + } + + fn opener(&self) -> Self::OpenStreams { + OpenStreams { + conn: self.conn.clone(), + opening_bi: None, + opening_uni: None, + } + } + + fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + self.conn.close( + VarInt::from_u64(code.value()).expect("error code VarInt"), + reason, + ); + } +} + +/// Stream opener backed by a Quinn connection +/// +/// Implements [`quic::OpenStreams`] using [`quinn::Connection`], +/// [`quinn::OpenBi`], [`quinn::OpenUni`]. +pub struct OpenStreams { + conn: quinn::Connection, + opening_bi: Option as Future>::Output>>, + opening_uni: Option as Future>::Output>>, +} + +impl quic::OpenStreams for OpenStreams +where + B: Buf, +{ + type RecvStream = RecvStream; + type SendStream = SendStream; + type BidiStream = BidiStream; + type Error = ConnectionError; + + fn poll_open_bidi( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + if self.opening_bi.is_none() { + self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async { + Some((conn.open_bi().await, conn)) + }))); + } + + let (send, recv) = + ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?; + Poll::Ready(Ok(Self::BidiStream { + send: Self::SendStream::new(send), + recv: Self::RecvStream::new(recv), + })) + } + + fn poll_open_send( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll> { + if self.opening_uni.is_none() { + self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async { + Some((conn.open_uni().await, conn)) + }))); + } + + let send = ready!(self.opening_uni.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?; + Poll::Ready(Ok(Self::SendStream::new(send))) + } + + fn close(&mut self, code: h3::error::Code, reason: &[u8]) { + self.conn.close( + VarInt::from_u64(code.value()).expect("error code VarInt"), + reason, + ); + } +} + +impl Clone for OpenStreams { + fn clone(&self) -> Self { + Self { + conn: self.conn.clone(), + opening_bi: None, + opening_uni: None, + } + } +} + +/// Quinn-backed bidirectional stream +/// +/// Implements [`quic::BidiStream`] which allows the stream to be split +/// into two structs each implementing one direction. +pub struct BidiStream +where + B: Buf, +{ + send: SendStream, + recv: RecvStream, +} + +impl quic::BidiStream for BidiStream +where + B: Buf, +{ + type SendStream = SendStream; + type RecvStream = RecvStream; + + fn split(self) -> (Self::SendStream, Self::RecvStream) { + (self.send, self.recv) + } +} + +impl quic::RecvStream for BidiStream +where + B: Buf, +{ + type Buf = Bytes; + type Error = ReadError; + + fn poll_data( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + self.recv.poll_data(cx) + } + + fn stop_sending(&mut self, error_code: u64) { + self.recv.stop_sending(error_code) + } +} + +impl quic::SendStream for BidiStream +where + B: Buf, +{ + type Error = SendStreamError; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.send.poll_ready(cx) + } + + fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.send.poll_finish(cx) + } + + fn reset(&mut self, reset_code: u64) { + self.send.reset(reset_code) + } + + fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { + self.send.send_data(data) + } + + fn id(&self) -> StreamId { + self.send.id() + } +} + +/// Quinn-backed receive stream +/// +/// Implements a [`quic::RecvStream`] backed by a [`quinn::RecvStream`]. +pub struct RecvStream { + stream: Option, + read_chunk_fut: ReadChunkFuture, +} + +type ReadChunkFuture = ReusableBoxFuture< + 'static, + ( + quinn::RecvStream, + Result, quinn::ReadError>, + ), +>; + +impl RecvStream { + fn new(stream: quinn::RecvStream) -> Self { + Self { + stream: Some(stream), + // Should only allocate once the first time it's used + read_chunk_fut: ReusableBoxFuture::new(async { unreachable!() }), + } + } +} + +impl quic::RecvStream for RecvStream { + type Buf = Bytes; + type Error = ReadError; + + fn poll_data( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + if let Some(mut stream) = self.stream.take() { + self.read_chunk_fut.set(async move { + let chunk = stream.read_chunk(usize::MAX, true).await; + (stream, chunk) + }) + }; + + let (stream, chunk) = ready!(self.read_chunk_fut.poll(cx)); + self.stream = Some(stream); + Poll::Ready(Ok(chunk?.map(|c| c.bytes))) + } + + fn stop_sending(&mut self, error_code: u64) { + self.stream + .as_mut() + .unwrap() + .stop(VarInt::from_u64(error_code).expect("invalid error_code")) + .ok(); + } +} + +/// The error type for [`RecvStream`] +/// +/// Wraps errors that occur when reading from a receive stream. +#[derive(Debug)] +pub struct ReadError(quinn::ReadError); + +impl std::error::Error for ReadError {} + +impl fmt::Display for ReadError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.0.fmt(f) + } +} + +impl From for Arc { + fn from(e: ReadError) -> Self { + Arc::new(e) + } +} + +impl From for ReadError { + fn from(e: quinn::ReadError) -> Self { + Self(e) + } +} + +impl Error for ReadError { + fn is_timeout(&self) -> bool { + matches!( + self.0, + quinn::ReadError::ConnectionLost(quinn::ConnectionError::TimedOut) + ) + } + + fn err_code(&self) -> Option { + match self.0 { + quinn::ReadError::ConnectionLost(quinn::ConnectionError::ApplicationClosed( + quinn_proto::ApplicationClose { error_code, .. }, + )) => Some(error_code.into_inner()), + quinn::ReadError::Reset(error_code) => Some(error_code.into_inner()), + _ => None, + } + } +} + +/// Quinn-backed send stream +/// +/// Implements a [`quic::SendStream`] backed by a [`quinn::SendStream`]. +pub struct SendStream { + stream: Option, + writing: Option>, + write_fut: WriteFuture, +} + +type WriteFuture = + ReusableBoxFuture<'static, (quinn::SendStream, Result)>; + +impl SendStream +where + B: Buf, +{ + fn new(stream: quinn::SendStream) -> SendStream { + Self { + stream: Some(stream), + writing: None, + write_fut: ReusableBoxFuture::new(async { unreachable!() }), + } + } +} + +impl quic::SendStream for SendStream +where + B: Buf, +{ + type Error = SendStreamError; + + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + if let Some(ref mut data) = self.writing { + while data.has_remaining() { + if let Some(mut stream) = self.stream.take() { + let chunk = data.chunk().to_owned(); // FIXME - avoid copy + self.write_fut.set(async move { + let ret = stream.write(&chunk).await; + (stream, ret) + }); + } + + let (stream, res) = ready!(self.write_fut.poll(cx)); + self.stream = Some(stream); + match res { + Ok(cnt) => data.advance(cnt), + Err(err) => { + return Poll::Ready(Err(SendStreamError::Write(err))); + } + } + } + } + self.writing = None; + Poll::Ready(Ok(())) + } + + fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll> { + self.stream + .as_mut() + .unwrap() + .poll_finish(cx) + .map_err(Into::into) + } + + fn reset(&mut self, reset_code: u64) { + let _ = self + .stream + .as_mut() + .unwrap() + .reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX)); + } + + fn send_data>>(&mut self, data: D) -> Result<(), Self::Error> { + if self.writing.is_some() { + return Err(Self::Error::NotReady); + } + self.writing = Some(data.into()); + Ok(()) + } + + fn id(&self) -> StreamId { + self.stream + .as_ref() + .unwrap() + .id() + .0 + .try_into() + .expect("invalid stream id") + } +} + +/// The error type for [`SendStream`] +/// +/// Wraps errors that can happen writing to or polling a send stream. +#[derive(Debug)] +pub enum SendStreamError { + /// Errors when writing, wrapping a [`quinn::WriteError`] + Write(WriteError), + /// Error when the stream is not ready, because it is still sending + /// data from a previous call + NotReady, +} + +impl std::error::Error for SendStreamError {} + +impl Display for SendStreamError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} + +impl From for SendStreamError { + fn from(e: WriteError) -> Self { + Self::Write(e) + } +} + +impl Error for SendStreamError { + fn is_timeout(&self) -> bool { + matches!( + self, + Self::Write(quinn::WriteError::ConnectionLost( + quinn::ConnectionError::TimedOut + )) + ) + } + + fn err_code(&self) -> Option { + match self { + Self::Write(quinn::WriteError::Stopped(error_code)) => Some(error_code.into_inner()), + Self::Write(quinn::WriteError::ConnectionLost( + quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose { + error_code, + .. + }), + )) => Some(error_code.into_inner()), + _ => None, + } + } +} + +impl From for Arc { + fn from(e: SendStreamError) -> Self { + Arc::new(e) + } +} diff --git a/quinn b/quinn new file mode 160000 index 0000000..b56d60b --- /dev/null +++ b/quinn @@ -0,0 +1 @@ +Subproject commit b56d60bbec577d73e67abbba60ed389f0589f208 diff --git a/src/backend/mod.rs b/src/backend/mod.rs index 6a92ec8..b502369 100644 --- a/src/backend/mod.rs +++ b/src/backend/mod.rs @@ -168,7 +168,8 @@ impl Backend { let owned_trust_anchors: Vec<_> = certs .iter() .map(|v| { - let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); + // let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); + let trust_anchor = webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap(); rustls::OwnedTrustAnchor::from_subject_spki_name_constraints( trust_anchor.subject, trust_anchor.spki, @@ -279,7 +280,7 @@ impl Backends { let client_certs_verifier = rustls::server::AllowAnyAuthenticatedClient::new(client_ca_roots_local); ServerConfig::builder() .with_safe_defaults() - .with_client_cert_verifier(client_certs_verifier) + .with_client_cert_verifier(Arc::new(client_certs_verifier)) .with_cert_resolver(Arc::new(resolver_local)) }; server_config_local.alpn_protocols.push(b"h2".to_vec());