refactor http handler

This commit is contained in:
Jun Kurihara 2023-07-28 15:18:21 +09:00
commit a40d8a0072
No known key found for this signature in database
GPG key ID: D992B3E3DE1DED23
7 changed files with 91 additions and 72 deletions

View file

@ -19,6 +19,8 @@ use std::{env, net::SocketAddr, sync::Arc};
use tokio::{io::copy_bidirectional, time::timeout};
#[derive(Clone, Builder)]
/// HTTP message handler for requests from clients and responses from backend applications,
/// responsible to manipulate and forward messages to upstream backends and downstream clients.
pub struct HttpMessageHandler<T, U>
where
T: Connect + Clone + Sync + Send + 'static,
@ -33,11 +35,13 @@ where
T: Connect + Clone + Sync + Send + 'static,
U: CryptoSource + Clone,
{
/// Return with an arbitrary status code of error and log message
fn return_with_error_log(&self, status_code: StatusCode, log_data: &mut MessageLog) -> Result<Response<Body>> {
log_data.status_code(&status_code).output();
http_error(status_code)
}
/// Handle incoming request message from a client
pub async fn handle_request(
self,
mut req: Request<Body>,
@ -65,13 +69,15 @@ where
}
}
// Find backend application for given server_name, and drop if incoming request is invalid as request.
let backend = if let Some(be) = self.globals.backends.apps.get(&server_name) {
be
} else if let Some(default_server_name) = &self.globals.backends.default_server_name_bytes {
debug!("Serving by default app");
self.globals.backends.apps.get(default_server_name).unwrap()
} else {
return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data);
let backend = match self.globals.backends.apps.get(&server_name) {
Some(be) => be,
None => {
let Some(default_server_name) = &self.globals.backends.default_server_name_bytes else {
return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data);
};
debug!("Serving by default app");
self.globals.backends.apps.get(default_server_name).unwrap()
}
};
// Redirect to https if !tls_enabled and redirect_to_https is true
@ -84,9 +90,8 @@ where
// Find reverse proxy for given path and choose one of upstream host
// Longest prefix match
let path = req.uri().path();
let upstream_group = match backend.reverse_proxy.get(path) {
Some(ug) => ug,
None => return self.return_with_error_log(StatusCode::NOT_FOUND, &mut log_data),
let Some(upstream_group) = backend.reverse_proxy.get(path) else {
return self.return_with_error_log(StatusCode::NOT_FOUND, &mut log_data)
};
// Upgrade in request header
@ -113,19 +118,17 @@ where
log_data.upstream(req.uri());
//////
// Forward request to
// Forward request to a chosen backend
let mut res_backend = {
match timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await {
Err(_) => {
return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data);
let Ok(result) = timeout(self.globals.proxy_config.upstream_timeout, self.forwarder.request(req)).await else {
return self.return_with_error_log(StatusCode::GATEWAY_TIMEOUT, &mut log_data);
};
match result {
Ok(res) => res,
Err(e) => {
error!("Failed to get response from backend: {}", e);
return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data);
}
Ok(x) => match x {
Ok(res) => res,
Err(e) => {
error!("Failed to get response from backend: {}", e);
return self.return_with_error_log(StatusCode::SERVICE_UNAVAILABLE, &mut log_data);
}
},
}
};
@ -141,62 +144,63 @@ where
if res_backend.status() != StatusCode::SWITCHING_PROTOCOLS {
// Generate response to client
if self.generate_response_forwarded(&mut res_backend, backend).is_ok() {
log_data.status_code(&res_backend.status()).output();
return Ok(res_backend);
} else {
if self.generate_response_forwarded(&mut res_backend, backend).is_err() {
return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data);
}
log_data.status_code(&res_backend.status()).output();
return Ok(res_backend);
}
// Handle StatusCode::SWITCHING_PROTOCOLS in response
let upgrade_in_response = extract_upgrade(res_backend.headers());
if if let (Some(u_req), Some(u_res)) = (upgrade_in_request.as_ref(), upgrade_in_response.as_ref()) {
let should_upgrade = if let (Some(u_req), Some(u_res)) = (upgrade_in_request.as_ref(), upgrade_in_response.as_ref())
{
u_req.to_ascii_lowercase() == u_res.to_ascii_lowercase()
} else {
false
} {
if let Some(request_upgraded) = request_upgraded {
let Some(onupgrade) = res_backend.extensions_mut().remove::<hyper::upgrade::OnUpgrade>() else {
error!("Response does not have an upgrade extension");
return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data);
};
self.globals.runtime_handle.spawn(async move {
let mut response_upgraded = onupgrade.await.map_err(|e| {
error!("Failed to upgrade response: {}", e);
RpxyError::Hyper(e)
})?;
let mut request_upgraded = request_upgraded.await.map_err(|e| {
error!("Failed to upgrade request: {}", e);
RpxyError::Hyper(e)
})?;
copy_bidirectional(&mut response_upgraded, &mut request_upgraded)
.await
.map_err(|e| {
error!("Coping between upgraded connections failed: {}", e);
RpxyError::Io(e)
})?;
Ok(()) as Result<()>
});
log_data.status_code(&res_backend.status()).output();
Ok(res_backend)
} else {
error!("Request does not have an upgrade extension");
self.return_with_error_log(StatusCode::BAD_REQUEST, &mut log_data)
}
} else {
};
if !should_upgrade {
error!(
"Backend tried to switch to protocol {:?} when {:?} was requested",
upgrade_in_response, upgrade_in_request
);
self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data)
return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data);
}
let Some(request_upgraded) = request_upgraded else {
error!("Request does not have an upgrade extension");
return self.return_with_error_log(StatusCode::BAD_REQUEST, &mut log_data);
};
let Some(onupgrade) = res_backend.extensions_mut().remove::<hyper::upgrade::OnUpgrade>() else {
error!("Response does not have an upgrade extension");
return self.return_with_error_log(StatusCode::INTERNAL_SERVER_ERROR, &mut log_data);
};
self.globals.runtime_handle.spawn(async move {
let mut response_upgraded = onupgrade.await.map_err(|e| {
error!("Failed to upgrade response: {}", e);
RpxyError::Hyper(e)
})?;
let mut request_upgraded = request_upgraded.await.map_err(|e| {
error!("Failed to upgrade request: {}", e);
RpxyError::Hyper(e)
})?;
copy_bidirectional(&mut response_upgraded, &mut request_upgraded)
.await
.map_err(|e| {
error!("Coping between upgraded connections failed: {}", e);
RpxyError::Io(e)
})?;
Ok(()) as Result<()>
});
log_data.status_code(&res_backend.status()).output();
Ok(res_backend)
}
////////////////////////////////////////////////////
// Functions to generate messages
////////////////////////////////////////////////////
/// Manipulate a response message sent from a backend application to forward downstream to a client.
fn generate_response_forwarded<B>(&self, response: &mut Response<B>, chosen_backend: &Backend<U>) -> Result<()>
where
B: core::fmt::Debug,
@ -208,7 +212,8 @@ where
#[cfg(feature = "http3")]
{
// TODO: Workaround for avoid h3 for client authentication
// Manipulate ALT_SVC allowing h3 in response message only when mutual TLS is not enabled
// TODO: This is a workaround for avoiding a client authentication in HTTP/3
if self.globals.proxy_config.http3
&& chosen_backend
.crypto_source
@ -241,6 +246,7 @@ where
}
#[allow(clippy::too_many_arguments)]
/// Manipulate a request message sent from a client to forward upstream to a backend application
fn generate_request_forwarded<B>(
&self,
client_addr: &SocketAddr,