diff --git a/src/data.rs b/src/data.rs index 225c57f..4c0af91 100644 --- a/src/data.rs +++ b/src/data.rs @@ -8,6 +8,7 @@ use std::sync::{Arc, Mutex}; use crate::{HandlerCallbacks, InnerHandlers}; +#[derive(Debug)] enum FullDuplexTcpState { DownStreamRead, DownStreamWrite(Vec), @@ -17,6 +18,7 @@ enum FullDuplexTcpState { UpStreamShutDown, } +#[derive(Debug)] enum DataPipe { DataWrite(Vec), Finished, @@ -58,7 +60,10 @@ impl DownStreamInner { let _ = stream_lock.flush(); drop(stream_lock); - let _ = data_out.send(FullDuplexTcpState::DownStreamRead); + if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamRead) { + println!("[SSLRelay Error]: Failed to send DownStreamRead ready notifier to main thread: {}", e); + return; + } }, DataPipe::Shutdown => { let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); @@ -72,6 +77,7 @@ impl DownStreamInner { mpsc::RecvTimeoutError::Timeout => {}, mpsc::RecvTimeoutError::Disconnected => { println!("[!] DownStream data_in channel is disconnected!"); + return; } } } @@ -81,17 +87,38 @@ impl DownStreamInner { if let Some(byte_count) = self.get_data_stream() { if byte_count > 0 { - let _ = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())); - if let DataPipe::Finished = data_in.recv().unwrap() { - self.internal_data_buffer.clear(); - continue; - } else { - println!("[!] Could not receive DataPipe::Finished notifier!"); + if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { + println!("[SSLRelay Error]: Failed to send UpStreamWrite to main thread: {}", e); + return; + } + + // Possible race condition here if both DownStream and UpStream recieve data at same time!!! + + match data_in.recv() { + Ok(data) => { + match data { + DataPipe::Finished => { + self.internal_data_buffer.clear(); + continue; + }, + _ => { + println!("[SSLRelay DownStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data); + return; + } + } + }, + Err(e) => { + println!("[SSLRelay DownStream Thread Error]: Failed to receive data from data_in: {}", e); + return; + } } } else if byte_count == 0 || byte_count == -2 { - let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); + if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { + println!("[SSLRelay Error]: Failed to send shutdown signal to main thread from DownStream thread: {}", e); + return; + } let _ = self.ds_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); return; } else if byte_count == -1 { @@ -195,10 +222,13 @@ impl UpStreamInner { let _ = stream_lock.flush(); drop(stream_lock); - let _ = data_out.send(FullDuplexTcpState::UpStreamRead); + if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamRead) { + println!("[SSLRelay Error]: Failed to send UpStreamRead ready notifier to main thread: {}", e); + return; + } }, DataPipe::Shutdown => { - let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); + self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both).unwrap(); return; } _ => {} @@ -209,6 +239,7 @@ impl UpStreamInner { mpsc::RecvTimeoutError::Timeout => {}, mpsc::RecvTimeoutError::Disconnected => { println!("[!] UpStream data_in channel is disconnected!"); + return; } } } @@ -217,17 +248,36 @@ impl UpStreamInner { if let Some(byte_count) = self.get_data_stream() { if byte_count > 0 { - let _ = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())); - if let DataPipe::Finished = data_in.recv().unwrap() { - self.internal_data_buffer.clear(); - continue; - } else { - println!("[!] Could not receive DataPipe::Finished notifier!"); + if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) { + println!("[SSLRelay Error]: Failed to send DownStreamWrite to main thread: {}", e); + return; + } + + match data_in.recv() { + Ok(data) => { + match data { + DataPipe::Finished => { + self.internal_data_buffer.clear(); + continue; + }, + _ => { + println!("[SSLRelay UpStream Thread Error]: Got unexpected data from data_in: Expected [Finished] but got {:?}", data); + return; + } + } + }, + Err(e) => { + println!("[SSLRelay UpStream Thread Error]: Failed to receive data from data_in: {}", e); + return; + } } } else if byte_count == 0 || byte_count == -2 { - let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); + if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { + println!("[SSLRelay Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); + return; + } let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); return; } else if byte_count == -1 { @@ -371,13 +421,33 @@ impl {}, + Err(e) => { + println!("[SSLRelay Error]: Failed to send data write to DownStream thread: {}", e); + return; + } + } - match state_receiver.recv().unwrap() { - FullDuplexTcpState::DownStreamRead => { - let _ = us_data_pipe_sender.send(DataPipe::Finished); + match state_receiver.recv() { + Ok(data) => { + match data { + FullDuplexTcpState::DownStreamRead => { + if let Err(e) = us_data_pipe_sender.send(DataPipe::Finished) { + println!("[SSLRelay Error]: Failed to send Finished notifier to UpStream thread: {}", e); + return; + } + }, + _ => { + println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [DownStreamRead] but got {:?}", data); + return; + } + } }, - _ => {} + Err(e) => { + println!("[SSLRelay Error]: Failed to receive DownStream read from state_receiver: {}", e); + return; + } } }, // UpStream Write Request @@ -395,23 +465,52 @@ impl { - let _ = ds_data_pipe_sender.send(DataPipe::Finished); + match us_data_pipe_sender.send(DataPipe::DataWrite(data)) { + Ok(()) => {}, + Err(e) => { + println!("[SSLRelay Error]: Failed to send data write to UpStream thread: {}", e); + return; + } + } + + match state_receiver.recv() { + Ok(data) => { + match data { + FullDuplexTcpState::UpStreamRead => { + if let Err(e) = ds_data_pipe_sender.send(DataPipe::Finished) { + println!("[SSLRelay Error]: Failed to send Finished notifier to DownStream thread: {}", e); + return; + } + }, + _ => { + println!("[SSLRelay Error]: Got unexpected data from state_receiver: Expected [UpStreamRead] but got {:?}", data); + return; + } + } }, - _ => {} + Err(e) => { + println!("[SSLRelay Error]: Failed to receive UpStream read from state_receiver: {}", e); + return; + } } }, // DownStreamShutDown Request FullDuplexTcpState::DownStreamShutDown => { - let _ = us_data_pipe_sender.send(DataPipe::Shutdown); + + if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { + println!("[SSLRelay Error]: Failed to send Shutdown signal to UpStream thread: {}", e); + return; + } return; }, // UpStreamShutDown Request FullDuplexTcpState::UpStreamShutDown => { - let _ = ds_data_pipe_sender.send(DataPipe::Shutdown); + + if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { + println!("[SSLRelay Error]: Failed to send Shutdown signal to DownStream thread: {}", e); + return; + } return; }, _ => {} @@ -419,6 +518,7 @@ impl { println!("[!] State receiver communication channel has closed!"); + return; } } }