diff --git a/src/data.rs b/src/data.rs index 6ba5565..9e5a3fc 100644 --- a/src/data.rs +++ b/src/data.rs @@ -23,37 +23,13 @@ enum DataPipe { Shutdown, } -/* -trait DataStream {} -impl DataStream for S {} -*/ pub enum DataStreamType { RAW(TcpStream), TLS(SslStream), } -//impl Read for DataStreamType {} -//impl Write for DataStreamType {} -/* -impl DataStreamType { - fn shutdown(&self) { - match self { - &Self::RAW(r) => { - r.shutdown(Shutdown::Both); - return; - }, - &Self::TLS(t) => { - t.shutdown(); - return; - }, - } - } -} -*/ struct DownStreamInner { - //ds_stream: Option>>>, - //ds_stream: Arc>, ds_stream: DataStreamType, internal_data_buffer: Vec, } @@ -62,7 +38,6 @@ impl DownStreamInner { fn handle_error(error_description: &str) { println!("[SSLRelay DownStream Thread Error]: {}", error_description); - //self.ds_stream.as_ref().shutdown(); } pub fn ds_handler(self, data_out: Sender, data_in: Receiver) { @@ -89,26 +64,13 @@ impl DownStreamInner { match data_received { DataPipe::DataWrite(data) => { -/* - let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() { - Ok(sl) => sl, - Err(_e) => { - self.handle_error("Failed to get stream lock!"); - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - } - return; - } - }; -*/ + match raw_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { Self::handle_error("Failed to write data to DownStream tcp stream!"); - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - let _ = raw_stream.shutdown(Shutdown::Both); - } + let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); + let _ = raw_stream.shutdown(Shutdown::Both); return; } } @@ -145,9 +107,8 @@ impl DownStreamInner { } else if byte_count == 0 || byte_count == -2 { - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - } + let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); + let _ = raw_stream.shutdown(Shutdown::Both); return; } else if byte_count == -1 { @@ -175,25 +136,13 @@ impl DownStreamInner { match data_received { DataPipe::DataWrite(data) => { -/* - let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() { - Ok(sl) => sl, - Err(_e) => { - self.handle_error("Failed to get stream lock!"); - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - } - return; - } - }; -*/ + match tls_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { Self::handle_error("Failed to write data to DownStream tcp stream!"); - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - } + let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); + let _ = tls_stream.shutdown(); return; } } @@ -222,6 +171,7 @@ impl DownStreamInner { if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); + let _ = tls_stream.shutdown(); return; } @@ -229,9 +179,8 @@ impl DownStreamInner { } else if byte_count == 0 || byte_count == -2 { - if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); - } + let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); + let _ = tls_stream.shutdown(); return; } else if byte_count == -1 { @@ -247,8 +196,6 @@ impl DownStreamInner { let mut data_length: i64 = 0; - //let mut stream_lock = self.ds_stream.as_mut().unwrap().lock().unwrap(); - loop { let mut r_buf = [0; 1024]; @@ -303,7 +250,6 @@ impl DownStreamInner { struct UpStreamInner { - //us_stream: Option>>, us_stream: DataStreamType, internal_data_buffer: Vec } @@ -312,7 +258,6 @@ impl UpStreamInner { fn handle_error(error_description: &str) { println!("[SSLRelay UpStream Thread Error]: {}", error_description); - //let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); } pub fn us_handler(self, data_out: Sender, data_in: Receiver) { @@ -339,27 +284,13 @@ impl UpStreamInner { match data_received { DataPipe::DataWrite(data) => { -/* - let mut stream_lock = match self.us_stream.as_ref().unwrap().lock() { - Ok(sl) => sl, - Err(_e) => { - self.handle_error("Failed to get stream lock!"); - if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); - //return; - } - return; - } - }; -*/ + match raw_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { - println!("Failed to write data to UpStream tcp stream!"); - if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); - //return; - } + Self::handle_error("Failed to write data to UpStream tcp stream!"); + let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); + let _ = raw_stream.shutdown(Shutdown::Both); return; } } @@ -394,9 +325,8 @@ impl UpStreamInner { } else if byte_count == 0 || byte_count == -2 { - if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from UpStream thread: {}", e).as_str()); - } + let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); + let _ = raw_stream.shutdown(Shutdown::Both); return; } else if byte_count == -1 { continue; @@ -421,27 +351,13 @@ impl UpStreamInner { match data_received { DataPipe::DataWrite(data) => { -/* - let mut stream_lock = match self.us_stream.as_ref().unwrap().lock() { - Ok(sl) => sl, - Err(_e) => { - self.handle_error("Failed to get stream lock!"); - if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); - //return; - } - return; - } - }; -*/ + match tls_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { - println!("Failed to write data to UpStream tcp stream!"); - if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); - //return; - } + Self::handle_error("Failed to write data to UpStream tcp stream!"); + let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); + let _ = tls_stream.shutdown(); return; } } @@ -476,9 +392,8 @@ impl UpStreamInner { } else if byte_count == 0 || byte_count == -2 { - if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - Self::handle_error(format!("Failed to send shutdown signal to main thread from UpStream thread: {}", e).as_str()); - } + let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); + let _ = tls_stream.shutdown(); return; } else if byte_count == -1 { continue; @@ -492,8 +407,6 @@ impl UpStreamInner { let mut data_length: i64 = 0; - //let mut stream_lock = self.us_stream.as_mut().unwrap().lock().unwrap(); - loop { let mut r_buf = [0; 1024]; @@ -550,15 +463,10 @@ pub struct FullDuplexTcp where H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static, { - //ds_tcp_stream: Arc>, - //us_tcp_stream: Option>>, - //remote_endpoint: String, remote_host: String, remote_port: String, ds_inner_m: Arc>>, - //ds_inner_m: DownStreamInner, us_inner_m: Arc>>, - //us_inner_m: UpStreamInner, inner_handlers: InnerHandlers, } @@ -566,13 +474,11 @@ impl) -> Result { - //let _ = ds_tcp_stream.set_read_timeout(Some(Duration::from_millis(50))); match ds_tcp_stream { DataStreamType::RAW(ref s) => { let _ = s.set_read_timeout(Some(Duration::from_millis(50))); }, DataStreamType::TLS(ref s) => { let _ = s.get_ref().set_read_timeout(Some(Duration::from_millis(50))); }, } - // Connect to remote here let us_tcp_stream = match Self::connect_endpoint(us_tcp_stream_type, remote_host.clone(), remote_port.clone()) { Ok(s) => s, Err(ec) => { @@ -586,34 +492,23 @@ impl::new()}))), - //ds_inner_m: DownStreamInner{ds_stream: ds_tcp_stream, internal_data_buffer: Vec::::new()}, us_inner_m: Arc::new(Mutex::new(Some(UpStreamInner{us_stream: us_tcp_stream, internal_data_buffer: Vec::::new()}))), - //us_inner_m: UpStreamInner{us_stream: us_tcp_stream, internal_data_buffer: Vec::::new()}, inner_handlers: handlers, }) } pub fn handle(&mut self) { -/* - if self.connect_endpoint() < 0 { - let _ = self.ds_tcp_stream.lock().unwrap().get_ref().shutdown(Shutdown::Both); - return; - } -*/ + let (state_sender, state_receiver): (Sender, Receiver) = mpsc::channel(); let (ds_data_pipe_sender, ds_data_pipe_receiver): (Sender, Receiver) = mpsc::channel(); let (us_data_pipe_sender, us_data_pipe_receiver): (Sender, Receiver) = mpsc::channel(); - //self.ds_inner_m.lock().unwrap().ds_stream = Some(self.ds_tcp_stream.clone()); let ds_method_pointer = self.ds_inner_m.clone(); let ds_state_bc = state_sender.clone(); - //self.us_inner_m.lock().unwrap().us_stream = Some(self.us_tcp_stream.as_ref().unwrap().clone()); let us_method_pointer = self.us_inner_m.clone(); let us_state_bc = state_sender.clone(); @@ -776,7 +671,6 @@ impl { @@ -800,12 +694,7 @@ impl) or DataStreamType::RAW(TcpStream) - * This will be passed into the Full Duplex TCP simulator object and those - * methods within will decide which handler is called for each specific stream. - * WILL ONLY DECIDE HANDLER IF CANT DO GENERIC DATA TYPING WITH TRAITS - * The remote / UpStream data type will be decided by another DataStreamType variant and passed separately - * into the Full Duplex TCP simulator. - - * Maybe streams passed into FDTCP simulator object could be typed as Trait requirements instead of a strict type? - * Like Write/Read etc. - */ - let rhost = self.config.remote_host.clone(); let rport = self.config.remote_port.clone(); - //let remote_endpoint = format!("{}:{}", rhost, rport); let listener = TcpListener::bind(format!("{}:{}", self.config.bind_host.clone(), self.config.bind_port.clone())).unwrap(); let upstream_data_stream_type = self.config.upstream_data_type;