Added more error handling and possible bug fix for when one a DS or US stream thread is attempting to recv a DataPipe state but the channel has been closed (Other threads shutdown)
This commit is contained in:
parent
fdbd791e85
commit
ac2b515905
1 changed files with 129 additions and 29 deletions
158
src/data.rs
158
src/data.rs
|
|
@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use crate::{HandlerCallbacks, InnerHandlers};
|
use crate::{HandlerCallbacks, InnerHandlers};
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
enum FullDuplexTcpState {
|
enum FullDuplexTcpState {
|
||||||
DownStreamRead,
|
DownStreamRead,
|
||||||
DownStreamWrite(Vec<u8>),
|
DownStreamWrite(Vec<u8>),
|
||||||
|
|
@ -17,6 +18,7 @@ enum FullDuplexTcpState {
|
||||||
UpStreamShutDown,
|
UpStreamShutDown,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
enum DataPipe {
|
enum DataPipe {
|
||||||
DataWrite(Vec<u8>),
|
DataWrite(Vec<u8>),
|
||||||
Finished,
|
Finished,
|
||||||
|
|
@ -58,7 +60,10 @@ impl DownStreamInner {
|
||||||
let _ = stream_lock.flush();
|
let _ = stream_lock.flush();
|
||||||
drop(stream_lock);
|
drop(stream_lock);
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::DownStreamRead);
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamRead) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send DownStreamRead ready notifier to main thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
DataPipe::Shutdown => {
|
DataPipe::Shutdown => {
|
||||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||||
|
|
@ -72,6 +77,7 @@ impl DownStreamInner {
|
||||||
mpsc::RecvTimeoutError::Timeout => {},
|
mpsc::RecvTimeoutError::Timeout => {},
|
||||||
mpsc::RecvTimeoutError::Disconnected => {
|
mpsc::RecvTimeoutError::Disconnected => {
|
||||||
println!("[!] DownStream data_in channel is disconnected!");
|
println!("[!] DownStream data_in channel is disconnected!");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -81,17 +87,38 @@ impl DownStreamInner {
|
||||||
if let Some(byte_count) = self.get_data_stream() {
|
if let Some(byte_count) = self.get_data_stream() {
|
||||||
if byte_count > 0 {
|
if byte_count > 0 {
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone()));
|
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) {
|
||||||
if let DataPipe::Finished = data_in.recv().unwrap() {
|
println!("[SSLRelay Error]: Failed to send UpStreamWrite to main thread: {}", e);
|
||||||
self.internal_data_buffer.clear();
|
return;
|
||||||
continue;
|
}
|
||||||
} else {
|
|
||||||
println!("[!] Could not receive DataPipe::Finished notifier!");
|
// Possible race condition here if both DownStream and UpStream recieve data at same time!!!
|
||||||
|
|
||||||
|
match data_in.recv() {
|
||||||
|
Ok(data) => {
|
||||||
|
match data {
|
||||||
|
DataPipe::Finished => {
|
||||||
|
self.internal_data_buffer.clear();
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
println!("[SSLRelay DownStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
println!("[SSLRelay DownStream Thread Error]: Failed to receive data from data_in: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if byte_count == 0 || byte_count == -2 {
|
} else if byte_count == 0 || byte_count == -2 {
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send shutdown signal to main thread from DownStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||||
return;
|
return;
|
||||||
} else if byte_count == -1 {
|
} else if byte_count == -1 {
|
||||||
|
|
@ -195,10 +222,13 @@ impl UpStreamInner {
|
||||||
let _ = stream_lock.flush();
|
let _ = stream_lock.flush();
|
||||||
drop(stream_lock);
|
drop(stream_lock);
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::UpStreamRead);
|
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamRead) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send UpStreamRead ready notifier to main thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
},
|
},
|
||||||
DataPipe::Shutdown => {
|
DataPipe::Shutdown => {
|
||||||
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both).unwrap();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
_ => {}
|
_ => {}
|
||||||
|
|
@ -209,6 +239,7 @@ impl UpStreamInner {
|
||||||
mpsc::RecvTimeoutError::Timeout => {},
|
mpsc::RecvTimeoutError::Timeout => {},
|
||||||
mpsc::RecvTimeoutError::Disconnected => {
|
mpsc::RecvTimeoutError::Disconnected => {
|
||||||
println!("[!] UpStream data_in channel is disconnected!");
|
println!("[!] UpStream data_in channel is disconnected!");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -217,17 +248,36 @@ impl UpStreamInner {
|
||||||
if let Some(byte_count) = self.get_data_stream() {
|
if let Some(byte_count) = self.get_data_stream() {
|
||||||
if byte_count > 0 {
|
if byte_count > 0 {
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone()));
|
if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) {
|
||||||
if let DataPipe::Finished = data_in.recv().unwrap() {
|
println!("[SSLRelay Error]: Failed to send DownStreamWrite to main thread: {}", e);
|
||||||
self.internal_data_buffer.clear();
|
return;
|
||||||
continue;
|
}
|
||||||
} else {
|
|
||||||
println!("[!] Could not receive DataPipe::Finished notifier!");
|
match data_in.recv() {
|
||||||
|
Ok(data) => {
|
||||||
|
match data {
|
||||||
|
DataPipe::Finished => {
|
||||||
|
self.internal_data_buffer.clear();
|
||||||
|
continue;
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
println!("[SSLRelay UpStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
println!("[SSLRelay UpStream Thread Error]: Failed to receive data from data_in: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if byte_count == 0 || byte_count == -2 {
|
} else if byte_count == 0 || byte_count == -2 {
|
||||||
|
|
||||||
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
|
if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both);
|
||||||
return;
|
return;
|
||||||
} else if byte_count == -1 {
|
} else if byte_count == -1 {
|
||||||
|
|
@ -371,13 +421,33 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
});
|
});
|
||||||
|
|
||||||
self.inner_handlers.cb.us_b_callback(&mut data);
|
self.inner_handlers.cb.us_b_callback(&mut data);
|
||||||
let _ = ds_data_pipe_sender.send(DataPipe::DataWrite(data));
|
match ds_data_pipe_sender.send(DataPipe::DataWrite(data)) {
|
||||||
|
Ok(()) => {},
|
||||||
|
Err(e) => {
|
||||||
|
println!("[SSLRelay Error]: Failed to send data write to DownStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
match state_receiver.recv().unwrap() {
|
match state_receiver.recv() {
|
||||||
FullDuplexTcpState::DownStreamRead => {
|
Ok(data) => {
|
||||||
let _ = us_data_pipe_sender.send(DataPipe::Finished);
|
match data {
|
||||||
|
FullDuplexTcpState::DownStreamRead => {
|
||||||
|
if let Err(e) = us_data_pipe_sender.send(DataPipe::Finished) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send Finished notifier to UpStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [DownStreamRead] but got {:?}", data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
_ => {}
|
Err(e) => {
|
||||||
|
println!("[SSLRelay Error]: Failed to receive DownStream read from state_receiver: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// UpStream Write Request
|
// UpStream Write Request
|
||||||
|
|
@ -395,23 +465,52 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
});
|
});
|
||||||
|
|
||||||
self.inner_handlers.cb.ds_b_callback(&mut data);
|
self.inner_handlers.cb.ds_b_callback(&mut data);
|
||||||
let _ = us_data_pipe_sender.send(DataPipe::DataWrite(data));
|
|
||||||
|
|
||||||
match state_receiver.recv().unwrap() {
|
match us_data_pipe_sender.send(DataPipe::DataWrite(data)) {
|
||||||
FullDuplexTcpState::UpStreamRead => {
|
Ok(()) => {},
|
||||||
let _ = ds_data_pipe_sender.send(DataPipe::Finished);
|
Err(e) => {
|
||||||
|
println!("[SSLRelay Error]: Failed to send data write to UpStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
match state_receiver.recv() {
|
||||||
|
Ok(data) => {
|
||||||
|
match data {
|
||||||
|
FullDuplexTcpState::UpStreamRead => {
|
||||||
|
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Finished) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send Finished notifier to DownStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
},
|
||||||
|
_ => {
|
||||||
|
println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [UpStreamRead] but got {:?}", data);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
},
|
},
|
||||||
_ => {}
|
Err(e) => {
|
||||||
|
println!("[SSLRelay Error]: Failed to receive UpStream read from state_receiver: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
// DownStreamShutDown Request
|
// DownStreamShutDown Request
|
||||||
FullDuplexTcpState::DownStreamShutDown => {
|
FullDuplexTcpState::DownStreamShutDown => {
|
||||||
let _ = us_data_pipe_sender.send(DataPipe::Shutdown);
|
|
||||||
|
if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send Shutdown signal to UpStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
// UpStreamShutDown Request
|
// UpStreamShutDown Request
|
||||||
FullDuplexTcpState::UpStreamShutDown => {
|
FullDuplexTcpState::UpStreamShutDown => {
|
||||||
let _ = ds_data_pipe_sender.send(DataPipe::Shutdown);
|
|
||||||
|
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) {
|
||||||
|
println!("[SSLRelay Error]: Failed to send Shutdown signal to DownStream thread: {}", e);
|
||||||
|
return;
|
||||||
|
}
|
||||||
return;
|
return;
|
||||||
},
|
},
|
||||||
_ => {}
|
_ => {}
|
||||||
|
|
@ -419,6 +518,7 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
|
||||||
},
|
},
|
||||||
Err(_e) => {
|
Err(_e) => {
|
||||||
println!("[!] State receiver communication channel has closed!");
|
println!("[!] State receiver communication channel has closed!");
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue