prototype (failed) to multiplex quic streams

This commit is contained in:
Jun Kurihara 2022-07-05 17:10:31 +09:00
commit 4b42e6eec1
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
3 changed files with 123 additions and 178 deletions

View file

@ -1,14 +1,9 @@
use super::Proxy;
use crate::{error::*, log::*};
use bytes::{Buf, Bytes, BytesMut};
use futures::{FutureExt, StreamExt};
use bytes::{Buf, Bytes};
use h3::{quic::BidiStream, server::RequestStream};
use hyper::body::HttpBody;
use hyper::http::request;
use hyper::Response;
use hyper::{client::connect::Connect, Body, Request};
use std::{ascii, str};
use std::{net::SocketAddr, path::PathBuf, sync::Arc};
use hyper::{client::connect::Connect, Body, HeaderMap, Request, Response};
use std::net::SocketAddr;
impl<T> Proxy<T>
where
@ -19,103 +14,47 @@ where
match conn.await {
Ok(new_conn) => {
debug!(
"HTTP/3 connection established from {:?} {:?}",
client_addr,
{
let hsd = new_conn
.connection
.handshake_data()
.ok_or_else(|| anyhow!(""))?
.downcast::<quinn::crypto::rustls::HandshakeData>()
.map_err(|_| anyhow!(""))?;
(
hsd.protocol.map_or_else(
|| "<none>".into(),
|x| String::from_utf8_lossy(&x).into_owned(),
),
hsd.server_name.map_or_else(|| "<none>".into(), |x| x),
)
}
);
info!("QUIC connection established from {:?} {:?}", client_addr, {
let hsd = new_conn
.connection
.handshake_data()
.ok_or_else(|| anyhow!(""))?
.downcast::<quinn::crypto::rustls::HandshakeData>()
.map_err(|_| anyhow!(""))?;
(
hsd.protocol.map_or_else(
|| "<none>".into(),
|x| String::from_utf8_lossy(&x).into_owned(),
),
hsd.server_name.map_or_else(|| "<none>".into(), |x| x),
)
});
let mut h3_conn =
h3::server::Connection::<_, bytes::Bytes>::new(h3_quinn::Connection::new(new_conn))
.await?;
info!("HTTP/3 connection established");
// let self_inner = self.clone();
while let Some((req, stream)) = h3_conn.accept().await? {
debug!("New request: {:?}", req);
while let Some((req, stream)) = h3_conn
.accept()
.await
.map_err(|e| anyhow!("HTTP/3 accept failed (likely timeout): {}", e))?
{
info!("HTTP/3 new request received");
let self_inner = self.clone();
self.globals.runtime_handle.spawn(async move {
let res = self_inner.handle_request_h3(req, stream, client_addr).await;
// if let Err(e) = handle_request(req, stream).await {
// error!("HTTP/3 request failed: {}", e);
// }
// });
// tokio::spawn(async {
// if let Err(e) = handle_request(req, stream, root).await {
// error!("request failed: {}", e);
// }
if let Err(e) = self_inner.handle_request_h3(req, stream, client_addr).await {
error!("HTTP/3 request failed: {}", e);
}
});
}
}
Err(err) => {
warn!("HTTP/3 accepting connection failed: {:?}", err);
warn!("QUIC accepting connection failed: {:?}", err);
}
}
// let quinn::NewConnection {
// connection,
// mut bi_streams,
// ..
// } = conn.await?;
// async {
// debug!(
// "HTTP/3 connection established from {:?} (ALPN {:?}, SNI: {:?})",
// connection.remote_address(),
// connection
// .handshake_data()
// .unwrap()
// .downcast::<quinn::crypto::rustls::HandshakeData>()
// .unwrap()
// .protocol
// .map_or_else(
// || "<none>".into(),
// |x| String::from_utf8_lossy(&x).into_owned()
// ),
// connection
// .handshake_data()
// .unwrap()
// .downcast::<quinn::crypto::rustls::HandshakeData>()
// .unwrap()
// .server_name
// .map_or_else(|| "<none>".into(), |x| x)
// );
// // Each stream initiated by the client constitutes a new request.
// while let Some(stream) = bi_streams.next().await {
// let stream = match stream {
// Err(quinn::ConnectionError::ApplicationClosed { .. }) => {
// debug!("HTTP/3 connection closed");
// return Ok(());
// }
// Err(e) => {
// return Err(e);
// }
// Ok(s) => s,
// };
// let fut = handle_request_h3(stream);
// tokio::spawn(async move {
// if let Err(e) = fut.await {
// error!("failed: {reason}", reason = e.to_string());
// }
// });
// }
// Ok(())
// }
// .await?;
// Ok(())
Ok(())
}
@ -130,25 +69,27 @@ where
{
let (req_parts, _) = req.into_parts();
let body = if let Some(request_body) = stream.recv_data().await? {
let chunk = request_body.chunk();
Body::from(chunk.to_owned())
} else {
// TODO: h3 -> h2/http1.1などのプロトコル変換がなければ、bodyはBytes単位で直でsend_dataして転送した方がいい。やむなし。
let mut body_chunk: Vec<u8> = Vec::new();
while let Some(request_body) = stream.recv_data().await? {
body_chunk.extend_from_slice(request_body.chunk());
}
let body = if body_chunk.is_empty() {
Body::default()
} else {
debug!("HTTP/3 request with non-empty body");
Body::from(body_chunk)
};
// trailers
let trailers = if let Some(trailers) = stream.recv_trailers().await? {
debug!("HTTP/3 request with trailers");
trailers
} else {
HeaderMap::new()
};
let mut new_req: Request<Body> = Request::from_parts(req_parts, body);
if let Some(request_trailers) = stream.recv_trailers().await? {
let headers = new_req.headers_mut();
for (ok, v) in request_trailers {
if let Some(k) = ok {
headers.insert(k, v);
}
}
};
let new_req: Request<Body> = Request::from_parts(req_parts, body);
let res = self.handle_request(new_req, client_addr).await?;
println!("{:?}", res);
let (new_res_parts, new_body) = res.into_parts();
let new_res = Response::from_parts(new_res_parts, ());
@ -158,57 +99,12 @@ where
debug!("HTTP/3 response to connection successful");
let data = hyper::body::to_bytes(new_body).await?;
stream.send_data(data).await?;
stream.send_trailers(trailers).await?;
}
Err(err) => {
error!("Unable to send response to connection peer: {:?}", err);
}
}
Ok(())
Ok(stream.finish().await?)
}
}
// TODO:
// async fn handle_request_h3((mut send, recv): (quinn::SendStream, quinn::RecvStream)) -> Result<()> {
// let req = recv
// .read_to_end(64 * 1024)
// .await
// .map_err(|e| anyhow!("failed reading request: {}", e))?;
// // let hyper_req = hyper::Request::try_from(req.clone());
// let mut escaped = String::new();
// for &x in &req[..] {
// let part = ascii::escape_default(x).collect::<Vec<_>>();
// escaped.push_str(str::from_utf8(&part).unwrap());
// }
// info!("content = {:?}", escaped);
// // Execute the request
// let resp = process_get(&req).unwrap_or_else(|e| {
// error!("failed: {}", e);
// format!("failed to process request: {}\n", e).into_bytes()
// });
// // Write the response
// send
// .write_all(&resp)
// .await
// .map_err(|e| anyhow!("failed to send response: {}", e))?;
// // Gracefully terminate the stream
// send
// .finish()
// .await
// .map_err(|e| anyhow!("failed to shutdown stream: {}", e))?;
// info!("complete");
// Ok(())
// }
// fn process_get(x: &[u8]) -> Result<Vec<u8>> {
// if x.len() < 4 || &x[0..4] != b"GET " {
// bail!("missing GET");
// }
// if x[4..].len() < 2 || &x[x.len() - 2..] != b"\r\n" {
// bail!("missing \\r\\n");
// }
// let data = b"hello world!".to_vec();
// Ok(data)
// }