refactor: update logic of round-robin

This commit is contained in:
Jun Kurihara 2023-06-03 14:55:34 +09:00
commit 5cba376394
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
3 changed files with 60 additions and 38 deletions

View file

@ -1,3 +1,9 @@
use derive_builder::Builder;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
};
/// Constants to specify a load balance option /// Constants to specify a load balance option
pub(super) mod load_balance_options { pub(super) mod load_balance_options {
pub const FIX_TO_FIRST: &str = "none"; pub const FIX_TO_FIRST: &str = "none";
@ -6,13 +12,48 @@ pub(super) mod load_balance_options {
pub const STICKY_ROUND_ROBIN: &str = "sticky"; pub const STICKY_ROUND_ROBIN: &str = "sticky";
} }
//
// /// Counter for load balancing
// pub cnt: UpstreamCount,
// TODO: カウンタの移動
#[derive(Debug, Clone, Builder)]
pub struct LbRoundRobinCount {
#[builder(default)]
cnt: Arc<AtomicUsize>,
#[builder(setter(custom), default)]
max_val: usize,
}
impl LbRoundRobinCountBuilder {
pub fn max_val(&mut self, v: &usize) -> &mut Self {
self.max_val = Some(*v);
self
}
}
impl LbRoundRobinCount {
/// Get a current count of upstream served
fn current_cnt(&self) -> usize {
self.cnt.load(Ordering::Relaxed)
}
/// Increment the count of upstream served up to the max value
pub fn increment_cnt(&self) -> usize {
if self.current_cnt() < self.max_val - 1 {
self.cnt.fetch_add(1, Ordering::Relaxed)
} else {
// Clear the counter
self.cnt.fetch_and(0, Ordering::Relaxed)
}
}
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
/// Load Balancing Option /// Load Balancing Option
pub enum LoadBalance { pub enum LoadBalance {
/// Fix to the first upstream. Use if only one upstream destination is specified /// Fix to the first upstream. Use if only one upstream destination is specified
FixToFirst, FixToFirst,
/// Simple round robin without session persistance /// Simple round robin without session persistance
RoundRobin, // TODO: カウンタはここにいれる。randomとかには不要なので RoundRobin(LbRoundRobinCount), // TODO: カウンタはここにいれる。randomとかには不要なので
/// Randomly chose one upstream server /// Randomly chose one upstream server
Random, Random,
/// Round robin with session persistance using cookie /// Round robin with session persistance using cookie

View file

@ -1,18 +1,12 @@
use super::{ use super::{
load_balance::{load_balance_options as lb_opts, LoadBalance}, load_balance::{load_balance_options as lb_opts, LbRoundRobinCountBuilder, LoadBalance},
BytesName, PathNameBytesExp, UpstreamOption, BytesName, PathNameBytesExp, UpstreamOption,
}; };
use crate::log::*; use crate::log::*;
use derive_builder::Builder; use derive_builder::Builder;
use rand::Rng; use rand::Rng;
use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet};
use std::{ use std::borrow::Cow;
borrow::Cow,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ReverseProxy { pub struct ReverseProxy {
@ -76,9 +70,6 @@ pub struct UpstreamGroup {
#[builder(setter(custom), default)] #[builder(setter(custom), default)]
/// Load balancing option /// Load balancing option
pub lb: LoadBalance, pub lb: LoadBalance,
#[builder(default)]
/// Counter for load balancing
pub cnt: UpstreamCount,
#[builder(setter(custom), default)] #[builder(setter(custom), default)]
/// Activated upstream options defined in [[UpstreamOption]] /// Activated upstream options defined in [[UpstreamOption]]
pub opts: HashSet<UpstreamOption>, pub opts: HashSet<UpstreamOption>,
@ -101,11 +92,16 @@ impl UpstreamGroupBuilder {
); );
self self
} }
pub fn lb(&mut self, v: &Option<String>) -> &mut Self { pub fn lb(&mut self, v: &Option<String>, upstream_num: &usize) -> &mut Self {
let lb = if let Some(x) = v { let lb = if let Some(x) = v {
match x.as_str() { match x.as_str() {
lb_opts::FIX_TO_FIRST => LoadBalance::FixToFirst, lb_opts::FIX_TO_FIRST => LoadBalance::FixToFirst,
lb_opts::ROUND_ROBIN => LoadBalance::RoundRobin, lb_opts::ROUND_ROBIN => LoadBalance::RoundRobin(
LbRoundRobinCountBuilder::default()
.max_val(upstream_num)
.build()
.unwrap(),
),
lb_opts::RANDOM => LoadBalance::Random, lb_opts::RANDOM => LoadBalance::Random,
lb_opts::STICKY_ROUND_ROBIN => LoadBalance::StickyRoundRobin, lb_opts::STICKY_ROUND_ROBIN => LoadBalance::StickyRoundRobin,
_ => { _ => {
@ -133,17 +129,13 @@ impl UpstreamGroupBuilder {
} }
} }
// TODO: カウンタの移動
#[derive(Debug, Clone, Default)]
pub struct UpstreamCount(Arc<AtomicUsize>);
impl UpstreamGroup { impl UpstreamGroup {
/// Get an enabled option of load balancing [[LoadBalance]] /// Get an enabled option of load balancing [[LoadBalance]]
pub fn get(&self) -> Option<&Upstream> { pub fn get(&self) -> Option<&Upstream> {
match self.lb { match &self.lb {
LoadBalance::FixToFirst => self.upstream.get(0), LoadBalance::FixToFirst => self.upstream.get(0),
LoadBalance::RoundRobin => { LoadBalance::RoundRobin(cnt) => {
let idx = self.increment_cnt(); let idx = cnt.increment_cnt();
self.upstream.get(idx) self.upstream.get(idx)
} }
LoadBalance::Random => { LoadBalance::Random => {
@ -154,18 +146,4 @@ impl UpstreamGroup {
LoadBalance::StickyRoundRobin => todo!(), // TODO: TODO: LoadBalance::StickyRoundRobin => todo!(), // TODO: TODO:
} }
} }
/// Get a current count of upstream served
fn current_cnt(&self) -> usize {
self.cnt.0.load(Ordering::Relaxed)
}
/// Increment count of upstream served
fn increment_cnt(&self) -> usize {
if self.current_cnt() < self.upstream.len() - 1 {
self.cnt.0.fetch_add(1, Ordering::Relaxed)
} else {
self.cnt.0.fetch_and(0, Ordering::Relaxed)
}
}
} }

View file

@ -1,6 +1,6 @@
use super::toml::{ConfigToml, ReverseProxyOption}; use super::toml::{ConfigToml, ReverseProxyOption};
use crate::{ use crate::{
backend::{BackendBuilder, ReverseProxy, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption}, backend::{BackendBuilder, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption},
constants::*, constants::*,
error::*, error::*,
globals::*, globals::*,
@ -200,12 +200,15 @@ pub fn parse_opts(globals: &mut Globals) -> std::result::Result<(), anyhow::Erro
fn get_reverse_proxy(rp_settings: &[ReverseProxyOption]) -> std::result::Result<ReverseProxy, anyhow::Error> { fn get_reverse_proxy(rp_settings: &[ReverseProxyOption]) -> std::result::Result<ReverseProxy, anyhow::Error> {
let mut upstream: HashMap<PathNameBytesExp, UpstreamGroup> = HashMap::default(); let mut upstream: HashMap<PathNameBytesExp, UpstreamGroup> = HashMap::default();
rp_settings.iter().for_each(|rpo| { rp_settings.iter().for_each(|rpo| {
let vec_upstream: Vec<Upstream> = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()).collect();
let lb_upstream_num = vec_upstream.len();
let elem = UpstreamGroupBuilder::default() let elem = UpstreamGroupBuilder::default()
.upstream(rpo.upstream.iter().map(|x| x.to_upstream().unwrap()).collect()) .upstream(vec_upstream)
.path(&rpo.path) .path(&rpo.path)
.replace_path(&rpo.replace_path) .replace_path(&rpo.replace_path)
.lb(&rpo.load_balance) .lb(&rpo.load_balance, &lb_upstream_num)
.opts(&rpo.upstream_options) .opts(&rpo.upstream_options)
.build() .build()
.unwrap(); .unwrap();