use hyper-tls and hyper-trust-dns as http(s) clients

This commit is contained in:
Jun Kurihara 2022-06-16 20:01:26 -04:00
commit 8fff4f4088
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
4 changed files with 138 additions and 80 deletions

View file

@ -14,6 +14,7 @@ publish = false
[features] [features]
default = ["tls"] default = ["tls"]
tls = ["tokio-rustls", "rustls-pemfile"] tls = ["tokio-rustls", "rustls-pemfile"]
forward-hyper-trust-dns = ["hyper-trust-dns"]
[dependencies] [dependencies]
anyhow = "1.0.57" anyhow = "1.0.57"
@ -40,6 +41,13 @@ tokio-rustls = { version = "0.23.4", features = [
"early-data", "early-data",
], optional = true } ], optional = true }
rustls-pemfile = { version = "1.0.0", optional = true } rustls-pemfile = { version = "1.0.0", optional = true }
hyper-trust-dns = { version = "0.4.2", default-features = false, features = [
"rustls-http2",
"dnssec-ring",
"dns-over-https-rustls",
"rustls-webpki",
], optional = true }
hyper-tls = "0.5.0"
[dev-dependencies] [dev-dependencies]

View file

@ -1,12 +1,12 @@
use crate::{error::*, globals::Globals, log::*}; use crate::{error::*, globals::Globals, log::*};
use futures::{ use futures::{
task::{Context, Poll}, task::{Context, Poll},
Future, Future,
}; };
use hyper::http; use hyper::{
use hyper::server::conn::Http; client::connect::Connect, http, server::conn::Http, Body, Client, HeaderMap, Method, Request,
use hyper::{Body, HeaderMap, Method, Request, Response, StatusCode}; Response, StatusCode,
};
use std::{net::SocketAddr, pin::Pin, sync::Arc}; use std::{net::SocketAddr, pin::Pin, sync::Arc};
use tokio::{ use tokio::{
io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt}, io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt},
@ -46,13 +46,20 @@ where
} }
#[derive(Clone)] #[derive(Clone)]
pub struct PacketAcceptor { pub struct PacketAcceptor<T>
where
T: hyper::client::connect::Connect + Send + Sync + Clone + 'static,
{
pub listening_on: SocketAddr, pub listening_on: SocketAddr,
pub forwarder: Client<T>,
pub globals: Arc<Globals>, pub globals: Arc<Globals>,
} }
#[allow(clippy::type_complexity)] #[allow(clippy::type_complexity)]
impl hyper::service::Service<http::Request<Body>> for PacketAcceptor { impl<T> hyper::service::Service<http::Request<Body>> for PacketAcceptor<T>
where
T: Connect + Clone + Send + Sync + 'static,
{
type Response = Response<Body>; type Response = Response<Body>;
type Error = http::Error; type Error = http::Error;
@ -63,84 +70,114 @@ impl hyper::service::Service<http::Request<Body>> for PacketAcceptor {
} }
fn call(&mut self, req: Request<Body>) -> Self::Future { fn call(&mut self, req: Request<Body>) -> Self::Future {
debug!("\nserve:{:?}\n{:?}", self.listening_on, req); debug!("\nserve: {:?}\n{:?}", self.listening_on, req);
// let globals = &self.doh.globals; let self_inner = self.clone();
// let self_inner = self.clone();
// if req.uri().path() == globals.path { // 1. check uri (domain queried host name)
// Box::pin(async move { // 2. build uri to forwarding target destination
// let mut subscriber = None; // 3. build request from uri and body
// if self_inner.doh.globals.enable_auth_target { // 4. send request to forwarding target
// subscriber = match auth::authenticate(
// &self_inner.doh.globals, if *req.method() == Method::GET {
// &req, Box::pin(async move {
// ValidationLocation::Target, // let uri = req.uri();
// &self_inner.peer_addr, let target_uri = hyper::Uri::builder()
// ) { .scheme("https")
// Ok((sub, aud)) => { .authority("www.google.com")
// debug!("Valid token or allowed ip: sub={:?}, aud={:?}", &sub, &aud); .path_and_query("/")
// sub .build()
// } .unwrap();
// Err(e) => { println!("{:?}", target_uri);
// error!("{:?}", e); match self_inner.forwarder.get(target_uri).await {
// return Ok(e); Ok(res) => Ok(res),
// } Err(e) => {
// }; error!("{:?}", e);
// } http_error(StatusCode::INTERNAL_SERVER_ERROR)
// match *req.method() { }
// Method::POST => self_inner.doh.serve_post(req, subscriber).await, }
// Method::GET => self_inner.doh.serve_get(req, subscriber).await, })
// _ => http_error(StatusCode::METHOD_NOT_ALLOWED), } else {
// } // let globals = &self.doh.globals;
// }) // let self_inner = self.clone();
// } else if req.uri().path() == globals.odoh_configs_path { // if req.uri().path() == globals.path {
// match *req.method() { // Box::pin(async move {
// Method::GET => Box::pin(async move { self_inner.doh.serve_odoh_configs().await }), // let mut subscriber = None;
// _ => Box::pin(async { http_error(StatusCode::METHOD_NOT_ALLOWED) }), // if self_inner.doh.globals.enable_auth_target {
// } // subscriber = match auth::authenticate(
// } else { // &self_inner.doh.globals,
// #[cfg(not(feature = "odoh-proxy"))] // &req,
// { // ValidationLocation::Target,
// Box::pin(async { http_error(StatusCode::NOT_FOUND) }) // &self_inner.peer_addr,
// } // ) {
// #[cfg(feature = "odoh-proxy")] // Ok((sub, aud)) => {
// { // debug!("Valid token or allowed ip: sub={:?}, aud={:?}", &sub, &aud);
// if req.uri().path() == globals.odoh_proxy_path { // sub
// Box::pin(async move { // }
// let mut subscriber = None; // Err(e) => {
// if self_inner.doh.globals.enable_auth_proxy { // error!("{:?}", e);
// subscriber = match auth::authenticate( // return Ok(e);
// &self_inner.doh.globals, // }
// &req, // };
// ValidationLocation::Proxy, // }
// &self_inner.peer_addr, // match *req.method() {
// ) { // Method::POST => self_inner.doh.serve_post(req, subscriber).await,
// Ok((sub, aud)) => { // Method::GET => self_inner.doh.serve_get(req, subscriber).await,
// debug!("Valid token or allowed ip: sub={:?}, aud={:?}", &sub, &aud); // _ => http_error(StatusCode::METHOD_NOT_ALLOWED),
// sub // }
// } // })
// Err(e) => { // } else if req.uri().path() == globals.odoh_configs_path {
// error!("{:?}", e); // match *req.method() {
// return Ok(e); // Method::GET => Box::pin(async move { self_inner.doh.serve_odoh_configs().await }),
// } // _ => Box::pin(async { http_error(StatusCode::METHOD_NOT_ALLOWED) }),
// }; // }
// } // } else {
// // Draft: https://datatracker.ietf.org/doc/html/draft-pauly-dprive-oblivious-doh-11 // #[cfg(not(feature = "odoh-proxy"))]
// // Golang impl.: https://github.com/cloudflare/odoh-server-go // {
// // Based on the draft and Golang implementation, only post method is allowed. // Box::pin(async { http_error(StatusCode::NOT_FOUND) })
// match *req.method() { // }
// Method::POST => self_inner.doh.serve_odoh_proxy_post(req, subscriber).await, // #[cfg(feature = "odoh-proxy")]
// _ => http_error(StatusCode::METHOD_NOT_ALLOWED), // {
// } // if req.uri().path() == globals.odoh_proxy_path {
// }) // Box::pin(async move {
// } else { // let mut subscriber = None;
Box::pin(async { http_error(StatusCode::NOT_FOUND) }) // if self_inner.doh.globals.enable_auth_proxy {
// subscriber = match auth::authenticate(
// &self_inner.doh.globals,
// &req,
// ValidationLocation::Proxy,
// &self_inner.peer_addr,
// ) {
// Ok((sub, aud)) => {
// debug!("Valid token or allowed ip: sub={:?}, aud={:?}", &sub, &aud);
// sub
// }
// Err(e) => {
// error!("{:?}", e);
// return Ok(e);
// }
// };
// }
// // Draft: https://datatracker.ietf.org/doc/html/draft-pauly-dprive-oblivious-doh-11
// // Golang impl.: https://github.com/cloudflare/odoh-server-go
// // Based on the draft and Golang implementation, only post method is allowed.
// match *req.method() {
// Method::POST => self_inner.doh.serve_odoh_proxy_post(req, subscriber).await,
// _ => http_error(StatusCode::METHOD_NOT_ALLOWED),
// }
// })
// } else {
Box::pin(async { http_error(StatusCode::NOT_FOUND) })
}
// } // }
// } // }
// } // }
} }
} }
impl PacketAcceptor { impl<T> PacketAcceptor<T>
where
T: Connect + Clone + Send + Sync + 'static,
{
pub async fn client_serve<I>(self, stream: I, server: Http<LocalExecutor>, peer_addr: SocketAddr) pub async fn client_serve<I>(self, stream: I, server: Http<LocalExecutor>, peer_addr: SocketAddr)
where where
I: AsyncRead + AsyncWrite + Send + Unpin + 'static, I: AsyncRead + AsyncWrite + Send + Unpin + 'static,

View file

@ -1,5 +1,8 @@
use crate::{acceptor::PacketAcceptor, error::*, globals::Globals, log::*}; use crate::{acceptor::PacketAcceptor, error::*, globals::Globals, log::*};
use futures::future::select_all; use futures::future::select_all;
use hyper::Client;
#[cfg(feature = "forward-hyper-trust-dns")]
use hyper_trust_dns::TrustDnsResolver;
use std::sync::Arc; use std::sync::Arc;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -11,9 +14,16 @@ impl Proxy {
let addresses = self.globals.listen_addresses.clone(); let addresses = self.globals.listen_addresses.clone();
let futures = select_all(addresses.into_iter().map(|addr| { let futures = select_all(addresses.into_iter().map(|addr| {
info!("Listen address: {:?}", addr); info!("Listen address: {:?}", addr);
#[cfg(feature = "forward-hyper-trust-dns")]
let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector();
#[cfg(not(feature = "forward-hyper-trust-dns"))]
let connector = hyper_tls::HttpsConnector::new();
let acceptor = PacketAcceptor { let acceptor = PacketAcceptor {
listening_on: addr, listening_on: addr,
globals: self.globals.clone(), globals: self.globals.clone(),
forwarder: Client::builder().build::<_, hyper::Body>(connector),
}; };
self.globals.runtime_handle.spawn(acceptor.start()) self.globals.runtime_handle.spawn(acceptor.start())
})); }));

View file

@ -5,7 +5,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use futures::{future::FutureExt, join, select}; use futures::{future::FutureExt, join, select};
use hyper::server::conn::Http; use hyper::{client::connect::Connect, server::conn::Http};
use tokio::{ use tokio::{
net::TcpListener, net::TcpListener,
sync::mpsc::{self, Receiver}, sync::mpsc::{self, Receiver},
@ -109,7 +109,10 @@ where
Ok(TlsAcceptor::from(Arc::new(server_config))) Ok(TlsAcceptor::from(Arc::new(server_config)))
} }
impl PacketAcceptor { impl<T> PacketAcceptor<T>
where
T: Connect + Clone + Send + Sync + 'static,
{
async fn start_https_service( async fn start_https_service(
self, self,
mut tls_acceptor_receiver: Receiver<TlsAcceptor>, mut tls_acceptor_receiver: Receiver<TlsAcceptor>,