Merge pull request #46 from junkurihara/sticky-backend
deps and submodule: update deps, support updated h3-quinn
This commit is contained in:
		
				commit
				
					
						ec0726b8a9
					
				
			
		
					 5 changed files with 201 additions and 17 deletions
				
			
		|  | @ -17,7 +17,7 @@ http3 = ["quinn", "h3", "h3-quinn"] | |||
| 
 | ||||
| [dependencies] | ||||
| anyhow = "1.0.71" | ||||
| clap = { version = "4.3.2", features = ["std", "cargo", "wrap_help"] } | ||||
| clap = { version = "4.3.3", features = ["std", "cargo", "wrap_help"] } | ||||
| rand = "0.8.5" | ||||
| toml = { version = "0.7.4", default-features = false, features = ["parse"] } | ||||
| rustc-hash = "1.1.0" | ||||
|  | @ -49,7 +49,7 @@ hyper-rustls = { version = "0.24.0", default-features = false, features = [ | |||
|   "http1", | ||||
|   "http2", | ||||
| ] } | ||||
| tokio-rustls = { version = "0.24.0", features = ["early-data"] } | ||||
| tokio-rustls = { version = "0.24.1", features = ["early-data"] } | ||||
| rustls-pemfile = "1.0.2" | ||||
| rustls = { version = "0.21.1", default-features = false } | ||||
| webpki = "0.22.0" | ||||
|  |  | |||
							
								
								
									
										2
									
								
								h3
									
										
									
									
									
								
							
							
						
						
									
										2
									
								
								h3
									
										
									
									
									
								
							|  | @ -1 +1 @@ | |||
| Subproject commit d9cae33d319cafd39f95503f87f738d4b2a34f16 | ||||
| Subproject commit 22da9387f19d724852b3bf1dfd7e66f0fd45cb81 | ||||
|  | @ -15,7 +15,10 @@ license = "MIT" | |||
| [dependencies] | ||||
| h3 = { version = "0.0.2", path = "../h3/h3" } | ||||
| bytes = "1" | ||||
| quinn = { path = "../quinn/quinn/", default-features = false } | ||||
| quinn = { path = "../quinn/quinn/", default-features = false, features = [ | ||||
|   "futures-io", | ||||
| ] } | ||||
| quinn-proto = { path = "../quinn/quinn-proto/", default-features = false } | ||||
| tokio-util = { version = "0.7.7" } | ||||
| tokio-util = { version = "0.7.8" } | ||||
| futures = { version = "0.3.27" } | ||||
| tokio = { version = "1.28", features = ["io-util"], default-features = false } | ||||
|  |  | |||
|  | @ -7,22 +7,27 @@ use std::{ | |||
|     convert::TryInto, | ||||
|     fmt::{self, Display}, | ||||
|     future::Future, | ||||
|     pin::Pin, | ||||
|     sync::Arc, | ||||
|     task::{self, Poll}, | ||||
| }; | ||||
| 
 | ||||
| use bytes::{Buf, Bytes}; | ||||
| use bytes::{Buf, Bytes, BytesMut}; | ||||
| 
 | ||||
| use futures::{ | ||||
|     ready, | ||||
|     stream::{self, BoxStream}, | ||||
|     StreamExt, | ||||
| }; | ||||
| use quinn::ReadDatagram; | ||||
| pub use quinn::{ | ||||
|     self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError, | ||||
| }; | ||||
| 
 | ||||
| use h3::quic::{self, Error, StreamId, WriteBuf}; | ||||
| use h3::{ | ||||
|     ext::Datagram, | ||||
|     quic::{self, Error, StreamId, WriteBuf}, | ||||
| }; | ||||
| use tokio_util::sync::ReusableBoxFuture; | ||||
| 
 | ||||
| /// A QUIC connection backed by Quinn
 | ||||
|  | @ -34,6 +39,7 @@ pub struct Connection { | |||
|     opening_bi: Option<BoxStream<'static, <OpenBi<'static> as Future>::Output>>, | ||||
|     incoming_uni: BoxStream<'static, <AcceptUni<'static> as Future>::Output>, | ||||
|     opening_uni: Option<BoxStream<'static, <OpenUni<'static> as Future>::Output>>, | ||||
|     datagrams: BoxStream<'static, <ReadDatagram<'static> as Future>::Output>, | ||||
| } | ||||
| 
 | ||||
| impl Connection { | ||||
|  | @ -45,10 +51,13 @@ impl Connection { | |||
|                 Some((conn.accept_bi().await, conn)) | ||||
|             })), | ||||
|             opening_bi: None, | ||||
|             incoming_uni: Box::pin(stream::unfold(conn, |conn| async { | ||||
|             incoming_uni: Box::pin(stream::unfold(conn.clone(), |conn| async { | ||||
|                 Some((conn.accept_uni().await, conn)) | ||||
|             })), | ||||
|             opening_uni: None, | ||||
|             datagrams: Box::pin(stream::unfold(conn, |conn| async { | ||||
|                 Some((conn.read_datagram().await, conn)) | ||||
|             })), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | @ -89,6 +98,58 @@ impl From<quinn::ConnectionError> for ConnectionError { | |||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Types of errors when sending a datagram.
 | ||||
| #[derive(Debug)] | ||||
| pub enum SendDatagramError { | ||||
|     /// Datagrams are not supported by the peer
 | ||||
|     UnsupportedByPeer, | ||||
|     /// Datagrams are locally disabled
 | ||||
|     Disabled, | ||||
|     /// The datagram was too large to be sent.
 | ||||
|     TooLarge, | ||||
|     /// Network error
 | ||||
|     ConnectionLost(Box<dyn Error>), | ||||
| } | ||||
| 
 | ||||
| impl fmt::Display for SendDatagramError { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||
|         match self { | ||||
|             SendDatagramError::UnsupportedByPeer => write!(f, "datagrams not supported by peer"), | ||||
|             SendDatagramError::Disabled => write!(f, "datagram support disabled"), | ||||
|             SendDatagramError::TooLarge => write!(f, "datagram too large"), | ||||
|             SendDatagramError::ConnectionLost(_) => write!(f, "connection lost"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl std::error::Error for SendDatagramError {} | ||||
| 
 | ||||
| impl Error for SendDatagramError { | ||||
|     fn is_timeout(&self) -> bool { | ||||
|         false | ||||
|     } | ||||
| 
 | ||||
|     fn err_code(&self) -> Option<u64> { | ||||
|         match self { | ||||
|             Self::ConnectionLost(err) => err.err_code(), | ||||
|             _ => None, | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<quinn::SendDatagramError> for SendDatagramError { | ||||
|     fn from(value: quinn::SendDatagramError) -> Self { | ||||
|         match value { | ||||
|             quinn::SendDatagramError::UnsupportedByPeer => Self::UnsupportedByPeer, | ||||
|             quinn::SendDatagramError::Disabled => Self::Disabled, | ||||
|             quinn::SendDatagramError::TooLarge => Self::TooLarge, | ||||
|             quinn::SendDatagramError::ConnectionLost(err) => { | ||||
|                 Self::ConnectionLost(ConnectionError::from(err).into()) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<B> quic::Connection<B> for Connection | ||||
| where | ||||
|     B: Buf, | ||||
|  | @ -172,6 +233,40 @@ where | |||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<B> quic::SendDatagramExt<B> for Connection | ||||
| where | ||||
|     B: Buf, | ||||
| { | ||||
|     type Error = SendDatagramError; | ||||
| 
 | ||||
|     fn send_datagram(&mut self, data: Datagram<B>) -> Result<(), SendDatagramError> { | ||||
|         // TODO investigate static buffer from known max datagram size
 | ||||
|         let mut buf = BytesMut::new(); | ||||
|         data.encode(&mut buf); | ||||
|         self.conn.send_datagram(buf.freeze())?; | ||||
| 
 | ||||
|         Ok(()) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl quic::RecvDatagramExt for Connection { | ||||
|     type Buf = Bytes; | ||||
| 
 | ||||
|     type Error = ConnectionError; | ||||
| 
 | ||||
|     #[inline] | ||||
|     fn poll_accept_datagram( | ||||
|         &mut self, | ||||
|         cx: &mut task::Context<'_>, | ||||
|     ) -> Poll<Result<Option<Self::Buf>, Self::Error>> { | ||||
|         match ready!(self.datagrams.poll_next_unpin(cx)) { | ||||
|             Some(Ok(x)) => Poll::Ready(Ok(Some(x))), | ||||
|             Some(Err(e)) => Poll::Ready(Err(e.into())), | ||||
|             None => Poll::Ready(Ok(None)), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// Stream opener backed by a Quinn connection
 | ||||
| ///
 | ||||
| /// Implements [`quic::OpenStreams`] using [`quinn::Connection`],
 | ||||
|  | @ -265,10 +360,7 @@ where | |||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<B> quic::RecvStream for BidiStream<B> | ||||
| where | ||||
|     B: Buf, | ||||
| { | ||||
| impl<B: Buf> quic::RecvStream for BidiStream<B> { | ||||
|     type Buf = Bytes; | ||||
|     type Error = ReadError; | ||||
| 
 | ||||
|  | @ -282,6 +374,10 @@ where | |||
|     fn stop_sending(&mut self, error_code: u64) { | ||||
|         self.recv.stop_sending(error_code) | ||||
|     } | ||||
| 
 | ||||
|     fn recv_id(&self) -> StreamId { | ||||
|         self.recv.recv_id() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<B> quic::SendStream<B> for BidiStream<B> | ||||
|  | @ -306,8 +402,20 @@ where | |||
|         self.send.send_data(data) | ||||
|     } | ||||
| 
 | ||||
|     fn id(&self) -> StreamId { | ||||
|         self.send.id() | ||||
|     fn send_id(&self) -> StreamId { | ||||
|         self.send.send_id() | ||||
|     } | ||||
| } | ||||
| impl<B> quic::SendStreamUnframed<B> for BidiStream<B> | ||||
| where | ||||
|     B: Buf, | ||||
| { | ||||
|     fn poll_send<D: Buf>( | ||||
|         &mut self, | ||||
|         cx: &mut task::Context<'_>, | ||||
|         buf: &mut D, | ||||
|     ) -> Poll<Result<usize, Self::Error>> { | ||||
|         self.send.poll_send(cx, buf) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | @ -364,6 +472,16 @@ impl quic::RecvStream for RecvStream { | |||
|             .stop(VarInt::from_u64(error_code).expect("invalid error_code")) | ||||
|             .ok(); | ||||
|     } | ||||
| 
 | ||||
|     fn recv_id(&self) -> StreamId { | ||||
|         self.stream | ||||
|             .as_ref() | ||||
|             .unwrap() | ||||
|             .id() | ||||
|             .0 | ||||
|             .try_into() | ||||
|             .expect("invalid stream id") | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// The error type for [`RecvStream`]
 | ||||
|  | @ -372,7 +490,17 @@ impl quic::RecvStream for RecvStream { | |||
| #[derive(Debug)] | ||||
| pub struct ReadError(quinn::ReadError); | ||||
| 
 | ||||
| impl std::error::Error for ReadError {} | ||||
| impl From<ReadError> for std::io::Error { | ||||
|     fn from(value: ReadError) -> Self { | ||||
|         value.0.into() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl std::error::Error for ReadError { | ||||
|     fn source(&self) -> Option<&(dyn std::error::Error + 'static)> { | ||||
|         self.0.source() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl fmt::Display for ReadError { | ||||
|     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { | ||||
|  | @ -491,7 +619,7 @@ where | |||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     fn id(&self) -> StreamId { | ||||
|     fn send_id(&self) -> StreamId { | ||||
|         self.stream | ||||
|             .as_ref() | ||||
|             .unwrap() | ||||
|  | @ -502,6 +630,48 @@ where | |||
|     } | ||||
| } | ||||
| 
 | ||||
| impl<B> quic::SendStreamUnframed<B> for SendStream<B> | ||||
| where | ||||
|     B: Buf, | ||||
| { | ||||
|     fn poll_send<D: Buf>( | ||||
|         &mut self, | ||||
|         cx: &mut task::Context<'_>, | ||||
|         buf: &mut D, | ||||
|     ) -> Poll<Result<usize, Self::Error>> { | ||||
|         if self.writing.is_some() { | ||||
|             // This signifies a bug in implementation
 | ||||
|             panic!("poll_send called while send stream is not ready") | ||||
|         } | ||||
| 
 | ||||
|         let s = Pin::new(self.stream.as_mut().unwrap()); | ||||
| 
 | ||||
|         let res = ready!(futures::io::AsyncWrite::poll_write(s, cx, buf.chunk())); | ||||
|         match res { | ||||
|             Ok(written) => { | ||||
|                 buf.advance(written); | ||||
|                 Poll::Ready(Ok(written)) | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 // We are forced to use AsyncWrite for now because we cannot store
 | ||||
|                 // the result of a call to:
 | ||||
|                 // quinn::send_stream::write<'a>(&'a mut self, buf: &'a [u8]) -> Result<usize, WriteError>.
 | ||||
|                 //
 | ||||
|                 // This is why we have to unpack the error from io::Error instead of having it
 | ||||
|                 // returned directly. This should not panic as long as quinn's AsyncWrite impl
 | ||||
|                 // doesn't change.
 | ||||
|                 let err = err | ||||
|                     .into_inner() | ||||
|                     .expect("write stream returned an empty error") | ||||
|                     .downcast::<WriteError>() | ||||
|                     .expect("write stream returned an error which type is not WriteError"); | ||||
| 
 | ||||
|                 Poll::Ready(Err(SendStreamError::Write(*err))) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| /// The error type for [`SendStream`]
 | ||||
| ///
 | ||||
| /// Wraps errors that can happen writing to or polling a send stream.
 | ||||
|  | @ -514,6 +684,17 @@ pub enum SendStreamError { | |||
|     NotReady, | ||||
| } | ||||
| 
 | ||||
| impl From<SendStreamError> for std::io::Error { | ||||
|     fn from(value: SendStreamError) -> Self { | ||||
|         match value { | ||||
|             SendStreamError::Write(err) => err.into(), | ||||
|             SendStreamError::NotReady => { | ||||
|                 std::io::Error::new(std::io::ErrorKind::Other, "send stream is not ready") | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl std::error::Error for SendStreamError {} | ||||
| 
 | ||||
| impl Display for SendStreamError { | ||||
|  |  | |||
							
								
								
									
										2
									
								
								quinn
									
										
									
									
									
								
							
							
						
						
									
										2
									
								
								quinn
									
										
									
									
									
								
							|  | @ -1 +1 @@ | |||
| Subproject commit 98f5fe2a3fabb9ff991f8c831e8d43de76985ff3 | ||||
| Subproject commit 7914468e27621633a8399c8d02fbf3f557d54df2 | ||||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue
	
	 Jun Kurihara
				Jun Kurihara