split proxy into stream handler and http message handler

This commit is contained in:
Jun Kurihara 2022-07-10 03:11:46 +09:00
commit 954a1993a9
No known key found for this signature in database
GPG key ID: 48ADFD173ED22B03
14 changed files with 92 additions and 71 deletions

View file

@ -2,24 +2,29 @@
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
mod backend;
mod backend_opt;
mod config;
mod constants;
mod error;
mod globals;
mod log;
mod msg_handler;
mod proxy;
mod utils;
use crate::{
backend::{Backend, Backends, ServerNameLC},
config::parse_opts,
constants::*,
error::*,
globals::*,
log::*,
proxy::{Backend, Backends, Proxy, ServerNameLC},
proxy::Proxy,
};
use futures::future::select_all;
use hyper::Client;
use msg_handler::HttpMessageHandler;
// use hyper_trust_dns::TrustDnsResolver;
use rustc_hash::FxHashMap as HashMap;
use std::{io::Write, sync::Arc};
@ -61,24 +66,22 @@ fn main() {
max_concurrent_streams: MAX_CONCURRENT_STREAMS,
keepalive: true,
runtime_handle: runtime.handle().clone(),
backends: Backends {
default_server_name: None,
apps: HashMap::<ServerNameLC, Backend>::default(),
},
};
let mut backends = Backends {
default_server_name: None,
apps: HashMap::<ServerNameLC, Backend>::default(),
};
parse_opts(&mut globals).expect("Invalid configuration");
parse_opts(&mut globals, &mut backends).expect("Invalid configuration");
entrypoint(Arc::new(globals), Arc::new(backends))
.await
.unwrap()
entrypoint(Arc::new(globals)).await.unwrap()
});
warn!("Exit the program");
}
// entrypoint creates and spawns tasks of proxy services
async fn entrypoint(globals: Arc<Globals>, backends: Arc<Backends>) -> Result<()> {
async fn entrypoint(globals: Arc<Globals>) -> Result<()> {
// let connector = TrustDnsResolver::default().into_rustls_webpki_https_connector();
let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_webpki_roots()
@ -86,7 +89,10 @@ async fn entrypoint(globals: Arc<Globals>, backends: Arc<Backends>) -> Result<()
.enable_http1()
.enable_http2()
.build();
let forwarder = Arc::new(Client::builder().build::<_, hyper::Body>(connector));
let msg_handler = HttpMessageHandler {
forwarder: Arc::new(Client::builder().build::<_, hyper::Body>(connector)),
globals: globals.clone(),
};
let addresses = globals.listen_sockets.clone();
let futures = select_all(addresses.into_iter().map(|addr| {
@ -99,8 +105,7 @@ async fn entrypoint(globals: Arc<Globals>, backends: Arc<Backends>) -> Result<()
globals: globals.clone(),
listening_on: addr,
tls_enabled,
backends: backends.clone(),
forwarder: forwarder.clone(),
msg_handler: msg_handler.clone(),
};
globals.runtime_handle.spawn(proxy.start())
}));