From 9d4cf08fbac52412dcd845a5a543bf44eafe09b7 Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Mon, 4 Jul 2022 23:15:25 +0900 Subject: [PATCH] tests for http3 --- Cargo.toml | 9 +- bench/bench.sh | 6 +- config-example.toml | 1 + src/backend.rs | 15 ++- src/config/parse.rs | 11 +- src/config/toml.rs | 1 + src/proxy/mod.rs | 2 + src/proxy/proxy_h3.rs | 214 +++++++++++++++++++++++++++++++++++++ src/proxy/proxy_handler.rs | 8 +- src/proxy/proxy_main.rs | 27 ++--- src/proxy/proxy_tls.rs | 57 ++++++++-- 11 files changed, 317 insertions(+), 34 deletions(-) create mode 100644 src/proxy/proxy_h3.rs diff --git a/Cargo.toml b/Cargo.toml index ed3f832..b18377b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,7 +12,8 @@ publish = false # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [features] -default = [] +default = ["h3"] +h3 = ["quinn"] [dependencies] anyhow = "1.0.58" @@ -26,7 +27,7 @@ hyper = { version = "0.14.19", default-features = false, features = [ "stream", ] } log = "0.4.17" -tokio = { version = "1.19.2", features = [ +tokio = { version = "1.19.2", default-features = false, features = [ "net", "rt-multi-thread", "parking_lot", @@ -48,6 +49,10 @@ hyper-rustls = { version = "0.23.0", default-features = false, features = [ "http2", ] } parking_lot = "0.12.1" +quinn = { version = "0.8.3", optional = true } +h3 = { git = "https://github.com/hyperium/h3.git" } +h3-quinn = { git = "https://github.com/hyperium/h3.git" } +bytes = "1.1.0" [target.'cfg(not(target_env = "msvc"))'.dependencies] tikv-jemallocator = "0.5.0" diff --git a/bench/bench.sh b/bench/bench.sh index e024055..428ab62 100644 --- a/bench/bench.sh +++ b/bench/bench.sh @@ -2,12 +2,12 @@ echo "----------------------------" echo "Benchmark on rpxy" -ab -c 100 -n 10000 http://127.0.0.1:8080/ # TODO: localhost = 127.0.0.1を解決できるように決めておかんとだめそう +ab -c 100 -n 10000 http://127.0.0.1:8080/index.html # TODO: localhost = 127.0.0.1を解決できるように決めておかんとだめそう echo "----------------------------" echo "Benchmark on nginx" -ab -c 100 -n 10000 http://127.0.0.1:8090/ +ab -c 100 -n 10000 http://127.0.0.1:8090/index.html echo "----------------------------" echo "Benchmark on caddy" -ab -c 100 -n 10000 http://127.0.0.1:8100/ +ab -c 100 -n 10000 http://127.0.0.1:8100/index.html diff --git a/config-example.toml b/config-example.toml index 5befe8b..6792965 100644 --- a/config-example.toml +++ b/config-example.toml @@ -16,6 +16,7 @@ max_clients = 512 # Optional: Listen [::] listen_ipv6 = false +listen_only_ipv6 = false # Optional: App that serves all plaintext http request by referring to HOSTS or request header # execpt for configured application. diff --git a/src/backend.rs b/src/backend.rs index 554eccd..d5f56ee 100644 --- a/src/backend.rs +++ b/src/backend.rs @@ -180,7 +180,20 @@ impl Backend { "Unable to find a valid certificate and key", ) })?; - server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + + #[cfg(feature = "h3")] + { + server_config.alpn_protocols = vec![ + b"h3".to_vec(), + b"hq-29".to_vec(), // quinn draft example TODO: remove later + b"h2".to_vec(), + b"http/1.1".to_vec(), + ]; + } + #[cfg(not(feature = "h3"))] + { + server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()]; + } let mut config_store = self.server_config.lock(); *config_store = Some(server_config); diff --git a/src/config/parse.rs b/src/config/parse.rs index 7008486..56bdc8a 100644 --- a/src/config/parse.rs +++ b/src/config/parse.rs @@ -39,11 +39,18 @@ pub fn parse_opts(globals: &mut Globals, backends: &mut Backends) -> Result<()> }, anyhow!("Wrong port spec.") ); - let mut listen_addresses: Vec<&str> = LISTEN_ADDRESSES_V4.to_vec(); - if let Some(v) = config.listen_ipv6 { + let mut listen_addresses: Vec<&str> = Vec::new(); + if let Some(v) = config.listen_only_ipv6 { if v { listen_addresses.extend(LISTEN_ADDRESSES_V6.iter()); } + } else if let Some(v) = config.listen_ipv6 { + listen_addresses.extend(LISTEN_ADDRESSES_V4.iter()); + if v { + listen_addresses.extend(LISTEN_ADDRESSES_V6.iter()); + } + } else { + listen_addresses.extend(LISTEN_ADDRESSES_V4.iter()); } globals.listen_sockets = listen_addresses .iter() diff --git a/src/config/toml.rs b/src/config/toml.rs index b04e048..94eaf45 100644 --- a/src/config/toml.rs +++ b/src/config/toml.rs @@ -8,6 +8,7 @@ pub struct ConfigToml { pub listen_port: Option, pub listen_port_tls: Option, pub listen_ipv6: Option, + pub listen_only_ipv6: Option, pub max_concurrent_streams: Option, pub max_clients: Option, pub apps: Option, diff --git a/src/proxy/mod.rs b/src/proxy/mod.rs index a094bdc..20b5810 100644 --- a/src/proxy/mod.rs +++ b/src/proxy/mod.rs @@ -1,3 +1,5 @@ +#[cfg(feature = "h3")] +mod proxy_h3; mod proxy_handler; mod proxy_main; mod proxy_tls; diff --git a/src/proxy/proxy_h3.rs b/src/proxy/proxy_h3.rs new file mode 100644 index 0000000..7df9f34 --- /dev/null +++ b/src/proxy/proxy_h3.rs @@ -0,0 +1,214 @@ +use super::Proxy; +use crate::{error::*, log::*}; +use bytes::{Buf, Bytes, BytesMut}; +use futures::{FutureExt, StreamExt}; +use h3::{quic::BidiStream, server::RequestStream}; +use hyper::body::HttpBody; +use hyper::http::request; +use hyper::Response; +use hyper::{client::connect::Connect, Body, Request}; +use std::{ascii, str}; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; + +impl Proxy +where + T: Connect + Clone + Sync + Send + 'static, +{ + pub async fn client_serve_h3(self, conn: quinn::Connecting) -> Result<()> { + let client_addr = conn.remote_address(); + + match conn.await { + Ok(new_conn) => { + debug!( + "HTTP/3 connection established from {:?} {:?}", + client_addr, + { + let hsd = new_conn + .connection + .handshake_data() + .ok_or_else(|| anyhow!(""))? + .downcast::() + .map_err(|_| anyhow!(""))?; + ( + hsd.protocol.map_or_else( + || "".into(), + |x| String::from_utf8_lossy(&x).into_owned(), + ), + hsd.server_name.map_or_else(|| "".into(), |x| x), + ) + } + ); + + let mut h3_conn = + h3::server::Connection::<_, bytes::Bytes>::new(h3_quinn::Connection::new(new_conn)) + .await?; + + // let self_inner = self.clone(); + while let Some((req, stream)) = h3_conn.accept().await? { + debug!("New request: {:?}", req); + + let self_inner = self.clone(); + self.globals.runtime_handle.spawn(async move { + let res = self_inner.handle_request_h3(req, stream, client_addr).await; + // if let Err(e) = handle_request(req, stream).await { + // error!("HTTP/3 request failed: {}", e); + // } + // }); + // tokio::spawn(async { + // if let Err(e) = handle_request(req, stream, root).await { + // error!("request failed: {}", e); + // } + }); + } + } + Err(err) => { + warn!("HTTP/3 accepting connection failed: {:?}", err); + } + } + // let quinn::NewConnection { + // connection, + // mut bi_streams, + // .. + // } = conn.await?; + // async { + // debug!( + // "HTTP/3 connection established from {:?} (ALPN {:?}, SNI: {:?})", + // connection.remote_address(), + // connection + // .handshake_data() + // .unwrap() + // .downcast::() + // .unwrap() + // .protocol + // .map_or_else( + // || "".into(), + // |x| String::from_utf8_lossy(&x).into_owned() + // ), + // connection + // .handshake_data() + // .unwrap() + // .downcast::() + // .unwrap() + // .server_name + // .map_or_else(|| "".into(), |x| x) + // ); + + // // Each stream initiated by the client constitutes a new request. + // while let Some(stream) = bi_streams.next().await { + // let stream = match stream { + // Err(quinn::ConnectionError::ApplicationClosed { .. }) => { + // debug!("HTTP/3 connection closed"); + // return Ok(()); + // } + // Err(e) => { + // return Err(e); + // } + // Ok(s) => s, + // }; + // let fut = handle_request_h3(stream); + // tokio::spawn(async move { + // if let Err(e) = fut.await { + // error!("failed: {reason}", reason = e.to_string()); + // } + // }); + // } + // Ok(()) + // } + // .await?; + // Ok(()) + Ok(()) + } + + async fn handle_request_h3( + self, + req: Request<()>, + mut stream: RequestStream, + client_addr: SocketAddr, + ) -> Result<()> + where + S: BidiStream, + { + let (req_parts, _) = req.into_parts(); + + let body = if let Some(request_body) = stream.recv_data().await? { + let chunk = request_body.chunk(); + Body::from(chunk.to_owned()) + } else { + Body::default() + }; + + let mut new_req: Request = Request::from_parts(req_parts, body); + if let Some(request_trailers) = stream.recv_trailers().await? { + let headers = new_req.headers_mut(); + for (ok, v) in request_trailers { + if let Some(k) = ok { + headers.insert(k, v); + } + } + }; + + let res = self.handle_request(new_req, client_addr).await?; + println!("{:?}", res); + + let (new_res_parts, new_body) = res.into_parts(); + let new_res = Response::from_parts(new_res_parts, ()); + + match stream.send_response(new_res).await { + Ok(_) => { + debug!("HTTP/3 response to connection successful"); + let data = hyper::body::to_bytes(new_body).await?; + stream.send_data(data).await?; + } + Err(err) => { + error!("Unable to send response to connection peer: {:?}", err); + } + } + Ok(()) + } +} + +// TODO: +// async fn handle_request_h3((mut send, recv): (quinn::SendStream, quinn::RecvStream)) -> Result<()> { +// let req = recv +// .read_to_end(64 * 1024) +// .await +// .map_err(|e| anyhow!("failed reading request: {}", e))?; + +// // let hyper_req = hyper::Request::try_from(req.clone()); + +// let mut escaped = String::new(); +// for &x in &req[..] { +// let part = ascii::escape_default(x).collect::>(); +// escaped.push_str(str::from_utf8(&part).unwrap()); +// } +// info!("content = {:?}", escaped); +// // Execute the request +// let resp = process_get(&req).unwrap_or_else(|e| { +// error!("failed: {}", e); +// format!("failed to process request: {}\n", e).into_bytes() +// }); +// // Write the response +// send +// .write_all(&resp) +// .await +// .map_err(|e| anyhow!("failed to send response: {}", e))?; +// // Gracefully terminate the stream +// send +// .finish() +// .await +// .map_err(|e| anyhow!("failed to shutdown stream: {}", e))?; +// info!("complete"); +// Ok(()) +// } + +// fn process_get(x: &[u8]) -> Result> { +// if x.len() < 4 || &x[0..4] != b"GET " { +// bail!("missing GET"); +// } +// if x[4..].len() < 2 || &x[x.len() - 2..] != b"\r\n" { +// bail!("missing \\r\\n"); +// } + +// let data = b"hello world!".to_vec(); +// Ok(data) +// } diff --git a/src/proxy/proxy_handler.rs b/src/proxy/proxy_handler.rs index c73e003..08be2c6 100644 --- a/src/proxy/proxy_handler.rs +++ b/src/proxy/proxy_handler.rs @@ -200,6 +200,9 @@ fn generate_request_forwarded( // Change version to http/1.1 when destination scheme is http if req.version() != Version::HTTP_11 && upstream_scheme_host.scheme() == Some(&Scheme::HTTP) { *req.version_mut() = Version::HTTP_11; + } else if req.version() == Version::HTTP_3 { + debug!("HTTP/3 is currently unsupported for request to upstream. Use HTTP/2."); + *req.version_mut() = Version::HTTP_2; } Ok(req) @@ -290,7 +293,10 @@ fn secure_redirection( Ok(response) } -fn parse_host_port(req: &Request, tls_enabled: bool) -> Result<(String, u16)> { +fn parse_host_port( + req: &Request, + tls_enabled: bool, +) -> Result<(String, u16)> { let host_port_headers = req.headers().get("host"); let host_uri = req.uri().host(); let port_uri = req.uri().port_u16(); diff --git a/src/proxy/proxy_main.rs b/src/proxy/proxy_main.rs index b46e88c..1a8c707 100644 --- a/src/proxy/proxy_main.rs +++ b/src/proxy/proxy_main.rs @@ -74,13 +74,14 @@ where }); } - async fn start_without_tls( - self, - listener: TcpListener, - server: Http, - ) -> Result<()> { + async fn start_without_tls(self, server: Http) -> Result<()> { let listener_service = async { - while let Ok((stream, _client_addr)) = listener.accept().await { + let tcp_listener = TcpListener::bind(&self.listening_on).await?; + info!( + "Start TCP proxy serving with HTTP request for configured host names: {:?}", + tcp_listener.local_addr()? + ); + while let Ok((stream, _client_addr)) = tcp_listener.accept().await { self .clone() .client_serve(stream, server.clone(), _client_addr) @@ -93,8 +94,6 @@ where } pub async fn start(self) -> Result<()> { - let tcp_listener = TcpListener::bind(&self.listening_on).await?; - let mut server = Http::new(); server.http1_keep_alive(self.globals.keepalive); server.http2_max_concurrent_streams(self.globals.max_concurrent_streams); @@ -103,18 +102,10 @@ where let server = server.with_executor(executor); if self.tls_enabled { - info!( - "Start TCP proxy serving with HTTPS request for configured host names: {:?}", - tcp_listener.local_addr()? - ); // #[cfg(feature = "tls")] - self.start_with_tls(tcp_listener, server).await?; + self.start_with_tls(server).await?; } else { - info!( - "Start TCP proxy serving with HTTP request for configured host names: {:?}", - tcp_listener.local_addr()? - ); - self.start_without_tls(tcp_listener, server).await?; + self.start_without_tls(server).await?; } Ok(()) diff --git a/src/proxy/proxy_tls.rs b/src/proxy/proxy_tls.rs index b8f939f..48b0b84 100644 --- a/src/proxy/proxy_tls.rs +++ b/src/proxy/proxy_tls.rs @@ -1,5 +1,7 @@ use super::proxy_main::{LocalExecutor, Proxy}; use crate::{constants::CERTS_WATCH_DELAY_SECS, error::*, log::*}; +#[cfg(feature = "h3")] +use futures::StreamExt; use futures::{future::FutureExt, join, select}; use hyper::{client::connect::Connect, server::conn::Http}; use std::{sync::Arc, time::Duration}; @@ -9,11 +11,7 @@ impl Proxy where T: Connect + Clone + Sync + Send + 'static, { - pub async fn start_with_tls( - self, - listener: TcpListener, - server: Http, - ) -> Result<()> { + pub async fn start_with_tls(self, server: Http) -> Result<()> { let cert_service = async { info!("Start cert watch service for {}", self.listening_on); loop { @@ -28,10 +26,17 @@ where } }; + // TCP Listener Service, i.e., http/2 and http/1.1 let listener_service = async { + let tcp_listener = TcpListener::bind(&self.listening_on).await?; + info!( + "Start TCP proxy serving with HTTPS request for configured host names: {:?}", + tcp_listener.local_addr()? + ); + loop { select! { - tcp_cnx = listener.accept().fuse() => { + tcp_cnx = tcp_listener.accept().fuse() => { if tcp_cnx.is_err() { continue; } @@ -81,6 +86,44 @@ where Ok(()) as Result<()> }; - join!(listener_service, cert_service).0 + /////////////////////// TODO:!!!!! + #[cfg(feature = "h3")] + let listener_service_h3 = async { + // TODO: とりあえずデフォルトのserver_cryptoが必要になりそう + let backend_serve = self.backends.apps.get("localhost").unwrap(); + let server_crypto = backend_serve.get_tls_server_config().unwrap(); + let server_config_h3 = quinn::ServerConfig::with_crypto(Arc::new(server_crypto)); + + let (endpoint, mut incoming) = + quinn::Endpoint::server(server_config_h3, self.listening_on).unwrap(); + debug!("HTTP/3 UDP listening on {}", endpoint.local_addr().unwrap()); + + while let Some(mut conn) = incoming.next().await { + debug!("HTTP/3 connection incoming"); + let hsd = conn.handshake_data().await; + let hsd_downcast = hsd + .unwrap() + .downcast::() + .unwrap(); + debug!("HTTP/3 SNI: {:?}", hsd_downcast.server_name); + // TODO: ServerConfig::set_server_configでSNIに応じて再セット + + let fut = self.clone().client_serve_h3(conn); + self.globals.runtime_handle.spawn(async { + if let Err(e) = fut.await { + error!("connection failed: {reason}", reason = e.to_string()) + } + }); + } + }; + + #[cfg(not(feature = "h3"))] + { + join!(listener_service, cert_service).0 + } + #[cfg(feature = "h3")] + { + join!(listener_service, cert_service, listener_service_h3).0 + } } }