Merge pull request #29 from junkurihara/develop

0.2.0
This commit is contained in:
Jun Kurihara 2023-03-31 17:46:47 +09:00 committed by GitHub
commit af284cc3bb
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 63 additions and 470 deletions

View file

@ -16,17 +16,17 @@ default = ["http3"]
http3 = ["quinn", "h3", "h3-quinn"]
[dependencies]
env_logger = "0.10.0"
anyhow = "1.0.70"
clap = { version = "4.1.14", features = ["std", "cargo", "wrap_help"] }
futures = { version = "0.3.27", features = ["alloc", "async-await"] }
hyper = { version = "0.14.25", default-features = false, features = [
"server",
"http1",
"http2",
"stream",
] }
log = "0.4.17"
clap = { version = "4.2.1", features = ["std", "cargo", "wrap_help"] }
rand = "0.8.5"
toml = { version = "0.7.3", default-features = false, features = ["parse"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.159", default-features = false, features = ["derive"] }
bytes = "1.4.0"
thiserror = "1.0.40"
x509-parser = "0.15.0"
derive_builder = "0.12.0"
futures = { version = "0.3.28", features = ["alloc", "async-await"] }
tokio = { version = "1.27.0", default-features = false, features = [
"net",
"rt-multi-thread",
@ -35,26 +35,32 @@ tokio = { version = "1.27.0", default-features = false, features = [
"sync",
"macros",
] }
tokio-rustls = { version = "0.23.4", features = ["early-data"] }
rustls-pemfile = "1.0.2"
rustls = { version = "0.20.8", default-features = false }
rand = "0.8.5"
toml = { version = "0.7.3", default-features = false, features = ["parse"] }
rustc-hash = "1.1.0"
serde = { version = "1.0.158", default-features = false, features = ["derive"] }
# http and tls
hyper = { version = "0.14.25", default-features = false, features = [
"server",
"http1",
"http2",
"stream",
] }
hyper-rustls = { version = "0.23.2", default-features = false, features = [
"tokio-runtime",
"webpki-tokio",
"http1",
"http2",
] }
bytes = "1.4.0"
quinn = { version = "0.8.5", optional = true }
tokio-rustls = { version = "0.23.4", features = ["early-data"] }
rustls-pemfile = "1.0.2"
rustls = { version = "0.20.8", default-features = false }
# logging
tracing = { version = "0.1.37" }
tracing-subscriber = { version = "0.3.16", features = ["env-filter"] }
# http/3
quinn = { version = "0.9.3", optional = true }
h3 = { path = "./h3/h3/", optional = true }
h3-quinn = { path = "./h3/h3-quinn/", optional = true }
thiserror = "1.0.40"
x509-parser = "0.15.0"
derive_builder = "0.12.0"
[target.'cfg(not(target_env = "msvc"))'.dependencies]

2
h3

@ -1 +1 @@
Subproject commit d34fd56f9d787e9f796958eb438ce93e6d72dc39
Subproject commit 49301f18e15d3acffc2a8d8bea1a8038c5f3fe6d

2
h3-quinn/.gitignore vendored
View file

@ -1,2 +0,0 @@
Cargo.lock
target/

View file

@ -1,17 +0,0 @@
[package]
name = "h3-quinn"
version = "0.0.0"
authors = ["Original from hyperium/h3", "Jun Kurihara"]
edition = "2021"
publish = false
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
h3 = { path = "../h3/h3/" }
bytes = "1.3.0"
futures-util = { version = "0.3.25", default-features = false, features = [
"io",
] }
quinn = { version = "0.9.3", default-features = false }
quinn-proto = { version = "0.9.2", default-features = false }

View file

@ -1,405 +0,0 @@
//! [`h3::quic`] traits implemented with Quinn
//! Copied from the original pull request at `hyperium/h3`: https://github.com/hyperium/h3/pull/145.
//! Currently maintained by Jun.
#![deny(missing_docs)]
use std::{
convert::TryInto,
fmt::{self, Display},
sync::Arc,
task::{Context, Poll},
};
use bytes::{Buf, Bytes};
use futures_util::{ready, FutureExt};
use h3::quic::{self, Error, StreamId, WriteBuf};
use quinn::VarInt;
pub use quinn::{self};
/// QUIC connection
///
/// A [`quic::Connection`] backed by [`quinn::Connection`].
pub struct Connection {
conn: quinn::Connection,
}
impl Connection {
/// Create a [`Connection`] from a [`quinn::Connection`]
pub fn new(conn: quinn::Connection) -> Self {
Self { conn }
}
}
impl<B: Buf> quic::Connection<B> for Connection {
type OpenStreams = OpenStreams;
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
fn poll_accept_bidi(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Self::BidiStream>, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.accept_bi()).poll_unpin(cx)) {
Ok((send, recv)) => Ok(Some(Self::BidiStream::new(
Self::SendStream::new(send),
Self::RecvStream::new(recv),
))),
Err(e) => Err(ConnectionError(e)),
})
}
fn poll_accept_recv(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Self::RecvStream>, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.accept_uni()).poll_unpin(cx)) {
Ok(recv) => Ok(Some(Self::RecvStream::new(recv))),
Err(e) => Err(ConnectionError(e)),
})
}
fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::BidiStream, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.open_bi()).poll_unpin(cx)) {
Ok((send, recv)) => Ok(Self::BidiStream::new(
Self::SendStream::new(send),
Self::RecvStream::new(recv),
)),
Err(e) => Err(ConnectionError(e)),
})
}
fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::SendStream, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.open_uni()).poll_unpin(cx)) {
Ok(send) => Ok(Self::SendStream::new(send)),
Err(e) => Err(ConnectionError(e)),
})
}
fn opener(&self) -> Self::OpenStreams {
Self::OpenStreams {
conn: self.conn.clone(),
}
}
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
self
.conn
.close(VarInt::from_u64(code.value()).expect("Invalid error code"), reason);
}
}
/// Stream opener
///
/// Implements [`quic::OpenStreams`].
pub struct OpenStreams {
conn: quinn::Connection,
}
impl<B: Buf> quic::OpenStreams<B> for OpenStreams {
type BidiStream = BidiStream<B>;
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
type Error = ConnectionError;
fn poll_open_bidi(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::BidiStream, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.open_bi()).poll_unpin(cx)) {
Ok((send, recv)) => Ok(Self::BidiStream::new(
Self::SendStream::new(send),
Self::RecvStream::new(recv),
)),
Err(e) => Err(ConnectionError(e)),
})
}
fn poll_open_send(&mut self, cx: &mut Context<'_>) -> Poll<Result<Self::SendStream, Self::Error>> {
Poll::Ready(match ready!(Box::pin(self.conn.open_uni()).poll_unpin(cx)) {
Ok(send) => Ok(Self::SendStream::new(send)),
Err(e) => Err(ConnectionError(e)),
})
}
fn close(&mut self, code: h3::error::Code, reason: &[u8]) {
self
.conn
.close(VarInt::from_u64(code.value()).expect("Invalid error code"), reason);
}
}
impl Clone for OpenStreams {
fn clone(&self) -> Self {
Self {
conn: self.conn.clone(),
}
}
}
/// Stream that can be used to send and receive data
///
/// A [`quic::BidiStream`], which can be split into one send-only and
/// one receive-only stream.
pub struct BidiStream<B: Buf> {
send: SendStream<B>,
recv: RecvStream,
}
impl<B: Buf> BidiStream<B> {
fn new(send: SendStream<B>, recv: RecvStream) -> Self {
Self { send, recv }
}
}
impl<B: Buf> quic::BidiStream<B> for BidiStream<B> {
type SendStream = SendStream<B>;
type RecvStream = RecvStream;
fn split(self) -> (Self::SendStream, Self::RecvStream) {
(self.send, self.recv)
}
}
impl<B: Buf> quic::SendStream<B> for BidiStream<B> {
type Error = SendError;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_ready(cx)
}
fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
self.send.send_data(data)
}
fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.send.poll_finish(cx)
}
fn reset(&mut self, reset_code: u64) {
self.send.reset(reset_code)
}
fn id(&self) -> StreamId {
self.send.id()
}
}
impl<B: Buf> quic::RecvStream for BidiStream<B> {
type Buf = Bytes;
type Error = RecvError;
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
self.recv.poll_data(cx)
}
fn stop_sending(&mut self, err_code: u64) {
self.recv.stop_sending(err_code)
}
}
/// Send-only stream
///
/// A [`quic::SendStream`] backed by [`quinn::SendStream`].
pub struct SendStream<B: Buf> {
stream: quinn::SendStream,
writing: Option<WriteBuf<B>>,
}
impl<B: Buf> SendStream<B> {
fn new(stream: quinn::SendStream) -> Self {
Self { stream, writing: None }
}
}
impl<B: Buf> quic::SendStream<B> for SendStream<B> {
type Error = SendError;
fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
if let Some(ref mut data) = self.writing {
while data.has_remaining() {
match ready!({
let mut write_fut = Box::pin(self.stream.write(data.chunk()));
write_fut.poll_unpin(cx)
}) {
Ok(cnt) => data.advance(cnt),
Err(e) => return Poll::Ready(Err(Self::Error::Write(e))),
}
}
}
self.writing = None;
Poll::Ready(Ok(()))
}
fn send_data<T: Into<WriteBuf<B>>>(&mut self, data: T) -> Result<(), Self::Error> {
match self.writing {
Some(_) => Err(Self::Error::NotReady),
None => {
self.writing = Some(data.into());
Ok(())
}
}
}
fn poll_finish(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Box::pin(self.stream.finish()).poll_unpin(cx).map_err(Into::into)
}
fn reset(&mut self, reset_code: u64) {
let _ = self.stream.reset(VarInt::from_u64(reset_code).unwrap_or(VarInt::MAX));
}
fn id(&self) -> StreamId {
self.stream.id().0.try_into().expect("Invalid stream id")
}
}
/// Receive-only stream
///
/// A [`quic::RecvStream`] backed by [`quinn::RecvStream`].
pub struct RecvStream {
stream: quinn::RecvStream,
}
impl RecvStream {
fn new(stream: quinn::RecvStream) -> Self {
Self { stream }
}
}
impl quic::RecvStream for RecvStream {
type Buf = Bytes;
type Error = RecvError;
fn poll_data(&mut self, cx: &mut Context<'_>) -> Poll<Result<Option<Self::Buf>, Self::Error>> {
let data = ready!(Box::pin(self.stream.read_chunk(usize::MAX, true)).poll_unpin(cx))?;
Poll::Ready(Ok(data.map(|ch| ch.bytes)))
}
fn stop_sending(&mut self, err_code: u64) {
let _ = self
.stream
.stop(VarInt::from_u64(err_code).expect("Invalid error code"));
}
}
/// The error type for [`quic::Connection::Error`]
///
/// Used by [`Connection`].
#[derive(Debug)]
pub struct ConnectionError(quinn::ConnectionError);
impl Error for ConnectionError {
fn is_timeout(&self) -> bool {
matches!(self.0, quinn::ConnectionError::TimedOut)
}
fn err_code(&self) -> Option<u64> {
match self.0 {
quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose { error_code, .. }) => {
Some(error_code.into_inner())
}
_ => None,
}
}
}
impl std::error::Error for ConnectionError {}
impl Display for ConnectionError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl From<quinn::ConnectionError> for ConnectionError {
fn from(e: quinn::ConnectionError) -> Self {
Self(e)
}
}
/// The error type for [`quic::SendStream::Error`]
///
/// Used by [`SendStream`] and [`BidiStream`].
#[derive(Debug)]
pub enum SendError {
/// For write errors, wrapping a [`quinn::WriteError`]
Write(quinn::WriteError),
/// For trying to send when stream is not ready, because it is
/// still sending data from the previous call
NotReady,
}
impl Error for SendError {
fn is_timeout(&self) -> bool {
matches!(
self,
Self::Write(quinn::WriteError::ConnectionLost(quinn::ConnectionError::TimedOut))
)
}
fn err_code(&self) -> Option<u64> {
match self {
Self::Write(quinn::WriteError::Stopped(error_code)) => Some(error_code.into_inner()),
Self::Write(quinn::WriteError::ConnectionLost(quinn::ConnectionError::ApplicationClosed(
quinn_proto::ApplicationClose { error_code, .. },
))) => Some(error_code.into_inner()),
_ => None,
}
}
}
impl std::error::Error for SendError {}
impl Display for SendError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:?}", self)
}
}
impl From<quinn::WriteError> for SendError {
fn from(e: quinn::WriteError) -> Self {
Self::Write(e)
}
}
/// The error type for [`quic::RecvStream::Error`]
///
/// Used by [`RecvStream`] and [`BidiStream`].
#[derive(Debug)]
pub struct RecvError(quinn::ReadError);
impl Error for RecvError {
fn is_timeout(&self) -> bool {
matches!(
self.0,
quinn::ReadError::ConnectionLost(quinn::ConnectionError::TimedOut),
)
}
fn err_code(&self) -> Option<u64> {
match self.0 {
quinn::ReadError::ConnectionLost(quinn::ConnectionError::ApplicationClosed(quinn_proto::ApplicationClose {
error_code,
..
})) => Some(error_code.into_inner()),
quinn::ReadError::Reset(error_code) => Some(error_code.into_inner()),
_ => None,
}
}
}
impl std::error::Error for RecvError {}
impl fmt::Display for RecvError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
self.0.fmt(f)
}
}
impl From<RecvError> for Arc<dyn Error> {
fn from(e: RecvError) -> Self {
Arc::new(e)
}
}
impl From<quinn::ReadError> for RecvError {
fn from(e: quinn::ReadError) -> Self {
Self(e)
}
}

View file

@ -1,6 +1,6 @@
use crate::utils::ToCanonical;
pub use log::{debug, error, info, warn, Level};
use std::net::SocketAddr;
pub use tracing::{debug, error, info, warn};
#[derive(Debug, Clone)]
pub struct MessageLog {
@ -95,3 +95,26 @@ impl MessageLog {
);
}
}
pub fn init_logger() {
use tracing_subscriber::{fmt, prelude::*, EnvFilter};
let format_layer = fmt::layer()
.with_line_number(false)
.with_thread_ids(false)
.with_target(false)
.with_thread_names(true)
.with_target(true)
.with_level(true)
.compact();
// This limits the logger to emits only rpxy crate
let level_string = std::env::var(EnvFilter::DEFAULT_ENV).unwrap_or_else(|_| "info".to_string());
let filter_layer = EnvFilter::new(format!("{}={}", env!("CARGO_PKG_NAME"), level_string));
// let filter_layer = EnvFilter::from_default_env();
tracing_subscriber::registry()
.with(format_layer)
.with(filter_layer)
.init();
}

View file

@ -30,24 +30,11 @@ use futures::future::select_all;
use hyper::Client;
// use hyper_trust_dns::TrustDnsResolver;
use rustc_hash::FxHashMap as HashMap;
use std::{io::Write, sync::Arc};
use std::sync::Arc;
use tokio::time::Duration;
fn main() {
// env::set_var("RUST_LOG", "info");
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format(|buf, rec| {
let ts = buf.timestamp();
match rec.level() {
log::Level::Debug => {
writeln!(buf, "{} [{}] {} ({})", ts, rec.level(), rec.args(), rec.target(),)
}
_ => {
writeln!(buf, "{} [{}] {}", ts, rec.level(), rec.args(),)
}
}
})
.init();
init_logger();
let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
runtime_builder.enable_all();

View file

@ -7,7 +7,6 @@ use crate::{
utils::BytesName,
};
#[cfg(feature = "http3")]
use futures::StreamExt;
use hyper::{client::connect::Connect, server::conn::Http};
#[cfg(feature = "http3")]
use quinn::{crypto::rustls::HandshakeData, Endpoint, ServerConfig as QuicServerConfig, TransportConfig};
@ -61,7 +60,7 @@ where
// spawns async handshake to avoid blocking thread by sequential handshake.
let handshake_fut = async move {
let acceptor = tokio_rustls::LazyConfigAcceptor::new(rustls::server::Acceptor::default(), raw_stream).await;
let acceptor = tokio_rustls::LazyConfigAcceptor::new(tokio_rustls::rustls::server::Acceptor::default(), raw_stream).await;
if let Err(e) = acceptor {
return Err(RpxyError::Proxy(format!("Failed to handshake TLS: {e}")));
}
@ -122,9 +121,11 @@ where
info!("Start UDP proxy serving with HTTP/3 request for configured host names");
// first set as null config server
let rustls_server_config = ServerConfig::builder()
.with_safe_defaults()
.with_safe_default_cipher_suites()
.with_safe_default_kx_groups()
.with_protocol_versions(&[&rustls::version::TLS13])?
.with_no_client_auth()
.with_cert_resolver(Arc::new(tokio_rustls::rustls::server::ResolvesServerCertUsingSni::new()));
.with_cert_resolver(Arc::new(rustls::server::ResolvesServerCertUsingSni::new()));
let mut transport_config_quic = TransportConfig::default();
transport_config_quic
@ -135,16 +136,16 @@ where
let mut server_config_h3 = QuicServerConfig::with_crypto(Arc::new(rustls_server_config));
server_config_h3.transport = Arc::new(transport_config_quic);
server_config_h3.concurrent_connections(self.globals.h3_max_concurrent_connections);
let (endpoint, mut incoming) = Endpoint::server(server_config_h3, self.listening_on)?;
let endpoint = Endpoint::server(server_config_h3, self.listening_on)?;
let mut server_crypto: Option<Arc<ServerCrypto>> = None;
loop {
tokio::select! {
new_conn = incoming.next() => {
new_conn = endpoint.accept() => {
if server_crypto.is_none() || new_conn.is_none() {
continue;
}
let mut conn = new_conn.unwrap();
let mut conn: quinn::Connecting = new_conn.unwrap();
let hsd = match conn.handshake_data().await {
Ok(h) => h,
Err(_) => continue