Race condition fixed and performance greatly improved. More thread shutdown handling as well.

This commit is contained in:
PinkP4nther 2021-09-14 11:15:44 -07:00
commit de35c08906

View file

@ -10,9 +10,7 @@ use crate::{HandlerCallbacks, InnerHandlers};
#[derive(Debug)] #[derive(Debug)]
enum FullDuplexTcpState { enum FullDuplexTcpState {
DownStreamRead,
DownStreamWrite(Vec<u8>), DownStreamWrite(Vec<u8>),
UpStreamRead,
UpStreamWrite(Vec<u8>), UpStreamWrite(Vec<u8>),
DownStreamShutDown, DownStreamShutDown,
UpStreamShutDown, UpStreamShutDown,
@ -21,7 +19,6 @@ enum FullDuplexTcpState {
#[derive(Debug)] #[derive(Debug)]
enum DataPipe { enum DataPipe {
DataWrite(Vec<u8>), DataWrite(Vec<u8>),
Finished,
Shutdown, Shutdown,
} }
@ -47,6 +44,11 @@ impl DownStreamInner {
Ok(sl) => sl, Ok(sl) => sl,
Err(_e) => { Err(_e) => {
println!("[!] Failed to get stream lock!"); println!("[!] Failed to get stream lock!");
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
println!("[SSLRelay DownStream Thread Error]: Failed to send shutdown signal to main thread from DownStream thread: {}", e);
return;
}
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
return; return;
} }
}; };
@ -55,21 +57,21 @@ impl DownStreamInner {
Ok(()) => {}, Ok(()) => {},
Err(_e) => { Err(_e) => {
println!("[!] Failed to write data to DownStream tcp stream!"); println!("[!] Failed to write data to DownStream tcp stream!");
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
println!("[SSLRelay DownStream Thread Error]: Failed to send shutdown signal to main thread from DownStream thread: {}", e);
return;
}
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
} }
} }
let _ = stream_lock.flush(); let _ = stream_lock.flush();
drop(stream_lock); drop(stream_lock);
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamRead) {
println!("[SSLRelay DownStream Thread Error]: Failed to send DownStreamRead ready notifier to main thread: {}", e);
return;
}
}, },
DataPipe::Shutdown => { DataPipe::Shutdown => {
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
return; return;
}, },
_ => {}
} }
}, },
Err(_e) => { Err(_e) => {
@ -92,26 +94,7 @@ impl DownStreamInner {
return; return;
} }
// Possible race condition here if both DownStream and UpStream recieve data at same time!!! self.internal_data_buffer.clear();
match data_in.recv() {
Ok(data) => {
match data {
DataPipe::Finished => {
self.internal_data_buffer.clear();
continue;
},
_ => {
println!("[SSLRelay DownStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data);
return;
}
}
},
Err(e) => {
println!("[SSLRelay DownStream Thread Error]: Failed to receive data from data_in: {}", e);
return;
}
}
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
@ -209,6 +192,11 @@ impl UpStreamInner {
Ok(sl) => sl, Ok(sl) => sl,
Err(_e) => { Err(_e) => {
println!("[!] Failed to get stream lock!"); println!("[!] Failed to get stream lock!");
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) {
println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e);
return;
}
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
return; return;
} }
}; };
@ -216,22 +204,22 @@ impl UpStreamInner {
match stream_lock.write_all(&data) { match stream_lock.write_all(&data) {
Ok(()) => {}, Ok(()) => {},
Err(_e) => { Err(_e) => {
println!("[!] Failed to write data to DownStream tcp stream!"); println!("[!] Failed to write data to UpStream tcp stream!");
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) {
println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e);
return;
}
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
} }
} }
let _ = stream_lock.flush(); let _ = stream_lock.flush();
drop(stream_lock); drop(stream_lock);
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamRead) {
println!("[SSLRelay UpStream Thread Error]: Failed to send UpStreamRead ready notifier to main thread: {}", e);
return;
}
}, },
DataPipe::Shutdown => { DataPipe::Shutdown => {
self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both).unwrap(); self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both).unwrap();
return; return;
} }
_ => {}
} }
}, },
Err(e) => { Err(e) => {
@ -253,24 +241,7 @@ impl UpStreamInner {
return; return;
} }
match data_in.recv() { self.internal_data_buffer.clear();
Ok(data) => {
match data {
DataPipe::Finished => {
self.internal_data_buffer.clear();
continue;
},
_ => {
println!("[SSLRelay UpStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data);
return;
}
}
},
Err(e) => {
println!("[SSLRelay UpStream Thread Error]: Failed to receive data from data_in: {}", e);
return;
}
}
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
@ -428,27 +399,6 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
return; return;
} }
} }
match state_receiver.recv() {
Ok(data) => {
match data {
FullDuplexTcpState::DownStreamRead => {
if let Err(e) = us_data_pipe_sender.send(DataPipe::Finished) {
println!("[SSLRelay Error]: Failed to send Finished notifier to UpStream thread: {}", e);
return;
}
},
_ => {
println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [DownStreamRead] but got {:?}", data);
return;
}
}
},
Err(e) => {
println!("[SSLRelay Error]: Failed to receive DownStream read from state_receiver: {}", e);
return;
}
}
}, },
// UpStream Write Request // UpStream Write Request
FullDuplexTcpState::UpStreamWrite(mut data) => { FullDuplexTcpState::UpStreamWrite(mut data) => {
@ -473,27 +423,6 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
return; return;
} }
} }
match state_receiver.recv() {
Ok(data) => {
match data {
FullDuplexTcpState::UpStreamRead => {
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Finished) {
println!("[SSLRelay Error]: Failed to send Finished notifier to DownStream thread: {}", e);
return;
}
},
_ => {
println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [UpStreamRead] but got {:?}", data);
return;
}
}
},
Err(e) => {
println!("[SSLRelay Error]: Failed to receive UpStream read from state_receiver: {}", e);
return;
}
}
}, },
// DownStreamShutDown Request // DownStreamShutDown Request
FullDuplexTcpState::DownStreamShutDown => { FullDuplexTcpState::DownStreamShutDown => {
@ -513,7 +442,6 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
} }
return; return;
}, },
_ => {}
} }
}, },
Err(_e) => { Err(_e) => {