From 5cba376394b4d6d055f58fb97338d053fa936c8b Mon Sep 17 00:00:00 2001 From: Jun Kurihara Date: Sat, 3 Jun 2023 14:55:34 +0900 Subject: [PATCH] refactor: update logic of round-robin --- src/backend/load_balance.rs | 43 +++++++++++++++++++++++++++++++++- src/backend/upstream.rs | 46 ++++++++++--------------------------- src/config/parse.rs | 9 +++++--- 3 files changed, 60 insertions(+), 38 deletions(-) diff --git a/src/backend/load_balance.rs b/src/backend/load_balance.rs index a189c6f..f6b2292 100644 --- a/src/backend/load_balance.rs +++ b/src/backend/load_balance.rs @@ -1,3 +1,9 @@ +use derive_builder::Builder; +use std::sync::{ + atomic::{AtomicUsize, Ordering}, + Arc, +}; + /// Constants to specify a load balance option pub(super) mod load_balance_options { pub const FIX_TO_FIRST: &str = "none"; @@ -6,13 +12,48 @@ pub(super) mod load_balance_options { pub const STICKY_ROUND_ROBIN: &str = "sticky"; } +// +// /// Counter for load balancing +// pub cnt: UpstreamCount, + +// TODO: カウンタの移動 +#[derive(Debug, Clone, Builder)] +pub struct LbRoundRobinCount { + #[builder(default)] + cnt: Arc, + #[builder(setter(custom), default)] + max_val: usize, +} +impl LbRoundRobinCountBuilder { + pub fn max_val(&mut self, v: &usize) -> &mut Self { + self.max_val = Some(*v); + self + } +} +impl LbRoundRobinCount { + /// Get a current count of upstream served + fn current_cnt(&self) -> usize { + self.cnt.load(Ordering::Relaxed) + } + + /// Increment the count of upstream served up to the max value + pub fn increment_cnt(&self) -> usize { + if self.current_cnt() < self.max_val - 1 { + self.cnt.fetch_add(1, Ordering::Relaxed) + } else { + // Clear the counter + self.cnt.fetch_and(0, Ordering::Relaxed) + } + } +} + #[derive(Debug, Clone)] /// Load Balancing Option pub enum LoadBalance { /// Fix to the first upstream. Use if only one upstream destination is specified FixToFirst, /// Simple round robin without session persistance - RoundRobin, // TODO: カウンタはここにいれる。randomとかには不要なので + RoundRobin(LbRoundRobinCount), // TODO: カウンタはここにいれる。randomとかには不要なので /// Randomly chose one upstream server Random, /// Round robin with session persistance using cookie diff --git a/src/backend/upstream.rs b/src/backend/upstream.rs index 10d0a3c..5875189 100644 --- a/src/backend/upstream.rs +++ b/src/backend/upstream.rs @@ -1,18 +1,12 @@ use super::{ - load_balance::{load_balance_options as lb_opts, LoadBalance}, + load_balance::{load_balance_options as lb_opts, LbRoundRobinCountBuilder, LoadBalance}, BytesName, PathNameBytesExp, UpstreamOption, }; use crate::log::*; use derive_builder::Builder; use rand::Rng; use rustc_hash::{FxHashMap as HashMap, FxHashSet as HashSet}; -use std::{ - borrow::Cow, - sync::{ - atomic::{AtomicUsize, Ordering}, - Arc, - }, -}; +use std::borrow::Cow; #[derive(Debug, Clone)] pub struct ReverseProxy { @@ -76,9 +70,6 @@ pub struct UpstreamGroup { #[builder(setter(custom), default)] /// Load balancing option pub lb: LoadBalance, - #[builder(default)] - /// Counter for load balancing - pub cnt: UpstreamCount, #[builder(setter(custom), default)] /// Activated upstream options defined in [[UpstreamOption]] pub opts: HashSet, @@ -101,11 +92,16 @@ impl UpstreamGroupBuilder { ); self } - pub fn lb(&mut self, v: &Option) -> &mut Self { + pub fn lb(&mut self, v: &Option, upstream_num: &usize) -> &mut Self { let lb = if let Some(x) = v { match x.as_str() { lb_opts::FIX_TO_FIRST => LoadBalance::FixToFirst, - lb_opts::ROUND_ROBIN => LoadBalance::RoundRobin, + lb_opts::ROUND_ROBIN => LoadBalance::RoundRobin( + LbRoundRobinCountBuilder::default() + .max_val(upstream_num) + .build() + .unwrap(), + ), lb_opts::RANDOM => LoadBalance::Random, lb_opts::STICKY_ROUND_ROBIN => LoadBalance::StickyRoundRobin, _ => { @@ -133,17 +129,13 @@ impl UpstreamGroupBuilder { } } -// TODO: カウンタの移動 -#[derive(Debug, Clone, Default)] -pub struct UpstreamCount(Arc); - impl UpstreamGroup { /// Get an enabled option of load balancing [[LoadBalance]] pub fn get(&self) -> Option<&Upstream> { - match self.lb { + match &self.lb { LoadBalance::FixToFirst => self.upstream.get(0), - LoadBalance::RoundRobin => { - let idx = self.increment_cnt(); + LoadBalance::RoundRobin(cnt) => { + let idx = cnt.increment_cnt(); self.upstream.get(idx) } LoadBalance::Random => { @@ -154,18 +146,4 @@ impl UpstreamGroup { LoadBalance::StickyRoundRobin => todo!(), // TODO: TODO: } } - - /// Get a current count of upstream served - fn current_cnt(&self) -> usize { - self.cnt.0.load(Ordering::Relaxed) - } - - /// Increment count of upstream served - fn increment_cnt(&self) -> usize { - if self.current_cnt() < self.upstream.len() - 1 { - self.cnt.0.fetch_add(1, Ordering::Relaxed) - } else { - self.cnt.0.fetch_and(0, Ordering::Relaxed) - } - } } diff --git a/src/config/parse.rs b/src/config/parse.rs index d023f56..88892a8 100644 --- a/src/config/parse.rs +++ b/src/config/parse.rs @@ -1,6 +1,6 @@ use super::toml::{ConfigToml, ReverseProxyOption}; use crate::{ - backend::{BackendBuilder, ReverseProxy, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption}, + backend::{BackendBuilder, ReverseProxy, Upstream, UpstreamGroup, UpstreamGroupBuilder, UpstreamOption}, constants::*, error::*, globals::*, @@ -200,12 +200,15 @@ pub fn parse_opts(globals: &mut Globals) -> std::result::Result<(), anyhow::Erro fn get_reverse_proxy(rp_settings: &[ReverseProxyOption]) -> std::result::Result { let mut upstream: HashMap = HashMap::default(); + rp_settings.iter().for_each(|rpo| { + let vec_upstream: Vec = rpo.upstream.iter().map(|x| x.to_upstream().unwrap()).collect(); + let lb_upstream_num = vec_upstream.len(); let elem = UpstreamGroupBuilder::default() - .upstream(rpo.upstream.iter().map(|x| x.to_upstream().unwrap()).collect()) + .upstream(vec_upstream) .path(&rpo.path) .replace_path(&rpo.replace_path) - .lb(&rpo.load_balance) + .lb(&rpo.load_balance, &lb_upstream_num) .opts(&rpo.upstream_options) .build() .unwrap();