Timeout, reduce log
This commit is contained in:
parent
5656af0781
commit
b51a09795f
6 changed files with 137 additions and 669 deletions
|
|
@ -127,7 +127,7 @@ pub async fn play(
|
|||
for (direction, data) in records {
|
||||
match direction {
|
||||
Direction::ClientToServer => {
|
||||
println!("[CLT] ({id}) >> {}", data.len());
|
||||
//println!("[CLT] ({id}) >> {}", data.len());
|
||||
//stream.get_mut().write_all(data).await.unwrap();
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
|
|
@ -140,7 +140,7 @@ pub async fn play(
|
|||
}
|
||||
}
|
||||
Direction::ServerToClient => {
|
||||
println!("[CLT] ({id}) << {}", data.len());
|
||||
//println!("[CLT] ({id}) << {}", data.len());
|
||||
// let mut buf = Vec::new();
|
||||
// stream.read_buf(&mut buf).await.ok();
|
||||
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
|
||||
|
|
@ -250,7 +250,7 @@ pub async fn play(
|
|||
for (direction, data) in records {
|
||||
match direction {
|
||||
Direction::ClientToServer => {
|
||||
println!("[CLT] ({id}) >> {}", data.len());
|
||||
//println!("[CLT] ({id}) >> {}", data.len());
|
||||
//stream.get_mut().write_all(data).await.unwrap();
|
||||
match tokio::time::timeout(
|
||||
std::time::Duration::from_millis(1000),
|
||||
|
|
@ -263,7 +263,7 @@ pub async fn play(
|
|||
}
|
||||
}
|
||||
Direction::ServerToClient => {
|
||||
println!("[CLT] ({id}) << {}", data.len());
|
||||
//println!("[CLT] ({id}) << {}", data.len());
|
||||
//let mut buf = Vec::new();
|
||||
//stream.read_buf(&mut buf).await.ok();
|
||||
//let mut buf = vec![0; data.len().saturating_sub(50).max(1)];
|
||||
|
|
@ -307,9 +307,16 @@ pub async fn play(
|
|||
}
|
||||
}
|
||||
tokio::spawn(async move {
|
||||
let mut last_count = 0;
|
||||
loop {
|
||||
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
|
||||
tokio::time::sleep(std::time::Duration::from_secs(2)).await;
|
||||
println!("Running: {:?}", running.lock().await);
|
||||
let new_count = counter.load(std::sync::atomic::Ordering::Relaxed);
|
||||
if new_count == last_count {
|
||||
println!("Stalled at {} / {}, stopping", new_count, total);
|
||||
std::process::exit(0);
|
||||
}
|
||||
last_count = new_count;
|
||||
}
|
||||
});
|
||||
for handle in handles {
|
||||
|
|
|
|||
129
src/main.rs
129
src/main.rs
|
|
@ -8,6 +8,7 @@ use record::Records;
|
|||
use argp::FromArgs;
|
||||
use static_cell::StaticCell;
|
||||
use tokio::sync::oneshot;
|
||||
use tokio_rustls::rustls::crypto::CryptoProvider;
|
||||
|
||||
/// Play recorded requests and responses
|
||||
#[derive(FromArgs)]
|
||||
|
|
@ -55,6 +56,9 @@ struct OptPlay {
|
|||
/// Only play this record
|
||||
#[argp(option)]
|
||||
record: Option<u64>,
|
||||
/// Only run these parts
|
||||
#[argp(option, default = "String::from(\"both\")")]
|
||||
run: String,
|
||||
}
|
||||
|
||||
/// Print records
|
||||
|
|
@ -71,6 +75,13 @@ struct OptPrint {
|
|||
#[argp(subcommand, name = "record")]
|
||||
struct OptRecord {}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum RunMode {
|
||||
Client,
|
||||
Server,
|
||||
Both,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
|
||||
enum TlsMode {
|
||||
None,
|
||||
|
|
@ -94,29 +105,115 @@ async fn main() {
|
|||
"both" => TlsMode::Both,
|
||||
_ => panic!("TLS mode must be one of none,client,server,both."),
|
||||
};
|
||||
let run_mode = match subopt.run.as_str() {
|
||||
"client" => RunMode::Client,
|
||||
"server" => RunMode::Server,
|
||||
"both" => RunMode::Both,
|
||||
_ => panic!("run mode must be one of client,server,both."),
|
||||
};
|
||||
let records = RECORDS.init(record::read_record_file(&opt.record_file));
|
||||
|
||||
if let Some(only_record) = subopt.record {
|
||||
records.retain(|id, _| *id == only_record);
|
||||
}
|
||||
|
||||
let mut ciphers: Option<Vec<String>> = None;
|
||||
let mut kexes: Option<Vec<String>> = None;
|
||||
for (var, val) in std::env::vars() {
|
||||
match var.as_str() {
|
||||
"CIPHERS" => ciphers = Some(val.split(',').map(str::to_string).collect()),
|
||||
"KEXES" => kexes = Some(val.split(',').map(str::to_string).collect()),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
let mut prov = tokio_rustls::rustls::crypto::aws_lc_rs::default_provider();
|
||||
if let Some(ciphers) = ciphers {
|
||||
prov.cipher_suites.clear();
|
||||
for cipher in ciphers {
|
||||
match cipher.as_str() {
|
||||
"AES_256_GCM_SHA384" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS13_AES_256_GCM_SHA384),
|
||||
"AES_128_GCM_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS13_AES_128_GCM_SHA256),
|
||||
"CHACHA20_POLY1305_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS13_CHACHA20_POLY1305_SHA256),
|
||||
"ECDHE_ECDSA_WITH_AES_256_GCM_SHA384" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384),
|
||||
"ECDHE_ECDSA_WITH_AES_128_GCM_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256),
|
||||
"ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256),
|
||||
"ECDHE_RSA_WITH_AES_256_GCM_SHA384" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384),
|
||||
"ECDHE_RSA_WITH_AES_128_GCM_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256),
|
||||
"ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256" => prov
|
||||
.cipher_suites
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::cipher_suite::TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256),
|
||||
other => {
|
||||
println!("Unknown cipher `{other}`")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(kexes) = kexes {
|
||||
prov.kx_groups.clear();
|
||||
for kex in kexes {
|
||||
match kex.as_str() {
|
||||
"X25519" => prov
|
||||
.kx_groups
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::kx_group::X25519),
|
||||
"SECP256R1" => prov
|
||||
.kx_groups
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::kx_group::SECP256R1),
|
||||
"SECP384R1" => prov
|
||||
.kx_groups
|
||||
.push(tokio_rustls::rustls::crypto::aws_lc_rs::kx_group::SECP384R1),
|
||||
other => {
|
||||
println!("Unknown kex `{other}`")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
CryptoProvider::install_default(prov).unwrap();
|
||||
|
||||
let (sync_sender, sync_receiver) = oneshot::channel();
|
||||
console_subscriber::init();
|
||||
let client = tokio::spawn(client::play(
|
||||
records,
|
||||
tls_mode,
|
||||
(subopt.forward_addr, subopt.forward_port),
|
||||
sync_receiver,
|
||||
subopt.repeat,
|
||||
));
|
||||
server::play(
|
||||
records,
|
||||
tls_mode,
|
||||
&subopt.certs,
|
||||
("0.0.0.0", subopt.listen_port),
|
||||
sync_sender,
|
||||
)
|
||||
.await;
|
||||
//console_subscriber::init();
|
||||
let client = tokio::spawn({
|
||||
let records = &*records;
|
||||
async move {
|
||||
if run_mode == RunMode::Both || run_mode == RunMode::Client {
|
||||
client::play(
|
||||
records,
|
||||
tls_mode,
|
||||
(subopt.forward_addr, subopt.forward_port),
|
||||
sync_receiver,
|
||||
subopt.repeat,
|
||||
)
|
||||
.await;
|
||||
} else {
|
||||
std::future::pending().await
|
||||
}
|
||||
}
|
||||
});
|
||||
if run_mode == RunMode::Both || run_mode == RunMode::Server {
|
||||
server::play(
|
||||
records,
|
||||
tls_mode,
|
||||
&subopt.certs,
|
||||
("0.0.0.0", subopt.listen_port),
|
||||
sync_sender,
|
||||
)
|
||||
.await;
|
||||
}
|
||||
client.await.unwrap();
|
||||
}
|
||||
Subcommand::Print(subopt) => {
|
||||
|
|
|
|||
|
|
@ -212,15 +212,18 @@ pub fn print_records(records: &Records, print_packets: bool) {
|
|||
}
|
||||
}
|
||||
if print_packets {
|
||||
let data = if data.len() >= 256 && *direction == Direction::ServerToClient {
|
||||
let data_tr = if data.len() >= 256 && *direction == Direction::ServerToClient {
|
||||
&data[0..256]
|
||||
} else {
|
||||
data.as_slice()
|
||||
};
|
||||
if let Ok(data) = str::from_utf8(data) {
|
||||
println!(" {data:?}")
|
||||
if let Ok(data_tr) = str::from_utf8(data_tr) {
|
||||
println!(" {data_tr:?}")
|
||||
} else {
|
||||
println!(" {data:?}")
|
||||
println!(" {data_tr:?}")
|
||||
}
|
||||
if let Some(header_end) = memchr::memmem::find(data, b"\r\n\r\n") {
|
||||
println!(" --> body len: {}", data.len() - header_end - 4);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -209,7 +209,7 @@ pub async fn play(
|
|||
.get(&(server_name.as_bytes().to_vec(), hash.clone()))
|
||||
.unwrap();
|
||||
for &res in responses {
|
||||
println!("[SRV] response for ({}): {} bytes", id, res.len());
|
||||
//println!("[SRV] response for ({}): {} bytes", id, res.len());
|
||||
stream.write_all(res).await.unwrap();
|
||||
stream.flush().await.unwrap();
|
||||
}
|
||||
|
|
@ -267,7 +267,7 @@ pub async fn play(
|
|||
stream.shutdown().await.unwrap();
|
||||
};*/
|
||||
let fut = async move {
|
||||
println!("[SRV] New task");
|
||||
//println!("[SRV] New task");
|
||||
let mut stream = Framed::new(stream, crate::http::HttpCodec {});
|
||||
let req = stream.next().await.unwrap().unwrap();
|
||||
let req_hash = tlsh::hash_buf(&req)
|
||||
|
|
@ -291,7 +291,7 @@ pub async fn play(
|
|||
.get(&(server_name.clone(), hash.clone()))
|
||||
.unwrap();
|
||||
for &res in responses {
|
||||
println!("[SRV] response for ({}): {} bytes", id, res.len());
|
||||
//println!("[SRV] response for ({}): {} bytes", id, res.len());
|
||||
stream.write_all(res).await.unwrap();
|
||||
stream.flush().await.unwrap();
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue