diff --git a/Cargo.toml b/Cargo.toml index 23574c6..87a6f98 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -17,7 +17,7 @@ http3 = ["quinn", "h3", "h3-quinn"] [dependencies] anyhow = "1.0.71" -clap = { version = "4.3.2", features = ["std", "cargo", "wrap_help"] } +clap = { version = "4.3.3", features = ["std", "cargo", "wrap_help"] } rand = "0.8.5" toml = { version = "0.7.4", default-features = false, features = ["parse"] } rustc-hash = "1.1.0" @@ -49,7 +49,7 @@ hyper-rustls = { version = "0.24.0", default-features = false, features = [ "http1", "http2", ] } -tokio-rustls = { version = "0.24.0", features = ["early-data"] } +tokio-rustls = { version = "0.24.1", features = ["early-data"] } rustls-pemfile = "1.0.2" rustls = { version = "0.21.1", default-features = false } webpki = "0.22.0" diff --git a/h3 b/h3 index d9cae33..22da938 160000 --- a/h3 +++ b/h3 @@ -1 +1 @@ -Subproject commit d9cae33d319cafd39f95503f87f738d4b2a34f16 +Subproject commit 22da9387f19d724852b3bf1dfd7e66f0fd45cb81 diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml index ab95dbd..df34822 100644 --- a/h3-quinn/Cargo.toml +++ b/h3-quinn/Cargo.toml @@ -15,7 +15,10 @@ license = "MIT" [dependencies] h3 = { version = "0.0.2", path = "../h3/h3" } bytes = "1" -quinn = { path = "../quinn/quinn/", default-features = false } +quinn = { path = "../quinn/quinn/", default-features = false, features = [ + "futures-io", +] } quinn-proto = { path = "../quinn/quinn-proto/", default-features = false } -tokio-util = { version = "0.7.7" } +tokio-util = { version = "0.7.8" } futures = { version = "0.3.27" } +tokio = { version = "1.28", features = ["io-util"], default-features = false } diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs index 62d84d8..78696de 100644 --- a/h3-quinn/src/lib.rs +++ b/h3-quinn/src/lib.rs @@ -7,22 +7,27 @@ use std::{ convert::TryInto, fmt::{self, Display}, future::Future, + pin::Pin, sync::Arc, task::{self, Poll}, }; -use bytes::{Buf, Bytes}; +use bytes::{Buf, Bytes, BytesMut}; use futures::{ ready, stream::{self, BoxStream}, StreamExt, }; +use quinn::ReadDatagram; pub use quinn::{ self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError, }; -use h3::quic::{self, Error, StreamId, WriteBuf}; +use h3::{ + ext::Datagram, + quic::{self, Error, StreamId, WriteBuf}, +}; use tokio_util::sync::ReusableBoxFuture; /// A QUIC connection backed by Quinn @@ -34,6 +39,7 @@ pub struct Connection { opening_bi: Option as Future>::Output>>, incoming_uni: BoxStream<'static, as Future>::Output>, opening_uni: Option as Future>::Output>>, + datagrams: BoxStream<'static, as Future>::Output>, } impl Connection { @@ -45,10 +51,13 @@ impl Connection { Some((conn.accept_bi().await, conn)) })), opening_bi: None, - incoming_uni: Box::pin(stream::unfold(conn, |conn| async { + incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async { Some((conn.accept_uni().await, conn)) })), opening_uni: None, + datagrams: Box::pin(stream::unfold(conn, |conn| async { + Some((conn.read_datagram().await, conn)) + })), } } } @@ -89,6 +98,58 @@ impl From for ConnectionError { } } +/// Types of errors when sending a datagram. +#[derive(Debug)] +pub enum SendDatagramError { + /// Datagrams are not supported by the peer + UnsupportedByPeer, + /// Datagrams are locally disabled + Disabled, + /// The datagram was too large to be sent. + TooLarge, + /// Network error + ConnectionLost(Box), +} + +impl fmt::Display for SendDatagramError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + SendDatagramError::UnsupportedByPeer => write!(f, "datagrams not supported by peer"), + SendDatagramError::Disabled => write!(f, "datagram support disabled"), + SendDatagramError::TooLarge => write!(f, "datagram too large"), + SendDatagramError::ConnectionLost(_) => write!(f, "connection lost"), + } + } +} + +impl std::error::Error for SendDatagramError {} + +impl Error for SendDatagramError { + fn is_timeout(&self) -> bool { + false + } + + fn err_code(&self) -> Option { + match self { + Self::ConnectionLost(err) => err.err_code(), + _ => None, + } + } +} + +impl From for SendDatagramError { + fn from(value: quinn::SendDatagramError) -> Self { + match value { + quinn::SendDatagramError::UnsupportedByPeer => Self::UnsupportedByPeer, + quinn::SendDatagramError::Disabled => Self::Disabled, + quinn::SendDatagramError::TooLarge => Self::TooLarge, + quinn::SendDatagramError::ConnectionLost(err) => { + Self::ConnectionLost(ConnectionError::from(err).into()) + } + } + } +} + impl quic::Connection for Connection where B: Buf, @@ -172,6 +233,40 @@ where } } +impl quic::SendDatagramExt for Connection +where + B: Buf, +{ + type Error = SendDatagramError; + + fn send_datagram(&mut self, data: Datagram) -> Result<(), SendDatagramError> { + // TODO investigate static buffer from known max datagram size + let mut buf = BytesMut::new(); + data.encode(&mut buf); + self.conn.send_datagram(buf.freeze())?; + + Ok(()) + } +} + +impl quic::RecvDatagramExt for Connection { + type Buf = Bytes; + + type Error = ConnectionError; + + #[inline] + fn poll_accept_datagram( + &mut self, + cx: &mut task::Context<'_>, + ) -> Poll, Self::Error>> { + match ready!(self.datagrams.poll_next_unpin(cx)) { + Some(Ok(x)) => Poll::Ready(Ok(Some(x))), + Some(Err(e)) => Poll::Ready(Err(e.into())), + None => Poll::Ready(Ok(None)), + } + } +} + /// Stream opener backed by a Quinn connection /// /// Implements [`quic::OpenStreams`] using [`quinn::Connection`], @@ -265,10 +360,7 @@ where } } -impl quic::RecvStream for BidiStream -where - B: Buf, -{ +impl quic::RecvStream for BidiStream { type Buf = Bytes; type Error = ReadError; @@ -282,6 +374,10 @@ where fn stop_sending(&mut self, error_code: u64) { self.recv.stop_sending(error_code) } + + fn recv_id(&self) -> StreamId { + self.recv.recv_id() + } } impl quic::SendStream for BidiStream @@ -306,8 +402,20 @@ where self.send.send_data(data) } - fn id(&self) -> StreamId { - self.send.id() + fn send_id(&self) -> StreamId { + self.send.send_id() + } +} +impl quic::SendStreamUnframed for BidiStream +where + B: Buf, +{ + fn poll_send( + &mut self, + cx: &mut task::Context<'_>, + buf: &mut D, + ) -> Poll> { + self.send.poll_send(cx, buf) } } @@ -364,6 +472,16 @@ impl quic::RecvStream for RecvStream { .stop(VarInt::from_u64(error_code).expect("invalid error_code")) .ok(); } + + fn recv_id(&self) -> StreamId { + self.stream + .as_ref() + .unwrap() + .id() + .0 + .try_into() + .expect("invalid stream id") + } } /// The error type for [`RecvStream`] @@ -372,7 +490,17 @@ impl quic::RecvStream for RecvStream { #[derive(Debug)] pub struct ReadError(quinn::ReadError); -impl std::error::Error for ReadError {} +impl From for std::io::Error { + fn from(value: ReadError) -> Self { + value.0.into() + } +} + +impl std::error::Error for ReadError { + fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { + self.0.source() + } +} impl fmt::Display for ReadError { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { @@ -491,7 +619,7 @@ where Ok(()) } - fn id(&self) -> StreamId { + fn send_id(&self) -> StreamId { self.stream .as_ref() .unwrap() @@ -502,6 +630,48 @@ where } } +impl quic::SendStreamUnframed for SendStream +where + B: Buf, +{ + fn poll_send( + &mut self, + cx: &mut task::Context<'_>, + buf: &mut D, + ) -> Poll> { + if self.writing.is_some() { + // This signifies a bug in implementation + panic!("poll_send called while send stream is not ready") + } + + let s = Pin::new(self.stream.as_mut().unwrap()); + + let res = ready!(futures::io::AsyncWrite::poll_write(s, cx, buf.chunk())); + match res { + Ok(written) => { + buf.advance(written); + Poll::Ready(Ok(written)) + } + Err(err) => { + // We are forced to use AsyncWrite for now because we cannot store + // the result of a call to: + // quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Result. + // + // This is why we have to unpack the error from io::Error instead of having it + // returned directly. This should not panic as long as quinn's AsyncWrite impl + // doesn't change. + let err = err + .into_inner() + .expect("write stream returned an empty error") + .downcast::() + .expect("write stream returned an error which type is not WriteError"); + + Poll::Ready(Err(SendStreamError::Write(*err))) + } + } + } +} + /// The error type for [`SendStream`] /// /// Wraps errors that can happen writing to or polling a send stream. @@ -514,6 +684,17 @@ pub enum SendStreamError { NotReady, } +impl From for std::io::Error { + fn from(value: SendStreamError) -> Self { + match value { + SendStreamError::Write(err) => err.into(), + SendStreamError::NotReady => { + std::io::Error::new(std::io::ErrorKind::Other, "send stream is not ready") + } + } + } +} + impl std::error::Error for SendStreamError {} impl Display for SendStreamError { diff --git a/quinn b/quinn index 98f5fe2..7914468 160000 --- a/quinn +++ b/quinn @@ -1 +1 @@ -Subproject commit 98f5fe2a3fabb9ff991f8c831e8d43de76985ff3 +Subproject commit 7914468e27621633a8399c8d02fbf3f557d54df2