Merge pull request #31 from junkurihara/rustls-0.21
feat: rustls-0.21 with some work around
This commit is contained in:
commit
90079b2e37
6 changed files with 595 additions and 7 deletions
3
.gitmodules
vendored
3
.gitmodules
vendored
|
|
@ -1,3 +1,6 @@
|
||||||
[submodule "h3"]
|
[submodule "h3"]
|
||||||
path = h3
|
path = h3
|
||||||
url = git@github.com:junkurihara/h3.git
|
url = git@github.com:junkurihara/h3.git
|
||||||
|
[submodule "quinn"]
|
||||||
|
path = quinn
|
||||||
|
url = git@github.com:junkurihara/quinn.git
|
||||||
|
|
|
||||||
13
Cargo.toml
13
Cargo.toml
|
|
@ -43,24 +43,27 @@ hyper = { version = "0.14.25", default-features = false, features = [
|
||||||
"http2",
|
"http2",
|
||||||
"stream",
|
"stream",
|
||||||
] }
|
] }
|
||||||
hyper-rustls = { version = "0.23.2", default-features = false, features = [
|
hyper-rustls = { version = "0.24.0", default-features = false, features = [
|
||||||
"tokio-runtime",
|
"tokio-runtime",
|
||||||
"webpki-tokio",
|
"webpki-tokio",
|
||||||
"http1",
|
"http1",
|
||||||
"http2",
|
"http2",
|
||||||
] }
|
] }
|
||||||
tokio-rustls = { version = "0.23.4", features = ["early-data"] }
|
tokio-rustls = { version = "0.24.0", features = ["early-data"] }
|
||||||
rustls-pemfile = "1.0.2"
|
rustls-pemfile = "1.0.2"
|
||||||
rustls = { version = "0.20.8", default-features = false }
|
rustls = { version = "0.21.0", default-features = false }
|
||||||
|
webpki = "0.22.0"
|
||||||
|
|
||||||
# logging
|
# logging
|
||||||
tracing = { version = "0.1.37" }
|
tracing = { version = "0.1.37" }
|
||||||
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
|
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
|
||||||
|
|
||||||
# http/3
|
# http/3
|
||||||
quinn = { version = "0.9.3", optional = true }
|
# quinn = { version = "0.9.3", optional = true }
|
||||||
|
quinn = { path = "./quinn/quinn", optional = true } # Tentative to support rustls-0.21
|
||||||
h3 = { path = "./h3/h3/", optional = true }
|
h3 = { path = "./h3/h3/", optional = true }
|
||||||
h3-quinn = { path = "./h3/h3-quinn/", optional = true }
|
# h3-quinn = { path = "./h3/h3-quinn/", optional = true }
|
||||||
|
h3-quinn = { path = "./h3-quinn/", optional = true } # Tentative to support rustls-0.21
|
||||||
|
|
||||||
|
|
||||||
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
[target.'cfg(not(target_env = "msvc"))'.dependencies]
|
||||||
|
|
|
||||||
21
h3-quinn/Cargo.toml
Normal file
21
h3-quinn/Cargo.toml
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
[package]
|
||||||
|
name = "h3-quinn"
|
||||||
|
version = "0.0.1"
|
||||||
|
rust-version = "1.59"
|
||||||
|
authors = ["Jean-Christophe BEGUE <jc.begue@pm.me>"]
|
||||||
|
edition = "2018"
|
||||||
|
documentation = "https://docs.rs/h3-quinn"
|
||||||
|
repository = "https://github.com/hyperium/h3"
|
||||||
|
readme = "../README.md"
|
||||||
|
description = "QUIC transport implementation based on Quinn."
|
||||||
|
keywords = ["http3", "quic", "h3"]
|
||||||
|
categories = ["network-programming", "web-programming"]
|
||||||
|
license = "MIT"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
h3 = { version = "0.0.1", path = "../h3/h3" }
|
||||||
|
bytes = "1"
|
||||||
|
quinn = { path = "../quinn/quinn/", default-features = false }
|
||||||
|
quinn-proto = { path = "../quinn/quinn-proto/", default-features = false }
|
||||||
|
tokio-util = { version = "0.7.7" }
|
||||||
|
futures = { version = "0.3.27" }
|
||||||
559
h3-quinn/src/lib.rs
Normal file
559
h3-quinn/src/lib.rs
Normal file
|
|
@ -0,0 +1,559 @@
|
||||||
|
//! QUIC Transport implementation with Quinn
|
||||||
|
//!
|
||||||
|
//! This module implements QUIC traits with Quinn.
|
||||||
|
#![deny(missing_docs)]
|
||||||
|
|
||||||
|
use std::{
|
||||||
|
convert::TryInto,
|
||||||
|
fmt::{self, Display},
|
||||||
|
future::Future,
|
||||||
|
sync::Arc,
|
||||||
|
task::{self, Poll},
|
||||||
|
};
|
||||||
|
|
||||||
|
use bytes::{Buf, Bytes};
|
||||||
|
|
||||||
|
use futures::{
|
||||||
|
ready,
|
||||||
|
stream::{self, BoxStream},
|
||||||
|
StreamExt,
|
||||||
|
};
|
||||||
|
pub use quinn::{
|
||||||
|
self, crypto::Session, AcceptBi, AcceptUni, Endpoint, OpenBi, OpenUni, VarInt, WriteError,
|
||||||
|
};
|
||||||
|
|
||||||
|
use h3::quic::{self, Error, StreamId, WriteBuf};
|
||||||
|
use tokio_util::sync::ReusableBoxFuture;
|
||||||
|
|
||||||
|
/// A QUIC connection backed by Quinn
|
||||||
|
///
|
||||||
|
/// Implements a [`quic::Connection`] backed by a [`quinn::Connection`].
|
||||||
|
pub struct Connection {
|
||||||
|
conn: quinn::Connection,
|
||||||
|
incoming_bi: BoxStream<'static, <AcceptBi<'static> as Future>::Output>,
|
||||||
|
opening_bi: Option<BoxStream<'static, <OpenBi<'static> as Future>::Output>>,
|
||||||
|
incoming_uni: BoxStream<'static, <AcceptUni<'static> as Future>::Output>,
|
||||||
|
opening_uni: Option<BoxStream<'static, <OpenUni<'static> as Future>::Output>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Connection {
|
||||||
|
/// Create a [`Connection`] from a [`quinn::NewConnection`]
|
||||||
|
pub fn new(conn: quinn::Connection) -> Self {
|
||||||
|
Self {
|
||||||
|
conn: conn.clone(),
|
||||||
|
incoming_bi: Box::pin(stream::unfold(conn.clone(), |conn| async {
|
||||||
|
Some((conn.accept_bi().await, conn))
|
||||||
|
})),
|
||||||
|
opening_bi: None,
|
||||||
|
incoming_uni: Box::pin(stream::unfold(conn, |conn| async {
|
||||||
|
Some((conn.accept_uni().await, conn))
|
||||||
|
})),
|
||||||
|
opening_uni: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The error type for [`Connection`]
|
||||||
|
///
|
||||||
|
/// Wraps reasons a Quinn connection might be lost.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ConnectionError(quinn::ConnectionError);
|
||||||
|
|
||||||
|
impl std::error::Error for ConnectionError {}
|
||||||
|
|
||||||
|
impl fmt::Display for ConnectionError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for ConnectionError {
|
||||||
|
fn is_timeout(&self) -> bool {
|
||||||
|
matches!(self.0, quinn::ConnectionError::TimedOut)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn err_code(&self) -> Option<u64> {
|
||||||
|
match self.0 {
|
||||||
|
quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose {
|
||||||
|
error_code,
|
||||||
|
..
|
||||||
|
}) => Some(error_code.into_inner()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<quinn::ConnectionError> for ConnectionError {
|
||||||
|
fn from(e: quinn::ConnectionError) -> Self {
|
||||||
|
Self(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::Connection<B> for Connection
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type SendStream = SendStream<B>;
|
||||||
|
type RecvStream = RecvStream;
|
||||||
|
type BidiStream = BidiStream<B>;
|
||||||
|
type OpenStreams = OpenStreams;
|
||||||
|
type Error = ConnectionError;
|
||||||
|
|
||||||
|
fn poll_accept_bidi(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
|
||||||
|
let (send, recv) = match ready!(self.incoming_bi.poll_next_unpin(cx)) {
|
||||||
|
Some(x) => x?,
|
||||||
|
None => return Poll::Ready(Ok(None)),
|
||||||
|
};
|
||||||
|
Poll::Ready(Ok(Some(Self::BidiStream {
|
||||||
|
send: Self::SendStream::new(send),
|
||||||
|
recv: Self::RecvStream::new(recv),
|
||||||
|
})))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_accept_recv(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
|
||||||
|
let recv = match ready!(self.incoming_uni.poll_next_unpin(cx)) {
|
||||||
|
Some(x) => x?,
|
||||||
|
None => return Poll::Ready(Ok(None)),
|
||||||
|
};
|
||||||
|
Poll::Ready(Ok(Some(Self::RecvStream::new(recv))))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_open_bidi(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Self::BidiStream, Self::Error>> {
|
||||||
|
if self.opening_bi.is_none() {
|
||||||
|
self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
|
||||||
|
Some((conn.clone().open_bi().await, conn))
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (send, recv) =
|
||||||
|
ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
|
||||||
|
Poll::Ready(Ok(Self::BidiStream {
|
||||||
|
send: Self::SendStream::new(send),
|
||||||
|
recv: Self::RecvStream::new(recv),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_open_send(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Self::SendStream, Self::Error>> {
|
||||||
|
if self.opening_uni.is_none() {
|
||||||
|
self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
|
||||||
|
Some((conn.open_uni().await, conn))
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
let send = ready!(self.opening_uni.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
|
||||||
|
Poll::Ready(Ok(Self::SendStream::new(send)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn opener(&self) -> Self::OpenStreams {
|
||||||
|
OpenStreams {
|
||||||
|
conn: self.conn.clone(),
|
||||||
|
opening_bi: None,
|
||||||
|
opening_uni: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
|
||||||
|
self.conn.close(
|
||||||
|
VarInt::from_u64(code.value()).expect("error code VarInt"),
|
||||||
|
reason,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Stream opener backed by a Quinn connection
|
||||||
|
///
|
||||||
|
/// Implements [`quic::OpenStreams`] using [`quinn::Connection`],
|
||||||
|
/// [`quinn::OpenBi`], [`quinn::OpenUni`].
|
||||||
|
pub struct OpenStreams {
|
||||||
|
conn: quinn::Connection,
|
||||||
|
opening_bi: Option<BoxStream<'static, <OpenBi<'static> as Future>::Output>>,
|
||||||
|
opening_uni: Option<BoxStream<'static, <OpenUni<'static> as Future>::Output>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::OpenStreams<B> for OpenStreams
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type RecvStream = RecvStream;
|
||||||
|
type SendStream = SendStream<B>;
|
||||||
|
type BidiStream = BidiStream<B>;
|
||||||
|
type Error = ConnectionError;
|
||||||
|
|
||||||
|
fn poll_open_bidi(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Self::BidiStream, Self::Error>> {
|
||||||
|
if self.opening_bi.is_none() {
|
||||||
|
self.opening_bi = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
|
||||||
|
Some((conn.open_bi().await, conn))
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (send, recv) =
|
||||||
|
ready!(self.opening_bi.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
|
||||||
|
Poll::Ready(Ok(Self::BidiStream {
|
||||||
|
send: Self::SendStream::new(send),
|
||||||
|
recv: Self::RecvStream::new(recv),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_open_send(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Self::SendStream, Self::Error>> {
|
||||||
|
if self.opening_uni.is_none() {
|
||||||
|
self.opening_uni = Some(Box::pin(stream::unfold(self.conn.clone(), |conn| async {
|
||||||
|
Some((conn.open_uni().await, conn))
|
||||||
|
})));
|
||||||
|
}
|
||||||
|
|
||||||
|
let send = ready!(self.opening_uni.as_mut().unwrap().poll_next_unpin(cx)).unwrap()?;
|
||||||
|
Poll::Ready(Ok(Self::SendStream::new(send)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
|
||||||
|
self.conn.close(
|
||||||
|
VarInt::from_u64(code.value()).expect("error code VarInt"),
|
||||||
|
reason,
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Clone for OpenStreams {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
Self {
|
||||||
|
conn: self.conn.clone(),
|
||||||
|
opening_bi: None,
|
||||||
|
opening_uni: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Quinn-backed bidirectional stream
|
||||||
|
///
|
||||||
|
/// Implements [`quic::BidiStream`] which allows the stream to be split
|
||||||
|
/// into two structs each implementing one direction.
|
||||||
|
pub struct BidiStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
send: SendStream<B>,
|
||||||
|
recv: RecvStream,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::BidiStream<B> for BidiStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type SendStream = SendStream<B>;
|
||||||
|
type RecvStream = RecvStream;
|
||||||
|
|
||||||
|
fn split(self) -> (Self::SendStream, Self::RecvStream) {
|
||||||
|
(self.send, self.recv)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::RecvStream for BidiStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type Buf = Bytes;
|
||||||
|
type Error = ReadError;
|
||||||
|
|
||||||
|
fn poll_data(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
|
||||||
|
self.recv.poll_data(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop_sending(&mut self, error_code: u64) {
|
||||||
|
self.recv.stop_sending(error_code)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::SendStream<B> for BidiStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type Error = SendStreamError;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.send.poll_ready(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.send.poll_finish(cx)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self, reset_code: u64) {
|
||||||
|
self.send.reset(reset_code)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
|
||||||
|
self.send.send_data(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn id(&self) -> StreamId {
|
||||||
|
self.send.id()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Quinn-backed receive stream
|
||||||
|
///
|
||||||
|
/// Implements a [`quic::RecvStream`] backed by a [`quinn::RecvStream`].
|
||||||
|
pub struct RecvStream {
|
||||||
|
stream: Option<quinn::RecvStream>,
|
||||||
|
read_chunk_fut: ReadChunkFuture,
|
||||||
|
}
|
||||||
|
|
||||||
|
type ReadChunkFuture = ReusableBoxFuture<
|
||||||
|
'static,
|
||||||
|
(
|
||||||
|
quinn::RecvStream,
|
||||||
|
Result<Option<quinn::Chunk>, quinn::ReadError>,
|
||||||
|
),
|
||||||
|
>;
|
||||||
|
|
||||||
|
impl RecvStream {
|
||||||
|
fn new(stream: quinn::RecvStream) -> Self {
|
||||||
|
Self {
|
||||||
|
stream: Some(stream),
|
||||||
|
// Should only allocate once the first time it's used
|
||||||
|
read_chunk_fut: ReusableBoxFuture::new(async { unreachable!() }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl quic::RecvStream for RecvStream {
|
||||||
|
type Buf = Bytes;
|
||||||
|
type Error = ReadError;
|
||||||
|
|
||||||
|
fn poll_data(
|
||||||
|
&mut self,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
|
||||||
|
if let Some(mut stream) = self.stream.take() {
|
||||||
|
self.read_chunk_fut.set(async move {
|
||||||
|
let chunk = stream.read_chunk(usize::MAX, true).await;
|
||||||
|
(stream, chunk)
|
||||||
|
})
|
||||||
|
};
|
||||||
|
|
||||||
|
let (stream, chunk) = ready!(self.read_chunk_fut.poll(cx));
|
||||||
|
self.stream = Some(stream);
|
||||||
|
Poll::Ready(Ok(chunk?.map(|c| c.bytes)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop_sending(&mut self, error_code: u64) {
|
||||||
|
self.stream
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
.stop(VarInt::from_u64(error_code).expect("invalid error_code"))
|
||||||
|
.ok();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The error type for [`RecvStream`]
|
||||||
|
///
|
||||||
|
/// Wraps errors that occur when reading from a receive stream.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct ReadError(quinn::ReadError);
|
||||||
|
|
||||||
|
impl std::error::Error for ReadError {}
|
||||||
|
|
||||||
|
impl fmt::Display for ReadError {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
self.0.fmt(f)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<ReadError> for Arc<dyn Error> {
|
||||||
|
fn from(e: ReadError) -> Self {
|
||||||
|
Arc::new(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<quinn::ReadError> for ReadError {
|
||||||
|
fn from(e: quinn::ReadError) -> Self {
|
||||||
|
Self(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for ReadError {
|
||||||
|
fn is_timeout(&self) -> bool {
|
||||||
|
matches!(
|
||||||
|
self.0,
|
||||||
|
quinn::ReadError::ConnectionLost(quinn::ConnectionError::TimedOut)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn err_code(&self) -> Option<u64> {
|
||||||
|
match self.0 {
|
||||||
|
quinn::ReadError::ConnectionLost(quinn::ConnectionError::ApplicationClosed(
|
||||||
|
quinn_proto::ApplicationClose { error_code, .. },
|
||||||
|
)) => Some(error_code.into_inner()),
|
||||||
|
quinn::ReadError::Reset(error_code) => Some(error_code.into_inner()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Quinn-backed send stream
|
||||||
|
///
|
||||||
|
/// Implements a [`quic::SendStream`] backed by a [`quinn::SendStream`].
|
||||||
|
pub struct SendStream<B: Buf> {
|
||||||
|
stream: Option<quinn::SendStream>,
|
||||||
|
writing: Option<WriteBuf<B>>,
|
||||||
|
write_fut: WriteFuture,
|
||||||
|
}
|
||||||
|
|
||||||
|
type WriteFuture =
|
||||||
|
ReusableBoxFuture<'static, (quinn::SendStream, Result<usize, quinn::WriteError>)>;
|
||||||
|
|
||||||
|
impl<B> SendStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
fn new(stream: quinn::SendStream) -> SendStream<B> {
|
||||||
|
Self {
|
||||||
|
stream: Some(stream),
|
||||||
|
writing: None,
|
||||||
|
write_fut: ReusableBoxFuture::new(async { unreachable!() }),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<B> quic::SendStream<B> for SendStream<B>
|
||||||
|
where
|
||||||
|
B: Buf,
|
||||||
|
{
|
||||||
|
type Error = SendStreamError;
|
||||||
|
|
||||||
|
fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
if let Some(ref mut data) = self.writing {
|
||||||
|
while data.has_remaining() {
|
||||||
|
if let Some(mut stream) = self.stream.take() {
|
||||||
|
let chunk = data.chunk().to_owned(); // FIXME - avoid copy
|
||||||
|
self.write_fut.set(async move {
|
||||||
|
let ret = stream.write(&chunk).await;
|
||||||
|
(stream, ret)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let (stream, res) = ready!(self.write_fut.poll(cx));
|
||||||
|
self.stream = Some(stream);
|
||||||
|
match res {
|
||||||
|
Ok(cnt) => data.advance(cnt),
|
||||||
|
Err(err) => {
|
||||||
|
return Poll::Ready(Err(SendStreamError::Write(err)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.writing = None;
|
||||||
|
Poll::Ready(Ok(()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn poll_finish(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), Self::Error>> {
|
||||||
|
self.stream
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
.poll_finish(cx)
|
||||||
|
.map_err(Into::into)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reset(&mut self, reset_code: u64) {
|
||||||
|
let _ = self
|
||||||
|
.stream
|
||||||
|
.as_mut()
|
||||||
|
.unwrap()
|
||||||
|
.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
|
||||||
|
}
|
||||||
|
|
||||||
|
fn send_data<D: Into<WriteBuf<B>>>(&mut self, data: D) -> Result<(), Self::Error> {
|
||||||
|
if self.writing.is_some() {
|
||||||
|
return Err(Self::Error::NotReady);
|
||||||
|
}
|
||||||
|
self.writing = Some(data.into());
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn id(&self) -> StreamId {
|
||||||
|
self.stream
|
||||||
|
.as_ref()
|
||||||
|
.unwrap()
|
||||||
|
.id()
|
||||||
|
.0
|
||||||
|
.try_into()
|
||||||
|
.expect("invalid stream id")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The error type for [`SendStream`]
|
||||||
|
///
|
||||||
|
/// Wraps errors that can happen writing to or polling a send stream.
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SendStreamError {
|
||||||
|
/// Errors when writing, wrapping a [`quinn::WriteError`]
|
||||||
|
Write(WriteError),
|
||||||
|
/// Error when the stream is not ready, because it is still sending
|
||||||
|
/// data from a previous call
|
||||||
|
NotReady,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl std::error::Error for SendStreamError {}
|
||||||
|
|
||||||
|
impl Display for SendStreamError {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "{:?}", self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<WriteError> for SendStreamError {
|
||||||
|
fn from(e: WriteError) -> Self {
|
||||||
|
Self::Write(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Error for SendStreamError {
|
||||||
|
fn is_timeout(&self) -> bool {
|
||||||
|
matches!(
|
||||||
|
self,
|
||||||
|
Self::Write(quinn::WriteError::ConnectionLost(
|
||||||
|
quinn::ConnectionError::TimedOut
|
||||||
|
))
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn err_code(&self) -> Option<u64> {
|
||||||
|
match self {
|
||||||
|
Self::Write(quinn::WriteError::Stopped(error_code)) => Some(error_code.into_inner()),
|
||||||
|
Self::Write(quinn::WriteError::ConnectionLost(
|
||||||
|
quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose {
|
||||||
|
error_code,
|
||||||
|
..
|
||||||
|
}),
|
||||||
|
)) => Some(error_code.into_inner()),
|
||||||
|
_ => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SendStreamError> for Arc<dyn Error> {
|
||||||
|
fn from(e: SendStreamError) -> Self {
|
||||||
|
Arc::new(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
1
quinn
Submodule
1
quinn
Submodule
|
|
@ -0,0 +1 @@
|
||||||
|
Subproject commit b56d60bbec577d73e67abbba60ed389f0589f208
|
||||||
|
|
@ -168,7 +168,8 @@ impl Backend {
|
||||||
let owned_trust_anchors: Vec<_> = certs
|
let owned_trust_anchors: Vec<_> = certs
|
||||||
.iter()
|
.iter()
|
||||||
.map(|v| {
|
.map(|v| {
|
||||||
let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap();
|
// let trust_anchor = tokio_rustls::webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap();
|
||||||
|
let trust_anchor = webpki::TrustAnchor::try_from_cert_der(&v.0).unwrap();
|
||||||
rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
|
rustls::OwnedTrustAnchor::from_subject_spki_name_constraints(
|
||||||
trust_anchor.subject,
|
trust_anchor.subject,
|
||||||
trust_anchor.spki,
|
trust_anchor.spki,
|
||||||
|
|
@ -279,7 +280,7 @@ impl Backends {
|
||||||
let client_certs_verifier = rustls::server::AllowAnyAuthenticatedClient::new(client_ca_roots_local);
|
let client_certs_verifier = rustls::server::AllowAnyAuthenticatedClient::new(client_ca_roots_local);
|
||||||
ServerConfig::builder()
|
ServerConfig::builder()
|
||||||
.with_safe_defaults()
|
.with_safe_defaults()
|
||||||
.with_client_cert_verifier(client_certs_verifier)
|
.with_client_cert_verifier(Arc::new(client_certs_verifier))
|
||||||
.with_cert_resolver(Arc::new(resolver_local))
|
.with_cert_resolver(Arc::new(resolver_local))
|
||||||
};
|
};
|
||||||
server_config_local.alpn_protocols.push(b"h2".to_vec());
|
server_config_local.alpn_protocols.push(b"h2".to_vec());
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue