From 6aa503c745905c2c4c9fea0e8bb367a10afe303e Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Fri, 31 Mar 2023 14:10:45 +0900 Subject: [PATCH] apply hyperium/h3 update --- Cargo.toml | 8 +- h3 | 2 +- h3-quinn/.gitignore | 2 - h3-quinn/Cargo.toml | 17 -- h3-quinn/src/lib.rs | 405 ----------------------------------------- src/proxy/proxy_tls.rs | 15 +- 6 files changed, 13 insertions(+), 436 deletions(-) delete mode 100644 h3-quinn/.gitignore delete mode 100644 h3-quinn/Cargo.toml delete mode 100644 h3-quinn/src/lib.rs diff --git a/Cargo.toml b/Cargo.toml index 776ac0c..f9f1061 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -18,8 +18,8 @@ http3 = ["quinn", "h3", "h3-quinn"] [dependencies] env_logger = "0.10.0" anyhow = "1.0.70" -clap = { version = "4.1.14", features = ["std", "cargo", "wrap_help"] } -futures = { version = "0.3.27", features = ["alloc", "async-await"] } +clap = { version = "4.2.1", features = ["std", "cargo", "wrap_help"] } +futures = { version = "0.3.28", features = ["alloc", "async-await"] } hyper = { version = "0.14.25", default-features = false, features = [ "server", "http1", @@ -41,7 +41,7 @@ rustls = { version = "0.20.8", default-features = false } rand = "0.8.5" toml = { version = "0.7.3", default-features = false, features = ["parse"] } rustc-hash = "1.1.0" -serde = { version = "1.0.158", default-features = false, features = ["derive"] } +serde = { version = "1.0.159", default-features = false, features = ["derive"] } hyper-rustls = { version = "0.23.2", default-features = false, features = [ "tokio-runtime", "webpki-tokio", @@ -49,7 +49,7 @@ hyper-rustls = { version = "0.23.2", default-features = false, features = [ "http2", ] } bytes = "1.4.0" -quinn = { version = "0.8.5", optional = true } +quinn = { version = "0.9.3", optional = true } h3 = { path = "./h3/h3/", optional = true } h3-quinn = { path = "./h3/h3-quinn/", optional = true } thiserror = "1.0.40" diff --git a/h3 b/h3 index d34fd56..49301f1 160000 --- a/h3 +++ b/h3 @@ -1 +1 @@ -Subproject commit d34fd56f9d787e9f796958eb438ce93e6d72dc39 +Subproject commit 49301f18e15d3acffc2a8d8bea1a8038c5f3fe6d diff --git a/h3-quinn/.gitignore b/h3-quinn/.gitignore deleted file mode 100644 index 1e7caa9..0000000 --- a/h3-quinn/.gitignore +++ /dev/null @@ -1,2 +0,0 @@ -Cargo.lock -target/ diff --git a/h3-quinn/Cargo.toml b/h3-quinn/Cargo.toml deleted file mode 100644 index 9c28e3e..0000000 --- a/h3-quinn/Cargo.toml +++ /dev/null @@ -1,17 +0,0 @@ -[package] -name = "h3-quinn" -version = "0.0.0" -authors = ["Original from hyperium/h3", "Jun Kurihara"] -edition = "2021" -publish = false - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -h3 = { path = "../h3/h3/" } -bytes = "1.3.0" -futures-util = { version = "0.3.25", default-features = false, features = [ - "io", -] } -quinn = { version = "0.9.3", default-features = false } -quinn-proto = { version = "0.9.2", default-features = false } diff --git a/h3-quinn/src/lib.rs b/h3-quinn/src/lib.rs deleted file mode 100644 index 26f1327..0000000 --- a/h3-quinn/src/lib.rs +++ /dev/null @@ -1,405 +0,0 @@ -//! [`h3::quic`] traits implemented with Quinn -//! Copied from the original pull request at `hyperium/h3`: https://github.com/hyperium/h3/pull/145. -//! Currently maintained by Jun. - -#![deny(missing_docs)] - -use std::{ - convert::TryInto, - fmt::{self, Display}, - sync::Arc, - task::{Context, Poll}, -}; - -use bytes::{Buf, Bytes}; -use futures_util::{ready, FutureExt}; -use h3::quic::{self, Error, StreamId, WriteBuf}; -use quinn::VarInt; - -pub use quinn::{self}; - -/// QUIC connection -/// -/// A [`quic::Connection`] backed by [`quinn::Connection`]. -pub struct Connection { - conn: quinn::Connection, -} - -impl Connection { - /// Create a [`Connection`] from a [`quinn::Connection`] - pub fn new(conn: quinn::Connection) -> Self { - Self { conn } - } -} - -impl quic::Connection for Connection { - type OpenStreams = OpenStreams; - type BidiStream = BidiStream; - type SendStream = SendStream; - type RecvStream = RecvStream; - type Error = ConnectionError; - - fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>> { - Poll::Ready(match ready!(Box::pin(self.conn.accept_bi()).poll_unpin(cx)) { - Ok((send, recv)) => Ok(Some(Self::BidiStream::new( - Self::SendStream::new(send), - Self::RecvStream::new(recv), - ))), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>> { - Poll::Ready(match ready!(Box::pin(self.conn.accept_uni()).poll_unpin(cx)) { - Ok(recv) => Ok(Some(Self::RecvStream::new(recv))), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(Box::pin(self.conn.open_bi()).poll_unpin(cx)) { - Ok((send, recv)) => Ok(Self::BidiStream::new( - Self::SendStream::new(send), - Self::RecvStream::new(recv), - )), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(Box::pin(self.conn.open_uni()).poll_unpin(cx)) { - Ok(send) => Ok(Self::SendStream::new(send)), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn opener(&self) -> Self::OpenStreams { - Self::OpenStreams { - conn: self.conn.clone(), - } - } - - fn close(&mut self, code: h3::error::Code, reason: &[u8]) { - self - .conn - .close(VarInt::from_u64(code.value()).expect("Invalid error code"), reason); - } -} - -/// Stream opener -/// -/// Implements [`quic::OpenStreams`]. -pub struct OpenStreams { - conn: quinn::Connection, -} - -impl quic::OpenStreams for OpenStreams { - type BidiStream = BidiStream; - type SendStream = SendStream; - type RecvStream = RecvStream; - type Error = ConnectionError; - - fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(Box::pin(self.conn.open_bi()).poll_unpin(cx)) { - Ok((send, recv)) => Ok(Self::BidiStream::new( - Self::SendStream::new(send), - Self::RecvStream::new(recv), - )), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll> { - Poll::Ready(match ready!(Box::pin(self.conn.open_uni()).poll_unpin(cx)) { - Ok(send) => Ok(Self::SendStream::new(send)), - Err(e) => Err(ConnectionError(e)), - }) - } - - fn close(&mut self, code: h3::error::Code, reason: &[u8]) { - self - .conn - .close(VarInt::from_u64(code.value()).expect("Invalid error code"), reason); - } -} - -impl Clone for OpenStreams { - fn clone(&self) -> Self { - Self { - conn: self.conn.clone(), - } - } -} - -/// Stream that can be used to send and receive data -/// -/// A [`quic::BidiStream`], which can be split into one send-only and -/// one receive-only stream. -pub struct BidiStream { - send: SendStream, - recv: RecvStream, -} - -impl BidiStream { - fn new(send: SendStream, recv: RecvStream) -> Self { - Self { send, recv } - } -} - -impl quic::BidiStream for BidiStream { - type SendStream = SendStream; - type RecvStream = RecvStream; - - fn split(self) -> (Self::SendStream, Self::RecvStream) { - (self.send, self.recv) - } -} - -impl quic::SendStream for BidiStream { - type Error = SendError; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - self.send.poll_ready(cx) - } - - fn send_data>>(&mut self, data: T) -> Result<(), Self::Error> { - self.send.send_data(data) - } - - fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { - self.send.poll_finish(cx) - } - - fn reset(&mut self, reset_code: u64) { - self.send.reset(reset_code) - } - - fn id(&self) -> StreamId { - self.send.id() - } -} - -impl quic::RecvStream for BidiStream { - type Buf = Bytes; - type Error = RecvError; - - fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>> { - self.recv.poll_data(cx) - } - - fn stop_sending(&mut self, err_code: u64) { - self.recv.stop_sending(err_code) - } -} - -/// Send-only stream -/// -/// A [`quic::SendStream`] backed by [`quinn::SendStream`]. -pub struct SendStream { - stream: quinn::SendStream, - writing: Option>, -} - -impl SendStream { - fn new(stream: quinn::SendStream) -> Self { - Self { stream, writing: None } - } -} - -impl quic::SendStream for SendStream { - type Error = SendError; - - fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll> { - if let Some(ref mut data) = self.writing { - while data.has_remaining() { - match ready!({ - let mut write_fut = Box::pin(self.stream.write(data.chunk())); - write_fut.poll_unpin(cx) - }) { - Ok(cnt) => data.advance(cnt), - Err(e) => return Poll::Ready(Err(Self::Error::Write(e))), - } - } - } - - self.writing = None; - Poll::Ready(Ok(())) - } - - fn send_data>>(&mut self, data: T) -> Result<(), Self::Error> { - match self.writing { - Some(_) => Err(Self::Error::NotReady), - None => { - self.writing = Some(data.into()); - Ok(()) - } - } - } - - fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll> { - Box::pin(self.stream.finish()).poll_unpin(cx).map_err(Into::into) - } - - fn reset(&mut self, reset_code: u64) { - let _ = self.stream.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX)); - } - - fn id(&self) -> StreamId { - self.stream.id().0.try_into().expect("Invalid stream id") - } -} - -/// Receive-only stream -/// -/// A [`quic::RecvStream`] backed by [`quinn::RecvStream`]. -pub struct RecvStream { - stream: quinn::RecvStream, -} - -impl RecvStream { - fn new(stream: quinn::RecvStream) -> Self { - Self { stream } - } -} - -impl quic::RecvStream for RecvStream { - type Buf = Bytes; - type Error = RecvError; - - fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll, Self::Error>> { - let data = ready!(Box::pin(self.stream.read_chunk(usize::MAX, true)).poll_unpin(cx))?; - Poll::Ready(Ok(data.map(|ch| ch.bytes))) - } - - fn stop_sending(&mut self, err_code: u64) { - let _ = self - .stream - .stop(VarInt::from_u64(err_code).expect("Invalid error code")); - } -} - -/// The error type for [`quic::Connection::Error`] -/// -/// Used by [`Connection`]. -#[derive(Debug)] -pub struct ConnectionError(quinn::ConnectionError); - -impl Error for ConnectionError { - fn is_timeout(&self) -> bool { - matches!(self.0, quinn::ConnectionError::TimedOut) - } - - fn err_code(&self) -> Option { - match self.0 { - quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose { error_code, .. }) => { - Some(error_code.into_inner()) - } - _ => None, - } - } -} - -impl std::error::Error for ConnectionError {} - -impl Display for ConnectionError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl From for ConnectionError { - fn from(e: quinn::ConnectionError) -> Self { - Self(e) - } -} - -/// The error type for [`quic::SendStream::Error`] -/// -/// Used by [`SendStream`] and [`BidiStream`]. -#[derive(Debug)] -pub enum SendError { - /// For write errors, wrapping a [`quinn::WriteError`] - Write(quinn::WriteError), - /// For trying to send when stream is not ready, because it is - /// still sending data from the previous call - NotReady, -} - -impl Error for SendError { - fn is_timeout(&self) -> bool { - matches!( - self, - Self::Write(quinn::WriteError::ConnectionLost(quinn::ConnectionError::TimedOut)) - ) - } - - fn err_code(&self) -> Option { - match self { - Self::Write(quinn::WriteError::Stopped(error_code)) => Some(error_code.into_inner()), - Self::Write(quinn::WriteError::ConnectionLost(quinn::ConnectionError::ApplicationClosed( - quinn_proto::ApplicationClose { error_code, .. }, - ))) => Some(error_code.into_inner()), - _ => None, - } - } -} - -impl std::error::Error for SendError {} - -impl Display for SendError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "{:?}", self) - } -} - -impl From for SendError { - fn from(e: quinn::WriteError) -> Self { - Self::Write(e) - } -} - -/// The error type for [`quic::RecvStream::Error`] -/// -/// Used by [`RecvStream`] and [`BidiStream`]. -#[derive(Debug)] -pub struct RecvError(quinn::ReadError); - -impl Error for RecvError { - fn is_timeout(&self) -> bool { - matches!( - self.0, - quinn::ReadError::ConnectionLost(quinn::ConnectionError::TimedOut), - ) - } - - fn err_code(&self) -> Option { - match self.0 { - quinn::ReadError::ConnectionLost(quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose { - error_code, - .. - })) => Some(error_code.into_inner()), - quinn::ReadError::Reset(error_code) => Some(error_code.into_inner()), - _ => None, - } - } -} - -impl std::error::Error for RecvError {} - -impl fmt::Display for RecvError { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.0.fmt(f) - } -} - -impl From for Arc { - fn from(e: RecvError) -> Self { - Arc::new(e) - } -} - -impl From for RecvError { - fn from(e: quinn::ReadError) -> Self { - Self(e) - } -} diff --git a/src/proxy/proxy_tls.rs b/src/proxy/proxy_tls.rs index 8242d19..a3ed081 100644 --- a/src/proxy/proxy_tls.rs +++ b/src/proxy/proxy_tls.rs @@ -7,7 +7,6 @@ use crate::{ utils::BytesName, }; #[cfg(feature = "http3")] -use futures::StreamExt; use hyper::{client::connect::Connect, server::conn::Http}; #[cfg(feature = "http3")] use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig}; @@ -61,7 +60,7 @@ where // spawns async handshake to avoid blocking thread by sequential handshake. let handshake_fut = async move { - let acceptor = tokio_rustls::LazyConfigAcceptor::new(rustls::server::Acceptor::default(), raw_stream).await; + let acceptor = tokio_rustls::LazyConfigAcceptor::new(tokio_rustls::rustls::server::Acceptor::default(), raw_stream).await; if let Err(e) = acceptor { return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {e}"))); } @@ -122,9 +121,11 @@ where info!("Start UDP proxy serving with HTTP/3 request for configured host names"); // first set as null config server let rustls_server_config = ServerConfig::builder() - .with_safe_defaults() + .with_safe_default_cipher_suites() + .with_safe_default_kx_groups() + .with_protocol_versions(&[&rustls::version::TLS13])? .with_no_client_auth() - .with_cert_resolver(Arc::new(tokio_rustls::rustls::server::ResolvesServerCertUsingSni::new())); + .with_cert_resolver(Arc::new(rustls::server::ResolvesServerCertUsingSni::new())); let mut transport_config_quic = TransportConfig::default(); transport_config_quic @@ -135,16 +136,16 @@ where let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config)); server_config_h3.transport = Arc::new(transport_config_quic); server_config_h3.concurrent_connections(self.globals.h3_max_concurrent_connections); - let (endpoint, mut incoming) = Endpoint::server(server_config_h3, self.listening_on)?; + let endpoint = Endpoint::server(server_config_h3, self.listening_on)?; let mut server_crypto: Option> = None; loop { tokio::select! { - new_conn = incoming.next() => { + new_conn = endpoint.accept() => { if server_crypto.is_none() || new_conn.is_none() { continue; } - let mut conn = new_conn.unwrap(); + let mut conn: quinn::Connecting = new_conn.unwrap(); let hsd = match conn.handshake_data().await { Ok(h) => h, Err(_) => continue