add lb. todo: toml

This commit is contained in:
Jun Kurihara 2022-06-25 10:33:55 -04:00
commit e2ebb304c1
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
5 changed files with 104 additions and 24 deletions

View file

@ -1,10 +1,14 @@
use crate::log::*;
use rand::Rng;
use std::{
collections::HashMap,
fs::File,
io::{self, BufReader, Cursor, Read},
path::PathBuf,
sync::Mutex,
sync::{
atomic::{AtomicUsize, Ordering},
Arc, Mutex,
},
};
use tokio_rustls::rustls::{Certificate, PrivateKey, ServerConfig};
@ -20,8 +24,58 @@ pub struct Backend {
#[derive(Debug, Clone)]
pub struct ReverseProxy {
pub default_destination_uri: hyper::Uri,
pub destination_uris: HashMap<String, hyper::Uri>, // TODO: url pathで引っ掛ける。
pub default_upstream: Upstream,
pub upstream: HashMap<String, Upstream>,
}
#[allow(dead_code)]
#[derive(Debug, Clone)]
pub enum LoadBalance {
RoundRobin,
Random,
}
impl Default for LoadBalance {
fn default() -> Self {
Self::RoundRobin
}
}
#[derive(Debug, Clone)]
pub struct Upstream {
pub uri: Vec<hyper::Uri>,
pub lb: LoadBalance,
pub cnt: UpstreamCount, // counter for load balancing
}
#[derive(Debug, Clone, Default)]
pub struct UpstreamCount(Arc<AtomicUsize>);
impl Upstream {
pub fn get(&self) -> Option<&hyper::Uri> {
match self.lb {
LoadBalance::RoundRobin => {
let idx = self.increment_cnt();
self.uri.get(idx)
}
LoadBalance::Random => {
let mut rng = rand::thread_rng();
let max = self.uri.len() - 1;
self.uri.get(rng.gen_range(0..max))
}
}
}
fn current_cnt(&self) -> usize {
self.cnt.0.load(Ordering::Relaxed)
}
fn increment_cnt(&self) -> usize {
if self.current_cnt() < self.uri.len() - 1 {
self.cnt.0.fetch_add(1, Ordering::Relaxed)
} else {
self.cnt.0.fetch_and(0, Ordering::Relaxed)
}
}
}
impl Backend {

View file

@ -21,10 +21,17 @@ pub fn parse_opts(globals: &mut Globals, backends: &mut HashMap<String, Backend>
globals.https_port = Some(HTTPS_LISTEN_PORT);
// TODO:
let mut map_example: HashMap<String, Uri> = HashMap::new();
let mut map_example: HashMap<String, Upstream> = HashMap::new();
map_example.insert(
"/maps".to_string(),
"https://www.bing.com".parse::<Uri>().unwrap(),
Upstream {
uri: vec![
"https://www.bing.com".parse::<Uri>().unwrap(),
"https://www.bing.co.jp".parse::<Uri>().unwrap(),
],
cnt: Default::default(),
lb: Default::default(),
},
);
backends.insert(
"localhost".to_string(),
@ -32,9 +39,16 @@ pub fn parse_opts(globals: &mut Globals, backends: &mut HashMap<String, Backend>
app_name: "Localhost to Google except for maps".to_string(),
hostname: "localhost".to_string(),
reverse_proxy: ReverseProxy {
// default_destination_uri: "https://www.google.com".parse::<Uri>().unwrap(),
default_destination_uri: "http://abehiroshi.la.coocan.jp/".parse::<Uri>().unwrap(), // httpのみの場合の好例
destination_uris: map_example,
default_upstream: Upstream {
uri: vec![
"https://www.google.com".parse::<Uri>().unwrap(),
"https://www.google.co.jp".parse::<Uri>().unwrap(),
],
cnt: Default::default(),
lb: Default::default(),
},
// default_upstream_uri: vec!["http://abehiroshi.la.coocan.jp/".parse::<Uri>().unwrap()], // httpのみの場合の好例
upstream: map_example,
},
https_redirection: Some(false), // TODO: ここはtlsが存在する時はSomeにすべき。Noneはtlsがないときのみのはず

View file

@ -52,14 +52,18 @@ where
return secure_redirection(&hostname, self.globals.https_port, &path_and_query);
}
// Find reverse proxy for given path
// Find reverse proxy for given path and choose one of upstream host
let path = req.uri().path();
let destination_scheme_host =
if let Some(uri) = backend.reverse_proxy.destination_uris.get(path) {
uri.to_owned()
} else {
backend.reverse_proxy.default_destination_uri.clone()
};
let upstream_uri = if let Some(upstream) = backend.reverse_proxy.upstream.get(path) {
upstream.get()
} else {
backend.reverse_proxy.default_upstream.get()
};
let upstream_scheme_host = if let Some(u) = upstream_uri {
u
} else {
return http_error(StatusCode::INTERNAL_SERVER_ERROR);
};
// Upgrade in request header
let upgrade_in_request = extract_upgrade(req.headers());
@ -69,7 +73,7 @@ where
let req_forwarded = if let Ok(req) = generate_request_forwarded(
client_addr,
req,
destination_scheme_host,
upstream_scheme_host,
path_and_query,
&upgrade_in_request,
) {
@ -139,7 +143,7 @@ fn generate_response_forwarded<B: core::fmt::Debug>(response: &mut Response<B>)
fn generate_request_forwarded<B: core::fmt::Debug>(
client_addr: SocketAddr,
mut req: Request<B>,
destination_scheme_host: Uri,
upstream_scheme_host: &Uri,
path_and_query: String,
upgrade: &Option<String>,
) -> Result<Request<B>> {
@ -174,8 +178,8 @@ fn generate_request_forwarded<B: core::fmt::Debug>(
// update uri in request
*req.uri_mut() = Uri::builder()
.scheme(destination_scheme_host.scheme().unwrap().as_str())
.authority(destination_scheme_host.authority().unwrap().as_str())
.scheme(upstream_scheme_host.scheme().unwrap().as_str())
.authority(upstream_scheme_host.authority().unwrap().as_str())
.path_and_query(&path_and_query)
.build()?;
@ -188,7 +192,7 @@ fn generate_request_forwarded<B: core::fmt::Debug>(
}
// Change version to http/1.1 when destination scheme is http
if req.version() != Version::HTTP_11 && destination_scheme_host.scheme() == Some(&Scheme::HTTP) {
if req.version() != Version::HTTP_11 && upstream_scheme_host.scheme() == Some(&Scheme::HTTP) {
*req.version_mut() = Version::HTTP_11;
}