tests for http3
This commit is contained in:
parent
6030beb32b
commit
9d4cf08fba
11 changed files with 317 additions and 34 deletions
|
|
@ -180,7 +180,20 @@ impl Backend {
|
|||
"Unable to find a valid certificate and key",
|
||||
)
|
||||
})?;
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
|
||||
#[cfg(feature = "h3")]
|
||||
{
|
||||
server_config.alpn_protocols = vec![
|
||||
b"h3".to_vec(),
|
||||
b"hq-29".to_vec(), // quinn draft example TODO: remove later
|
||||
b"h2".to_vec(),
|
||||
b"http/1.1".to_vec(),
|
||||
];
|
||||
}
|
||||
#[cfg(not(feature = "h3"))]
|
||||
{
|
||||
server_config.alpn_protocols = vec![b"h2".to_vec(), b"http/1.1".to_vec()];
|
||||
}
|
||||
|
||||
let mut config_store = self.server_config.lock();
|
||||
*config_store = Some(server_config);
|
||||
|
|
|
|||
|
|
@ -39,11 +39,18 @@ pub fn parse_opts(globals: &mut Globals, backends: &mut Backends) -> Result<()>
|
|||
},
|
||||
anyhow!("Wrong port spec.")
|
||||
);
|
||||
let mut listen_addresses: Vec<&str> = LISTEN_ADDRESSES_V4.to_vec();
|
||||
if let Some(v) = config.listen_ipv6 {
|
||||
let mut listen_addresses: Vec<&str> = Vec::new();
|
||||
if let Some(v) = config.listen_only_ipv6 {
|
||||
if v {
|
||||
listen_addresses.extend(LISTEN_ADDRESSES_V6.iter());
|
||||
}
|
||||
} else if let Some(v) = config.listen_ipv6 {
|
||||
listen_addresses.extend(LISTEN_ADDRESSES_V4.iter());
|
||||
if v {
|
||||
listen_addresses.extend(LISTEN_ADDRESSES_V6.iter());
|
||||
}
|
||||
} else {
|
||||
listen_addresses.extend(LISTEN_ADDRESSES_V4.iter());
|
||||
}
|
||||
globals.listen_sockets = listen_addresses
|
||||
.iter()
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ pub struct ConfigToml {
|
|||
pub listen_port: Option<u16>,
|
||||
pub listen_port_tls: Option<u16>,
|
||||
pub listen_ipv6: Option<bool>,
|
||||
pub listen_only_ipv6: Option<bool>,
|
||||
pub max_concurrent_streams: Option<u32>,
|
||||
pub max_clients: Option<u32>,
|
||||
pub apps: Option<Apps>,
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#[cfg(feature = "h3")]
|
||||
mod proxy_h3;
|
||||
mod proxy_handler;
|
||||
mod proxy_main;
|
||||
mod proxy_tls;
|
||||
|
|
|
|||
214
src/proxy/proxy_h3.rs
Normal file
214
src/proxy/proxy_h3.rs
Normal file
|
|
@ -0,0 +1,214 @@
|
|||
use super::Proxy;
|
||||
use crate::{error::*, log::*};
|
||||
use bytes::{Buf, Bytes, BytesMut};
|
||||
use futures::{FutureExt, StreamExt};
|
||||
use h3::{quic::BidiStream, server::RequestStream};
|
||||
use hyper::body::HttpBody;
|
||||
use hyper::http::request;
|
||||
use hyper::Response;
|
||||
use hyper::{client::connect::Connect, Body, Request};
|
||||
use std::{ascii, str};
|
||||
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
|
||||
|
||||
impl<T> Proxy<T>
|
||||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub async fn client_serve_h3(self, conn: quinn::Connecting) -> Result<()> {
|
||||
let client_addr = conn.remote_address();
|
||||
|
||||
match conn.await {
|
||||
Ok(new_conn) => {
|
||||
debug!(
|
||||
"HTTP/3 connection established from {:?} {:?}",
|
||||
client_addr,
|
||||
{
|
||||
let hsd = new_conn
|
||||
.connection
|
||||
.handshake_data()
|
||||
.ok_or_else(|| anyhow!(""))?
|
||||
.downcast::<quinn::crypto::rustls::HandshakeData>()
|
||||
.map_err(|_| anyhow!(""))?;
|
||||
(
|
||||
hsd.protocol.map_or_else(
|
||||
|| "<none>".into(),
|
||||
|x| String::from_utf8_lossy(&x).into_owned(),
|
||||
),
|
||||
hsd.server_name.map_or_else(|| "<none>".into(), |x| x),
|
||||
)
|
||||
}
|
||||
);
|
||||
|
||||
let mut h3_conn =
|
||||
h3::server::Connection::<_, bytes::Bytes>::new(h3_quinn::Connection::new(new_conn))
|
||||
.await?;
|
||||
|
||||
// let self_inner = self.clone();
|
||||
while let Some((req, stream)) = h3_conn.accept().await? {
|
||||
debug!("New request: {:?}", req);
|
||||
|
||||
let self_inner = self.clone();
|
||||
self.globals.runtime_handle.spawn(async move {
|
||||
let res = self_inner.handle_request_h3(req, stream, client_addr).await;
|
||||
// if let Err(e) = handle_request(req, stream).await {
|
||||
// error!("HTTP/3 request failed: {}", e);
|
||||
// }
|
||||
// });
|
||||
// tokio::spawn(async {
|
||||
// if let Err(e) = handle_request(req, stream, root).await {
|
||||
// error!("request failed: {}", e);
|
||||
// }
|
||||
});
|
||||
}
|
||||
}
|
||||
Err(err) => {
|
||||
warn!("HTTP/3 accepting connection failed: {:?}", err);
|
||||
}
|
||||
}
|
||||
// let quinn::NewConnection {
|
||||
// connection,
|
||||
// mut bi_streams,
|
||||
// ..
|
||||
// } = conn.await?;
|
||||
// async {
|
||||
// debug!(
|
||||
// "HTTP/3 connection established from {:?} (ALPN {:?}, SNI: {:?})",
|
||||
// connection.remote_address(),
|
||||
// connection
|
||||
// .handshake_data()
|
||||
// .unwrap()
|
||||
// .downcast::<quinn::crypto::rustls::HandshakeData>()
|
||||
// .unwrap()
|
||||
// .protocol
|
||||
// .map_or_else(
|
||||
// || "<none>".into(),
|
||||
// |x| String::from_utf8_lossy(&x).into_owned()
|
||||
// ),
|
||||
// connection
|
||||
// .handshake_data()
|
||||
// .unwrap()
|
||||
// .downcast::<quinn::crypto::rustls::HandshakeData>()
|
||||
// .unwrap()
|
||||
// .server_name
|
||||
// .map_or_else(|| "<none>".into(), |x| x)
|
||||
// );
|
||||
|
||||
// // Each stream initiated by the client constitutes a new request.
|
||||
// while let Some(stream) = bi_streams.next().await {
|
||||
// let stream = match stream {
|
||||
// Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
|
||||
// debug!("HTTP/3 connection closed");
|
||||
// return Ok(());
|
||||
// }
|
||||
// Err(e) => {
|
||||
// return Err(e);
|
||||
// }
|
||||
// Ok(s) => s,
|
||||
// };
|
||||
// let fut = handle_request_h3(stream);
|
||||
// tokio::spawn(async move {
|
||||
// if let Err(e) = fut.await {
|
||||
// error!("failed: {reason}", reason = e.to_string());
|
||||
// }
|
||||
// });
|
||||
// }
|
||||
// Ok(())
|
||||
// }
|
||||
// .await?;
|
||||
// Ok(())
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn handle_request_h3<S>(
|
||||
self,
|
||||
req: Request<()>,
|
||||
mut stream: RequestStream<S, Bytes>,
|
||||
client_addr: SocketAddr,
|
||||
) -> Result<()>
|
||||
where
|
||||
S: BidiStream<Bytes>,
|
||||
{
|
||||
let (req_parts, _) = req.into_parts();
|
||||
|
||||
let body = if let Some(request_body) = stream.recv_data().await? {
|
||||
let chunk = request_body.chunk();
|
||||
Body::from(chunk.to_owned())
|
||||
} else {
|
||||
Body::default()
|
||||
};
|
||||
|
||||
let mut new_req: Request<Body> = Request::from_parts(req_parts, body);
|
||||
if let Some(request_trailers) = stream.recv_trailers().await? {
|
||||
let headers = new_req.headers_mut();
|
||||
for (ok, v) in request_trailers {
|
||||
if let Some(k) = ok {
|
||||
headers.insert(k, v);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
let res = self.handle_request(new_req, client_addr).await?;
|
||||
println!("{:?}", res);
|
||||
|
||||
let (new_res_parts, new_body) = res.into_parts();
|
||||
let new_res = Response::from_parts(new_res_parts, ());
|
||||
|
||||
match stream.send_response(new_res).await {
|
||||
Ok(_) => {
|
||||
debug!("HTTP/3 response to connection successful");
|
||||
let data = hyper::body::to_bytes(new_body).await?;
|
||||
stream.send_data(data).await?;
|
||||
}
|
||||
Err(err) => {
|
||||
error!("Unable to send response to connection peer: {:?}", err);
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// TODO:
|
||||
// async fn handle_request_h3((mut send, recv): (quinn::SendStream, quinn::RecvStream)) -> Result<()> {
|
||||
// let req = recv
|
||||
// .read_to_end(64 * 1024)
|
||||
// .await
|
||||
// .map_err(|e| anyhow!("failed reading request: {}", e))?;
|
||||
|
||||
// // let hyper_req = hyper::Request::try_from(req.clone());
|
||||
|
||||
// let mut escaped = String::new();
|
||||
// for &x in &req[..] {
|
||||
// let part = ascii::escape_default(x).collect::<Vec<_>>();
|
||||
// escaped.push_str(str::from_utf8(&part).unwrap());
|
||||
// }
|
||||
// info!("content = {:?}", escaped);
|
||||
// // Execute the request
|
||||
// let resp = process_get(&req).unwrap_or_else(|e| {
|
||||
// error!("failed: {}", e);
|
||||
// format!("failed to process request: {}\n", e).into_bytes()
|
||||
// });
|
||||
// // Write the response
|
||||
// send
|
||||
// .write_all(&resp)
|
||||
// .await
|
||||
// .map_err(|e| anyhow!("failed to send response: {}", e))?;
|
||||
// // Gracefully terminate the stream
|
||||
// send
|
||||
// .finish()
|
||||
// .await
|
||||
// .map_err(|e| anyhow!("failed to shutdown stream: {}", e))?;
|
||||
// info!("complete");
|
||||
// Ok(())
|
||||
// }
|
||||
|
||||
// fn process_get(x: &[u8]) -> Result<Vec<u8>> {
|
||||
// if x.len() < 4 || &x[0..4] != b"GET " {
|
||||
// bail!("missing GET");
|
||||
// }
|
||||
// if x[4..].len() < 2 || &x[x.len() - 2..] != b"\r\n" {
|
||||
// bail!("missing \\r\\n");
|
||||
// }
|
||||
|
||||
// let data = b"hello world!".to_vec();
|
||||
// Ok(data)
|
||||
// }
|
||||
|
|
@ -200,6 +200,9 @@ fn generate_request_forwarded<B: core::fmt::Debug>(
|
|||
// Change version to http/1.1 when destination scheme is http
|
||||
if req.version() != Version::HTTP_11 && upstream_scheme_host.scheme() == Some(&Scheme::HTTP) {
|
||||
*req.version_mut() = Version::HTTP_11;
|
||||
} else if req.version() == Version::HTTP_3 {
|
||||
debug!("HTTP/3 is currently unsupported for request to upstream. Use HTTP/2.");
|
||||
*req.version_mut() = Version::HTTP_2;
|
||||
}
|
||||
|
||||
Ok(req)
|
||||
|
|
@ -290,7 +293,10 @@ fn secure_redirection(
|
|||
Ok(response)
|
||||
}
|
||||
|
||||
fn parse_host_port(req: &Request<Body>, tls_enabled: bool) -> Result<(String, u16)> {
|
||||
fn parse_host_port<B: core::fmt::Debug>(
|
||||
req: &Request<B>,
|
||||
tls_enabled: bool,
|
||||
) -> Result<(String, u16)> {
|
||||
let host_port_headers = req.headers().get("host");
|
||||
let host_uri = req.uri().host();
|
||||
let port_uri = req.uri().port_u16();
|
||||
|
|
|
|||
|
|
@ -74,13 +74,14 @@ where
|
|||
});
|
||||
}
|
||||
|
||||
async fn start_without_tls(
|
||||
self,
|
||||
listener: TcpListener,
|
||||
server: Http<LocalExecutor>,
|
||||
) -> Result<()> {
|
||||
async fn start_without_tls(self, server: Http<LocalExecutor>) -> Result<()> {
|
||||
let listener_service = async {
|
||||
while let Ok((stream, _client_addr)) = listener.accept().await {
|
||||
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
|
||||
info!(
|
||||
"Start TCP proxy serving with HTTP request for configured host names: {:?}",
|
||||
tcp_listener.local_addr()?
|
||||
);
|
||||
while let Ok((stream, _client_addr)) = tcp_listener.accept().await {
|
||||
self
|
||||
.clone()
|
||||
.client_serve(stream, server.clone(), _client_addr)
|
||||
|
|
@ -93,8 +94,6 @@ where
|
|||
}
|
||||
|
||||
pub async fn start(self) -> Result<()> {
|
||||
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
|
||||
|
||||
let mut server = Http::new();
|
||||
server.http1_keep_alive(self.globals.keepalive);
|
||||
server.http2_max_concurrent_streams(self.globals.max_concurrent_streams);
|
||||
|
|
@ -103,18 +102,10 @@ where
|
|||
let server = server.with_executor(executor);
|
||||
|
||||
if self.tls_enabled {
|
||||
info!(
|
||||
"Start TCP proxy serving with HTTPS request for configured host names: {:?}",
|
||||
tcp_listener.local_addr()?
|
||||
);
|
||||
// #[cfg(feature = "tls")]
|
||||
self.start_with_tls(tcp_listener, server).await?;
|
||||
self.start_with_tls(server).await?;
|
||||
} else {
|
||||
info!(
|
||||
"Start TCP proxy serving with HTTP request for configured host names: {:?}",
|
||||
tcp_listener.local_addr()?
|
||||
);
|
||||
self.start_without_tls(tcp_listener, server).await?;
|
||||
self.start_without_tls(server).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
|
|
|||
|
|
@ -1,5 +1,7 @@
|
|||
use super::proxy_main::{LocalExecutor, Proxy};
|
||||
use crate::{constants::CERTS_WATCH_DELAY_SECS, error::*, log::*};
|
||||
#[cfg(feature = "h3")]
|
||||
use futures::StreamExt;
|
||||
use futures::{future::FutureExt, join, select};
|
||||
use hyper::{client::connect::Connect, server::conn::Http};
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
|
@ -9,11 +11,7 @@ impl<T> Proxy<T>
|
|||
where
|
||||
T: Connect + Clone + Sync + Send + 'static,
|
||||
{
|
||||
pub async fn start_with_tls(
|
||||
self,
|
||||
listener: TcpListener,
|
||||
server: Http<LocalExecutor>,
|
||||
) -> Result<()> {
|
||||
pub async fn start_with_tls(self, server: Http<LocalExecutor>) -> Result<()> {
|
||||
let cert_service = async {
|
||||
info!("Start cert watch service for {}", self.listening_on);
|
||||
loop {
|
||||
|
|
@ -28,10 +26,17 @@ where
|
|||
}
|
||||
};
|
||||
|
||||
// TCP Listener Service, i.e., http/2 and http/1.1
|
||||
let listener_service = async {
|
||||
let tcp_listener = TcpListener::bind(&self.listening_on).await?;
|
||||
info!(
|
||||
"Start TCP proxy serving with HTTPS request for configured host names: {:?}",
|
||||
tcp_listener.local_addr()?
|
||||
);
|
||||
|
||||
loop {
|
||||
select! {
|
||||
tcp_cnx = listener.accept().fuse() => {
|
||||
tcp_cnx = tcp_listener.accept().fuse() => {
|
||||
if tcp_cnx.is_err() {
|
||||
continue;
|
||||
}
|
||||
|
|
@ -81,6 +86,44 @@ where
|
|||
Ok(()) as Result<()>
|
||||
};
|
||||
|
||||
join!(listener_service, cert_service).0
|
||||
/////////////////////// TODO:!!!!!
|
||||
#[cfg(feature = "h3")]
|
||||
let listener_service_h3 = async {
|
||||
// TODO: とりあえずデフォルトのserver_cryptoが必要になりそう
|
||||
let backend_serve = self.backends.apps.get("localhost").unwrap();
|
||||
let server_crypto = backend_serve.get_tls_server_config().unwrap();
|
||||
let server_config_h3 = quinn::ServerConfig::with_crypto(Arc::new(server_crypto));
|
||||
|
||||
let (endpoint, mut incoming) =
|
||||
quinn::Endpoint::server(server_config_h3, self.listening_on).unwrap();
|
||||
debug!("HTTP/3 UDP listening on {}", endpoint.local_addr().unwrap());
|
||||
|
||||
while let Some(mut conn) = incoming.next().await {
|
||||
debug!("HTTP/3 connection incoming");
|
||||
let hsd = conn.handshake_data().await;
|
||||
let hsd_downcast = hsd
|
||||
.unwrap()
|
||||
.downcast::<quinn::crypto::rustls::HandshakeData>()
|
||||
.unwrap();
|
||||
debug!("HTTP/3 SNI: {:?}", hsd_downcast.server_name);
|
||||
// TODO: ServerConfig::set_server_configでSNIに応じて再セット
|
||||
|
||||
let fut = self.clone().client_serve_h3(conn);
|
||||
self.globals.runtime_handle.spawn(async {
|
||||
if let Err(e) = fut.await {
|
||||
error!("connection failed: {reason}", reason = e.to_string())
|
||||
}
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
#[cfg(not(feature = "h3"))]
|
||||
{
|
||||
join!(listener_service, cert_service).0
|
||||
}
|
||||
#[cfg(feature = "h3")]
|
||||
{
|
||||
join!(listener_service, cert_service, listener_service_h3).0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue