From 490caf9e5ed6031e18b4202cf1e91a7c69288cbc Mon Sep 17 00:00:00 2001 From: PinkP4nther <0x0090@protonmail.com> Date: Tue, 28 Sep 2021 03:32:36 -0700 Subject: [PATCH] New feature: Data Stream Types (TLS/RAW). Performance enhancements. (V0.4.0) --- .gitignore | 2 +- Cargo.toml | 2 +- README.md | 2 + examples/basic/src/main.rs | 27 +- examples/modifydata/relay_config.toml | 4 +- examples/modifydata/src/main.rs | 5 +- relay_config.example.toml | 2 + src/data.rs | 383 ++++++++++++++++++-------- src/lib.rs | 121 ++++++-- 9 files changed, 383 insertions(+), 165 deletions(-) diff --git a/.gitignore b/.gitignore index b835682..8ebe069 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,3 @@ /target ideas.txt -Cargo.lock +Cargo.lock \ No newline at end of file diff --git a/Cargo.toml b/Cargo.toml index 5f76870..672dfce 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "sslrelay" -version = "0.3.1" +version = "0.4.0" authors = ["PinkP4nther "] edition = "2018" diff --git a/README.md b/README.md index f1e1b35..78b096a 100644 --- a/README.md +++ b/README.md @@ -16,4 +16,6 @@ Then use this library to continuously rewrite or display decrypted network traff 09/21/2021 | v0.3.1 | Fully supports IPv6. +09/28/2021 | v0.4.0 | New feature added: Stream data types. Can now set type of stream data TLS or RAW. And some performance improvements. + More updates/ideas to come.. I think.. \ No newline at end of file diff --git a/examples/basic/src/main.rs b/examples/basic/src/main.rs index 6ce174c..c9c2834 100644 --- a/examples/basic/src/main.rs +++ b/examples/basic/src/main.rs @@ -1,4 +1,4 @@ -use sslrelay::{self, ConfigType, RelayConfig, HandlerCallbacks, CallbackRet}; +use sslrelay::{self, ConfigType, RelayConfig, HandlerCallbacks, CallbackRet, TCPDataType}; // Handler object #[derive(Clone)] // Must have Clone trait implemented. @@ -36,18 +36,19 @@ impl HandlerCallbacks for Handler { fn main() { // Create new SSLRelay object - let mut relay = sslrelay::SSLRelay::new(Handler); - - // Load Configuration - relay.load_config(ConfigType::Conf(RelayConfig { - bind_host: "0.0.0.0".to_string(), - bind_port: "443".to_string(), - remote_host: "remote.com".to_string(), - remote_port: "443".to_string(), - ssl_private_key_path: "./remote.com.key".to_string(), - ssl_cert_path: "./remote.com.crt".to_string(), - })); - + let mut relay = sslrelay::SSLRelay::new( + Handler, + ConfigType::Conf(RelayConfig { + downstream_data_type: TCPDataType::TLS, + upstream_data_type: TCPDataType::TLS, + bind_host: "0.0.0.0".to_string(), + bind_port: "443".to_string(), + remote_host: "remote.com".to_string(), + remote_port: "443".to_string(), + ssl_private_key_path: "./remote.com.key".to_string(), + ssl_cert_path: "./remote.com.crt".to_string(), + }) + ); // Start listening relay.start(); } \ No newline at end of file diff --git a/examples/modifydata/relay_config.toml b/examples/modifydata/relay_config.toml index 064379d..feb1885 100644 --- a/examples/modifydata/relay_config.toml +++ b/examples/modifydata/relay_config.toml @@ -3,4 +3,6 @@ bind_port = "443" ssl_private_key_path = "./remote.com.key" ssl_cert_path = "./remote.com.crt" remote_host = "remote.com" -remote_port = "443" \ No newline at end of file +remote_port = "443" +downstream_data_type = "tls" +upstream_data_type = "tls" \ No newline at end of file diff --git a/examples/modifydata/src/main.rs b/examples/modifydata/src/main.rs index 601a814..96657ea 100644 --- a/examples/modifydata/src/main.rs +++ b/examples/modifydata/src/main.rs @@ -38,10 +38,7 @@ impl HandlerCallbacks for Handler { fn main() { // Create new SSLRelay object - let mut relay = sslrelay::SSLRelay::new(Handler); - - // Load Configuration - relay.load_config(ConfigType::Default); + let mut relay = sslrelay::SSLRelay::new(Handler, ConfigType::Default); // Start listening relay.start(); diff --git a/relay_config.example.toml b/relay_config.example.toml index bdc1e57..5bb8f2c 100644 --- a/relay_config.example.toml +++ b/relay_config.example.toml @@ -4,3 +4,5 @@ ssl_private_key_path = "./ssl.key" ssl_cert_path = "./ssl.crt" remote_host = "remote.com" remote_port = "443" +downstream_data_type = "tls" +upstream_data_type = "tls" \ No newline at end of file diff --git a/src/data.rs b/src/data.rs index cab835b..6ba5565 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,12 +1,13 @@ use std::time::Duration; -use openssl::ssl::{Ssl, SslConnector, SslMethod, SslStream, SslVerifyMode}; +use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode}; use std::io::{self, Read, Write}; use std::net::{TcpStream, Shutdown}; use std::sync::mpsc::{self, Receiver, Sender}; use std::thread; +use std::result::Result; use std::sync::{Arc, Mutex}; -use crate::{HandlerCallbacks, CallbackRet, InnerHandlers}; +use crate::{HandlerCallbacks, CallbackRet, InnerHandlers, TCPDataType}; #[derive(Debug)] enum FullDuplexTcpState { @@ -26,7 +27,7 @@ enum DataPipe { trait DataStream {} impl DataStream for S {} */ -enum DataStreamType { +pub enum DataStreamType { RAW(TcpStream), TLS(SslStream), } @@ -52,24 +53,33 @@ impl DataStreamType { struct DownStreamInner { //ds_stream: Option>>>, - ds_stream: Option>>, + //ds_stream: Arc>, + ds_stream: DataStreamType, internal_data_buffer: Vec, } impl DownStreamInner { - fn handle_error(&self, error_description: &str) { + fn handle_error(error_description: &str) { println!("[SSLRelay DownStream Thread Error]: {}", error_description); - self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown(); + //self.ds_stream.as_ref().shutdown(); } - pub fn ds_handler(&mut self, data_out: Sender, data_in: Receiver) { - - + pub fn ds_handler(self, data_out: Sender, data_in: Receiver) { + match &self.ds_stream { + DataStreamType::RAW(_) => self.handle_raw(data_out, data_in), + DataStreamType::TLS(_) => self.handle_tls(data_out, data_in), + } } - fn handle_raw(&mut self, data_out: Sender, data_in: Receiver) { + fn handle_raw(mut self, data_out: Sender, data_in: Receiver) { + + let mut raw_stream = match &self.ds_stream { + DataStreamType::RAW(ref s) => s, + _ => return, + }; + loop { match data_in.recv_timeout(Duration::from_millis(50)) { @@ -79,7 +89,7 @@ impl DownStreamInner { match data_received { DataPipe::DataWrite(data) => { - +/* let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() { Ok(sl) => sl, Err(_e) => { @@ -90,23 +100,23 @@ impl DownStreamInner { return; } }; - - match stream_lock.write_all(&data) { +*/ + match raw_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { - self.handle_error("Failed to write data to DownStream tcp stream!"); + Self::handle_error("Failed to write data to DownStream tcp stream!"); if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); + let _ = raw_stream.shutdown(Shutdown::Both); } return; } } - let _ = stream_lock.flush(); - drop(stream_lock); + let _ = raw_stream.flush(); }, DataPipe::Shutdown => { - self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown(); + let _ = raw_stream.shutdown(Shutdown::Both); return; }, } @@ -115,7 +125,7 @@ impl DownStreamInner { match _e { mpsc::RecvTimeoutError::Timeout => {}, mpsc::RecvTimeoutError::Disconnected => { - self.handle_error("DownStream data_in channel is disconnected!"); + Self::handle_error("DownStream data_in channel is disconnected!"); return; } } @@ -123,11 +133,11 @@ impl DownStreamInner { }// End of data_in receive // If received data - if let Some(byte_count) = self.get_data_stream() { + if let Some(byte_count) = Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer) { if byte_count > 0 { if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { - self.handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); return; } @@ -136,7 +146,7 @@ impl DownStreamInner { } else if byte_count == 0 || byte_count == -2 { if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); } return; @@ -149,7 +159,12 @@ impl DownStreamInner { } } - fn handle_tls(&mut self, data_out: Sender, data_in: Receiver) { + fn handle_tls(mut self, data_out: Sender, data_in: Receiver) { + + let mut tls_stream = match self.ds_stream { + DataStreamType::TLS(ref mut s) => s, + _ => return, + }; loop { @@ -160,7 +175,7 @@ impl DownStreamInner { match data_received { DataPipe::DataWrite(data) => { - +/* let mut stream_lock = match self.ds_stream.as_ref().unwrap().lock() { Ok(sl) => sl, Err(_e) => { @@ -171,23 +186,21 @@ impl DownStreamInner { return; } }; - - match stream_lock.write_all(&data) { +*/ + match tls_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { - self.handle_error("Failed to write data to DownStream tcp stream!"); + Self::handle_error("Failed to write data to DownStream tcp stream!"); if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); } return; } } - let _ = stream_lock.flush(); - drop(stream_lock); - + let _ = tls_stream.flush(); }, DataPipe::Shutdown => { - self.ds_stream.as_ref().unwrap().lock().unwrap().shutdown(); + let _ = tls_stream.shutdown(); return; }, } @@ -196,7 +209,7 @@ impl DownStreamInner { match _e { mpsc::RecvTimeoutError::Timeout => {}, mpsc::RecvTimeoutError::Disconnected => { - self.handle_error("DownStream data_in channel is disconnected!"); + Self::handle_error("DownStream data_in channel is disconnected!"); return; } } @@ -204,11 +217,11 @@ impl DownStreamInner { }// End of data_in receive // If received data - if let Some(byte_count) = self.get_data_stream() { + if let Some(byte_count) = Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer) { if byte_count > 0 { if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { - self.handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); return; } @@ -217,7 +230,7 @@ impl DownStreamInner { } else if byte_count == 0 || byte_count == -2 { if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send shutdown signal to main thread from DownStream thread: {}", e).as_str()); } return; @@ -230,17 +243,17 @@ impl DownStreamInner { } } - fn get_data_stream(&mut self) -> Option { + fn get_data_stream(stream: &mut S, internal_data_buffer: &mut Vec) -> Option { let mut data_length: i64 = 0; - let mut stream_lock = self.ds_stream.as_mut().unwrap().lock().unwrap(); + //let mut stream_lock = self.ds_stream.as_mut().unwrap().lock().unwrap(); loop { let mut r_buf = [0; 1024]; - match stream_lock.read(&mut r_buf) { + match stream.read(&mut r_buf) { Ok(bytes_read) => { @@ -257,7 +270,7 @@ impl DownStreamInner { //let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap(); - let _bw = self.internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); + let _bw = internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); data_length += bytes_read as i64; } else { @@ -290,18 +303,33 @@ impl DownStreamInner { struct UpStreamInner { - us_stream: Option>>, + //us_stream: Option>>, + us_stream: DataStreamType, internal_data_buffer: Vec } impl UpStreamInner { - fn handle_error(&self, error_description: &str) { + fn handle_error(error_description: &str) { println!("[SSLRelay UpStream Thread Error]: {}", error_description); - let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); + //let _ = self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both); } - pub fn us_handler(&mut self, data_out: Sender, data_in: Receiver) { + pub fn us_handler(self, data_out: Sender, data_in: Receiver) { + + match &self.us_stream { + DataStreamType::RAW(_) => self.handle_raw(data_out, data_in), + DataStreamType::TLS(_) => self.handle_tls(data_out, data_in), + } + + } + + fn handle_raw(mut self, data_out: Sender, data_in: Receiver) { + + let mut raw_stream = match self.us_stream { + DataStreamType::RAW(ref s) => s, + _ => return, + }; loop { @@ -311,7 +339,7 @@ impl UpStreamInner { match data_received { DataPipe::DataWrite(data) => { - +/* let mut stream_lock = match self.us_stream.as_ref().unwrap().lock() { Ok(sl) => sl, Err(_e) => { @@ -323,8 +351,8 @@ impl UpStreamInner { return; } }; - - match stream_lock.write_all(&data) { +*/ + match raw_stream.write_all(&data) { Ok(()) => {}, Err(_e) => { println!("Failed to write data to UpStream tcp stream!"); @@ -335,12 +363,10 @@ impl UpStreamInner { return; } } - let _ = stream_lock.flush(); - drop(stream_lock); - + let _ = raw_stream.flush(); }, DataPipe::Shutdown => { - self.us_stream.as_ref().unwrap().lock().unwrap().get_ref().shutdown(Shutdown::Both).unwrap(); + let _ = raw_stream.shutdown(Shutdown::Both); return; } } @@ -349,18 +375,18 @@ impl UpStreamInner { match e { mpsc::RecvTimeoutError::Timeout => {}, mpsc::RecvTimeoutError::Disconnected => { - self.handle_error("UpStream data_in channel is disconnected!"); + Self::handle_error("UpStream data_in channel is disconnected!"); return; } } } }// End of data_in receive - if let Some(byte_count) = self.get_data_stream() { + if let Some(byte_count) = Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer) { if byte_count > 0 { if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) { - self.handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str()); return; } @@ -369,7 +395,7 @@ impl UpStreamInner { } else if byte_count == 0 || byte_count == -2 { if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { - self.handle_error(format!("Failed to send shutdown signal to main thread from UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send shutdown signal to main thread from UpStream thread: {}", e).as_str()); } return; } else if byte_count == -1 { @@ -380,17 +406,99 @@ impl UpStreamInner { } } - fn get_data_stream(&mut self) -> Option { + fn handle_tls(mut self, data_out: Sender, data_in: Receiver) { + + let mut tls_stream = match self.us_stream { + DataStreamType::TLS(ref mut s) => s, + _ => return, + }; + + loop { + + match data_in.recv_timeout(Duration::from_millis(50)) { + + Ok(data_received) => { + + match data_received { + DataPipe::DataWrite(data) => { +/* + let mut stream_lock = match self.us_stream.as_ref().unwrap().lock() { + Ok(sl) => sl, + Err(_e) => { + self.handle_error("Failed to get stream lock!"); + if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { + //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); + //return; + } + return; + } + }; +*/ + match tls_stream.write_all(&data) { + Ok(()) => {}, + Err(_e) => { + println!("Failed to write data to UpStream tcp stream!"); + if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { + //println!("[SSLRelay UpStream Thread Error]: Failed to send shutdown signal to main thread from UpStream thread: {}", e); + //return; + } + return; + } + } + let _ = tls_stream.flush(); + }, + DataPipe::Shutdown => { + let _ = tls_stream.shutdown(); + return; + } + } + }, + Err(e) => { + match e { + mpsc::RecvTimeoutError::Timeout => {}, + mpsc::RecvTimeoutError::Disconnected => { + Self::handle_error("UpStream data_in channel is disconnected!"); + return; + } + } + } + }// End of data_in receive + + if let Some(byte_count) = Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer) { + if byte_count > 0 { + + if let Err(e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) { + Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str()); + return; + } + + self.internal_data_buffer.clear(); + + } else if byte_count == 0 || byte_count == -2 { + + if let Err(e) = data_out.send(FullDuplexTcpState::UpStreamShutDown) { + Self::handle_error(format!("Failed to send shutdown signal to main thread from UpStream thread: {}", e).as_str()); + } + return; + } else if byte_count == -1 { + continue; + } + } else { + } + } + } + + fn get_data_stream(stream: &mut S, internal_data_buffer: &mut Vec) -> Option { let mut data_length: i64 = 0; - let mut stream_lock = self.us_stream.as_mut().unwrap().lock().unwrap(); + //let mut stream_lock = self.us_stream.as_mut().unwrap().lock().unwrap(); loop { let mut r_buf = [0; 1024]; - match stream_lock.read(&mut r_buf) { + match stream.read(&mut r_buf) { Ok(bytes_read) => { @@ -408,7 +516,7 @@ impl UpStreamInner { //let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap(); - let _bw = self.internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); + let _bw = internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); data_length += bytes_read as i64; } else { @@ -437,62 +545,84 @@ impl UpStreamInner { } } +#[allow(dead_code)] pub struct FullDuplexTcp where H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static, { - ds_tcp_stream: Arc>, - us_tcp_stream: Option>>, + //ds_tcp_stream: Arc>, + //us_tcp_stream: Option>>, //remote_endpoint: String, remote_host: String, remote_port: String, - ds_inner_m: Arc>, - us_inner_m: Arc>, + ds_inner_m: Arc>>, + //ds_inner_m: DownStreamInner, + us_inner_m: Arc>>, + //us_inner_m: UpStreamInner, inner_handlers: InnerHandlers, } impl FullDuplexTcp { - pub fn new(ds_tcp_stream: SslStream, remote_host: String, remote_port: String, handlers: InnerHandlers) -> Self { + pub fn new(ds_tcp_stream: DataStreamType, us_tcp_stream_type: TCPDataType, remote_host: String, remote_port: String, handlers: InnerHandlers) -> Result { - let _ = ds_tcp_stream.get_ref().set_read_timeout(Some(Duration::from_millis(50))); + //let _ = ds_tcp_stream.set_read_timeout(Some(Duration::from_millis(50))); + match ds_tcp_stream { + DataStreamType::RAW(ref s) => { let _ = s.set_read_timeout(Some(Duration::from_millis(50))); }, + DataStreamType::TLS(ref s) => { let _ = s.get_ref().set_read_timeout(Some(Duration::from_millis(50))); }, + } - FullDuplexTcp { - ds_tcp_stream: Arc::new(Mutex::new(ds_tcp_stream)), - us_tcp_stream: None, + // Connect to remote here + let us_tcp_stream = match Self::connect_endpoint(us_tcp_stream_type, remote_host.clone(), remote_port.clone()) { + Ok(s) => s, + Err(ec) => { + match ds_tcp_stream { + DataStreamType::RAW(s) => { let _ = s.shutdown(Shutdown::Both); }, + DataStreamType::TLS(mut s) => { let _ = s.shutdown(); }, + } + return Err(ec); + } + }; + + Ok( + FullDuplexTcp { + //ds_tcp_stream: Arc::new(Mutex::new(ds_tcp_stream)), + //us_tcp_stream: None, remote_host, remote_port, - ds_inner_m: Arc::new(Mutex::new(DownStreamInner{ds_stream: None, internal_data_buffer: Vec::::new()})), - us_inner_m: Arc::new(Mutex::new(UpStreamInner{us_stream: None, internal_data_buffer: Vec::::new()})), + ds_inner_m: Arc::new(Mutex::new(Some(DownStreamInner{ds_stream: ds_tcp_stream, internal_data_buffer: Vec::::new()}))), + //ds_inner_m: DownStreamInner{ds_stream: ds_tcp_stream, internal_data_buffer: Vec::::new()}, + us_inner_m: Arc::new(Mutex::new(Some(UpStreamInner{us_stream: us_tcp_stream, internal_data_buffer: Vec::::new()}))), + //us_inner_m: UpStreamInner{us_stream: us_tcp_stream, internal_data_buffer: Vec::::new()}, inner_handlers: handlers, - } + }) } pub fn handle(&mut self) { - +/* if self.connect_endpoint() < 0 { let _ = self.ds_tcp_stream.lock().unwrap().get_ref().shutdown(Shutdown::Both); return; } - +*/ let (state_sender, state_receiver): (Sender, Receiver) = mpsc::channel(); let (ds_data_pipe_sender, ds_data_pipe_receiver): (Sender, Receiver) = mpsc::channel(); let (us_data_pipe_sender, us_data_pipe_receiver): (Sender, Receiver) = mpsc::channel(); - self.ds_inner_m.lock().unwrap().ds_stream = Some(self.ds_tcp_stream.clone()); + //self.ds_inner_m.lock().unwrap().ds_stream = Some(self.ds_tcp_stream.clone()); let ds_method_pointer = self.ds_inner_m.clone(); let ds_state_bc = state_sender.clone(); - self.us_inner_m.lock().unwrap().us_stream = Some(self.us_tcp_stream.as_ref().unwrap().clone()); + //self.us_inner_m.lock().unwrap().us_stream = Some(self.us_tcp_stream.as_ref().unwrap().clone()); let us_method_pointer = self.us_inner_m.clone(); let us_state_bc = state_sender.clone(); thread::spawn(move || { - ds_method_pointer.lock().unwrap().ds_handler(ds_state_bc, ds_data_pipe_receiver); + ds_method_pointer.lock().unwrap().take().unwrap().ds_handler(ds_state_bc, ds_data_pipe_receiver); }); thread::spawn(move || { - us_method_pointer.lock().unwrap().us_handler(us_state_bc, us_data_pipe_receiver); + us_method_pointer.lock().unwrap().take().unwrap().us_handler(us_state_bc, us_data_pipe_receiver); }); loop { @@ -525,7 +655,7 @@ impl {}, Err(e) => { - self.handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); return; } } @@ -534,7 +664,7 @@ impl {}, Err(e) => { - self.handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); return; } } @@ -542,10 +672,10 @@ impl {}, CallbackRet::Shutdown => { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); } if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); } return; } @@ -571,7 +701,7 @@ impl {}, Err(e) => { - self.handle_error(format!("Failed to send data write to UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send data write to UpStream thread: {}", e).as_str()); return; } } @@ -580,7 +710,7 @@ impl {}, Err(e) => { - self.handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); return; } } @@ -588,10 +718,10 @@ impl {}, CallbackRet::Shutdown => { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); } if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); } return; } @@ -601,7 +731,7 @@ impl { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); return; } return; @@ -610,7 +740,7 @@ impl { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); return; } return; @@ -618,12 +748,12 @@ impl { - self.handle_error("State receiver communication channel has closed!"); + Self::handle_error("State receiver communication channel has closed!"); if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); } if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { - self.handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); + Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); } return; } @@ -631,39 +761,58 @@ impl i8 { + fn connect_endpoint(stream_data_type: TCPDataType, remote_host: String, remote_port: String) -> Result { - let mut sslbuilder = SslConnector::builder(SslMethod::tls()).unwrap(); - sslbuilder.set_verify(SslVerifyMode::NONE); + match stream_data_type { - let connector = sslbuilder.build(); + TCPDataType::RAW => { + let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) { + Ok(s) => s, + Err(e) => { + Self::handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", remote_host, remote_port), e).as_str()); + return Result::Err(-1); + } + }; + let _ = s.set_read_timeout(Some(Duration::from_millis(50))); + return Ok(DataStreamType::RAW(s)); - let s = match TcpStream::connect(format!("{}:{}", self.remote_host, self.remote_port)) { - Ok(s) => s, - Err(e) => { - self.handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", self.remote_host, self.remote_port), e).as_str()); - return -1; + + }, + TCPDataType::TLS => { + + let mut sslbuilder = SslConnector::builder(SslMethod::tls()).unwrap(); + sslbuilder.set_verify(SslVerifyMode::NONE); + + let connector = sslbuilder.build(); + + let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) { + Ok(s) => s, + Err(e) => { + Self::handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", remote_host, remote_port), e).as_str()); + return Result::Err(-1); + } + }; + + let s = match connector.connect(remote_host.as_str(), s) { + Ok(s) => s, + Err(e) => { + Self::handle_error(format!("Failed to accept TLS/SSL handshake: {}", e).as_str()); + return Result::Err(-2); + } + }; + /* + self.us_tcp_stream = Some( + Arc::new( + Mutex::new( + s + )));*/ + let _ = s.get_ref().set_read_timeout(Some(Duration::from_millis(50))); + return Ok(DataStreamType::TLS(s)); } - }; - - let s = match connector.connect(self.remote_host.as_str(), s) { - Ok(s) => s, - Err(e) => { - self.handle_error(format!("Failed to accept TLS/SSL handshake: {}", e).as_str()); - return -2; - } - }; - - self.us_tcp_stream = Some( - Arc::new( - Mutex::new( - s - ))); - let _ = self.us_tcp_stream.as_ref().unwrap().lock().unwrap().get_ref().set_read_timeout(Some(Duration::from_millis(50))); - return 0; + } } - fn handle_error(&self, error_description: &str) { + fn handle_error(error_description: &str) { println!("[SSLRelay Master Thread Error]: {}", error_description); } } \ No newline at end of file diff --git a/src/lib.rs b/src/lib.rs index 21c7648..50a959d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,10 +9,18 @@ use std::path::Path; use toml::Value as TValue; mod data; -use data::FullDuplexTcp; +use data::{FullDuplexTcp, DataStreamType}; + +#[derive(Copy, Clone)] +pub enum TCPDataType { + TLS, + RAW, +} #[derive(Clone)] pub struct RelayConfig { + pub downstream_data_type: TCPDataType, + pub upstream_data_type: TCPDataType, pub bind_host: String, pub bind_port: String, pub remote_host: String, @@ -92,41 +100,73 @@ impl { + TCPDataType::TLS => { + let acceptor = self.setup_ssl_config(self.config.ssl_private_key_path.clone(), self.config.ssl_cert_path.clone()); - let acceptor = acceptor.clone(); - let handler_clone = self.handlers.as_ref().unwrap().clone(); - //let r_endpoint = remote_endpoint.clone(); + for stream in listener.incoming() { + + match stream { + Ok(stream) => { + + let acceptor = acceptor.clone(); + let handler_clone = self.handlers.as_ref().unwrap().clone(); + + let r_host = rhost.clone(); + let r_port = rport.clone(); + + thread::spawn(move || { + + match acceptor.accept(stream) { + Ok(stream) => { + // FULL DUPLEX OBJECT CREATION HERE + match FullDuplexTcp::new(DataStreamType::TLS(stream), upstream_data_stream_type, r_host, r_port, handler_clone) { + Ok(mut fdtcp) => fdtcp.handle(), + Err(_ec) => {} + } + }, + Err(e) => { + + println!("[Error] {}", e); + } + } + }); + }, + Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)} + } + } + }, - let r_host = rhost.clone(); - let r_port = rport.clone(); + TCPDataType::RAW => { - let r_host = rhost.clone(); - let r_port = rport.clone(); - - thread::spawn(move || { - - match acceptor.accept(stream) { - Ok(stream) => { + for stream in listener.incoming() { + + match stream { + Ok(stream) => { + + let handler_clone = self.handlers.as_ref().unwrap().clone(); + + let r_host = rhost.clone(); + let r_port = rport.clone(); + + thread::spawn(move || { + // FULL DUPLEX OBJECT CREATION HERE - FullDuplexTcp::new(stream, r_host, r_port, handler_clone).handle(); - }, - Err(e) => { + match FullDuplexTcp::new(DataStreamType::RAW(stream), upstream_data_stream_type, r_host, r_port, handler_clone) { + Ok(mut fdtcp) => fdtcp.handle(), + Err(_ec) => {}, + } - println!("[Error] {}", e); - } - } - }); - }, - Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)} + }); + }, + Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)} + } + } } } } @@ -163,8 +203,33 @@ impl