Fmt, add experimental features

This commit is contained in:
Pascal Engélibert 2025-10-30 11:54:08 +01:00
commit 11a4777cfe
8 changed files with 1091 additions and 963 deletions

3
.vscode/settings.json vendored Normal file
View file

@ -0,0 +1,3 @@
{
"rust-analyzer.showUnlinkedFileNotification": false
}

66
examples/basic.rs Normal file
View file

@ -0,0 +1,66 @@
use sslrelay::{self, CallbackRet, HandlerCallbacks, RelayConfig, TCPDataType, TLSConfig};
// Handler object
#[derive(Clone)] // Must have Clone trait implemented.
struct Handler;
/*
Callback traits that can be used to read or inject data
into data upstream or downstream.
*/
impl HandlerCallbacks for Handler {
// DownStream non blocking callback
fn ds_nb_callback(&self, in_data: Vec<u8>, _conn_id: u64) {
if let Ok(in_data) = str::from_utf8(&in_data) {
println!("[downstream] {in_data}");
} else {
//println!("[downstream] {in_data:?}");
}
}
// DownStream blocking callback
fn ds_b_callback(&mut self, _in_data: Vec<u8>, _conn_id: u64) -> CallbackRet {
//println!("[CALLBACK] Down Stream Blocking CallBack!");
CallbackRet::Relay(_in_data)
}
// UpStream non blocking callback
fn us_nb_callback(&self, in_data: Vec<u8>, _conn_id: u64) {
if let Ok(in_data) = str::from_utf8(&in_data) {
println!("[upstream] {in_data}");
} else {
//println!("[upstream] {in_data:?}");
}
}
// UpStream blocking callback
fn us_b_callback(&mut self, _in_data: Vec<u8>, _conn_id: u64) -> CallbackRet {
//println!("[CALLBACK] Up Stream Blocking CallBack!");
CallbackRet::Relay(_in_data)
}
}
fn main() {
// Create new SSLRelay object
let mut relay = sslrelay::SSLRelay::new(
Handler,
RelayConfig {
downstream_data_type: TCPDataType::TLS,
upstream_data_type: TCPDataType::TLS,
bind_host: "127.0.0.1".to_string(),
bind_port: "443".to_string(),
remote_host: |server_name| {
server_name.map_or_else(|| panic!("NO HOST"), str::to_string)
},
remote_port: "443".to_string(),
tls_config:
TLSConfig::FILE {
certificate_path: "/dev/shm/exp/certs/prime256v1/apple.com.crt".to_string(),
private_key_path: "/dev/shm/exp/certs/prime256v1/apple.com.key".to_string(),
},
},
);
// Start listening
relay.start();
}

View file

@ -1,57 +0,0 @@
use sslrelay::{self, RelayConfig, HandlerCallbacks, CallbackRet, TCPDataType, TLSConfig};
// Handler object
#[derive(Clone)] // Must have Clone trait implemented.
struct Handler;
/*
Callback traits that can be used to read or inject data
into data upstream or downstream.
*/
impl HandlerCallbacks for Handler {
// DownStream non blocking callback
fn ds_nb_callback(&self, _in_data: Vec<u8>) {
println!("[CALLBACK] Down Stream Non Blocking CallBack!");
}
// DownStream blocking callback
fn ds_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {
println!("[CALLBACK] Down Stream Blocking CallBack!");
CallbackRet::Relay(_in_data)
}
// UpStream non blocking callback
fn us_nb_callback(&self, _in_data: Vec<u8>) {
println!("[CALLBACK] Up Stream Non Blocking CallBack!");
}
// UpStream blocking callback
fn us_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {
println!("[CALLBACK] Up Stream Blocking CallBack!");
CallbackRet::Relay(_in_data)
}
}
fn main() {
// Create new SSLRelay object
let mut relay = sslrelay::SSLRelay::new(
Handler,
RelayConfig {
downstream_data_type: TCPDataType::TLS,
upstream_data_type: TCPDataType::TLS,
bind_host: "0.0.0.0".to_string(),
bind_port: "443".to_string(),
remote_host: "remote.com".to_string(),
remote_port: "443".to_string(),
tls_config: TLSConfig::FILE{
certificate_path: "./tls.crt".to_string(),
private_key_path: "./tls.key".to_string(),
},
}
);
// Start listening
relay.start();
}

8
rustfmt.toml Normal file
View file

@ -0,0 +1,8 @@
hard_tabs = true
newline_style = "unix"
unstable_features = true
format_code_in_doc_comments = true
format_macro_bodies = true
format_macro_matchers = true
format_strings = true

View file

@ -1,27 +1,14 @@
use crate::{ use crate::{
DownStreamInner, io, mpsc, DataPipe, DataStreamType, DownStreamInner, Duration, FullDuplexTcpState, Read,
UpStreamInner, Receiver, Sender, Shutdown, UpStreamInner, Write,
FullDuplexTcpState,
DataPipe,
DataStreamType,
Sender,
Receiver,
Shutdown,
mpsc,
Duration,
Read,
Write,
io,
}; };
impl DownStreamInner { impl DownStreamInner {
fn handle_error(error_description: &str) { fn handle_error(error_description: &str) {
println!("[SSLRelay DownStream Thread Error]: {}", error_description); println!("[SSLRelay DownStream Thread Error]: {}", error_description);
} }
pub fn ds_handler(self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { pub fn ds_handler(self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
match &self.ds_stream { match &self.ds_stream {
DataStreamType::RAW(_) => self.handle_raw(data_out, data_in), DataStreamType::RAW(_) => self.handle_raw(data_out, data_in),
DataStreamType::TLS(_) => self.handle_tls(data_out, data_in), DataStreamType::TLS(_) => self.handle_tls(data_out, data_in),
@ -29,200 +16,172 @@ impl DownStreamInner {
} }
fn handle_raw(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { fn handle_raw(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
let mut raw_stream = match &self.ds_stream { let mut raw_stream = match &self.ds_stream {
DataStreamType::RAW(ref s) => s, DataStreamType::RAW(ref s) => s,
_ => return, _ => return,
}; };
loop { loop {
match data_in.recv_timeout(Duration::from_millis(50)) { match data_in.recv_timeout(Duration::from_millis(50)) {
// DataPipe Received // DataPipe Received
Ok(data_received) => { Ok(data_received) => match data_received {
match data_received {
DataPipe::DataWrite(data) => { DataPipe::DataWrite(data) => {
match raw_stream.write_all(&data) { match raw_stream.write_all(&data) {
Ok(()) => {}, Ok(()) => {}
Err(_e) => { Err(_e) => {
Self::handle_error("Failed to write data to DownStream tcp stream!"); Self::handle_error(
"Failed to write data to DownStream tcp stream!",
);
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
} }
} }
let _ = raw_stream.flush(); let _ = raw_stream.flush();
}
},
DataPipe::Shutdown => { DataPipe::Shutdown => {
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
},
} }
}, },
Err(_e) => { Err(_e) => match _e {
match _e { mpsc::RecvTimeoutError::Timeout => {}
mpsc::RecvTimeoutError::Timeout => {},
mpsc::RecvTimeoutError::Disconnected => { mpsc::RecvTimeoutError::Disconnected => {
Self::handle_error("DownStream data_in channel is disconnected!"); Self::handle_error("DownStream data_in channel is disconnected!");
return; return;
} }
} },
} } // End of data_in receive
}// End of data_in receive
// If received data // If received data
if let Some(byte_count) = Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer) { if let Some(byte_count) =
Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer)
{
if byte_count > 0 { if byte_count > 0 {
if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamWrite(
if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { self.internal_data_buffer.clone(),
)) {
//Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); //Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str());
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
} }
self.internal_data_buffer.clear(); self.internal_data_buffer.clear();
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
} else if byte_count == -1 { } else if byte_count == -1 {
continue; continue;
} }
} else { } else {
} }
} }
} }
fn handle_tls(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { fn handle_tls(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
let mut tls_stream = match self.ds_stream { let mut tls_stream = match self.ds_stream {
DataStreamType::TLS(ref mut s) => s, DataStreamType::TLS(ref mut s) => s,
_ => return, _ => return,
}; };
loop { loop {
match data_in.recv_timeout(Duration::from_millis(50)) { match data_in.recv_timeout(Duration::from_millis(50)) {
// DataPipe Received // DataPipe Received
Ok(data_received) => { Ok(data_received) => match data_received {
match data_received {
DataPipe::DataWrite(data) => { DataPipe::DataWrite(data) => {
match tls_stream.write_all(&data) { match tls_stream.write_all(&data) {
Ok(()) => {}, Ok(()) => {}
Err(_e) => { Err(_e) => {
Self::handle_error("Failed to write data to DownStream tcp stream!"); Self::handle_error(
"Failed to write data to DownStream tcp stream!",
);
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
} }
} }
let _ = tls_stream.flush(); let _ = tls_stream.flush();
}, }
DataPipe::Shutdown => { DataPipe::Shutdown => {
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
},
} }
}, },
Err(_e) => { Err(_e) => match _e {
match _e { mpsc::RecvTimeoutError::Timeout => {}
mpsc::RecvTimeoutError::Timeout => {},
mpsc::RecvTimeoutError::Disconnected => { mpsc::RecvTimeoutError::Disconnected => {
Self::handle_error("DownStream data_in channel is disconnected!"); Self::handle_error("DownStream data_in channel is disconnected!");
return; return;
} }
} },
} } // End of data_in receive
}// End of data_in receive
// If received data // If received data
if let Some(byte_count) = Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer) { if let Some(byte_count) =
Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer)
{
if byte_count > 0 { if byte_count > 0 {
if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamWrite(
if let Err(_e) = data_out.send(FullDuplexTcpState::UpStreamWrite(self.internal_data_buffer.clone())) { self.internal_data_buffer.clone(),
)) {
//Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str()); //Self::handle_error(format!("Failed to send UpStreamWrite to main thread: {}", e).as_str());
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
} }
self.internal_data_buffer.clear(); self.internal_data_buffer.clear();
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown); let _ = data_out.send(FullDuplexTcpState::DownStreamShutDown);
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
} else if byte_count == -1 { } else if byte_count == -1 {
continue; continue;
} }
} else { } else {
} }
} }
} }
fn get_data_stream<S: Read>(stream: &mut S, internal_data_buffer: &mut Vec<u8>) -> Option<i64> { fn get_data_stream<S: Read>(stream: &mut S, internal_data_buffer: &mut Vec<u8>) -> Option<i64> {
let mut data_length: i64 = 0; let mut data_length: i64 = 0;
loop { loop {
let mut r_buf = [0; 1024]; let mut r_buf = [0; 1024];
match stream.read(&mut r_buf) { match stream.read(&mut r_buf) {
Ok(bytes_read) => { Ok(bytes_read) => {
if bytes_read == 0 { if bytes_read == 0 {
break; break;
} else if bytes_read != 0 && bytes_read <= 1024 { } else if bytes_read != 0 && bytes_read <= 1024 {
/* /*
let mut tmp_buf = r_buf.to_vec(); let mut tmp_buf = r_buf.to_vec();
tmp_buf.truncate(bytes_read); tmp_buf.truncate(bytes_read);
*/ */
//let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap(); //let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap();
let _bw = internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); let _bw = internal_data_buffer
.write(r_buf.split_at(bytes_read).0)
.unwrap();
data_length += bytes_read as i64; data_length += bytes_read as i64;
} else { } else {
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!"); println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
} }
}, }
Err(e) => { Err(e) => match e.kind() {
match e.kind() {
io::ErrorKind::WouldBlock => { io::ErrorKind::WouldBlock => {
if data_length == 0 { if data_length == 0 {
data_length = -1; data_length = -1;
} }
break; break;
}
},
io::ErrorKind::ConnectionReset => { io::ErrorKind::ConnectionReset => {
data_length = -2; data_length = -2;
break; break;
}, }
_ => {println!("[!!!] Got error: {}",e);} _ => {
println!("[!!!downstream] Got error: {}", e);
} }
}, },
} }
@ -231,40 +190,30 @@ impl DownStreamInner {
} }
} }
impl UpStreamInner { impl UpStreamInner {
fn handle_error(error_description: &str) { fn handle_error(error_description: &str) {
println!("[SSLRelay UpStream Thread Error]: {}", error_description); println!("[SSLRelay UpStream Thread Error]: {}", error_description);
} }
pub fn us_handler(self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { pub fn us_handler(self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
match &self.us_stream { match &self.us_stream {
DataStreamType::RAW(_) => self.handle_raw(data_out, data_in), DataStreamType::RAW(_) => self.handle_raw(data_out, data_in),
DataStreamType::TLS(_) => self.handle_tls(data_out, data_in), DataStreamType::TLS(_) => self.handle_tls(data_out, data_in),
} }
} }
fn handle_raw(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { fn handle_raw(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
let mut raw_stream = match self.us_stream { let mut raw_stream = match self.us_stream {
DataStreamType::RAW(ref s) => s, DataStreamType::RAW(ref s) => s,
_ => return, _ => return,
}; };
loop { loop {
match data_in.recv_timeout(Duration::from_millis(50)) { match data_in.recv_timeout(Duration::from_millis(50)) {
Ok(data_received) => match data_received {
Ok(data_received) => {
match data_received {
DataPipe::DataWrite(data) => { DataPipe::DataWrite(data) => {
match raw_stream.write_all(&data) { match raw_stream.write_all(&data) {
Ok(()) => {}, Ok(()) => {}
Err(_e) => { Err(_e) => {
Self::handle_error("Failed to write data to UpStream tcp stream!"); Self::handle_error("Failed to write data to UpStream tcp stream!");
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
@ -273,38 +222,35 @@ impl UpStreamInner {
} }
} }
let _ = raw_stream.flush(); let _ = raw_stream.flush();
}, }
DataPipe::Shutdown => { DataPipe::Shutdown => {
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
} }
}
}, },
Err(e) => { Err(e) => match e {
match e { mpsc::RecvTimeoutError::Timeout => {}
mpsc::RecvTimeoutError::Timeout => {},
mpsc::RecvTimeoutError::Disconnected => { mpsc::RecvTimeoutError::Disconnected => {
Self::handle_error("UpStream data_in channel is disconnected!"); Self::handle_error("UpStream data_in channel is disconnected!");
return; return;
} }
} },
} } // End of data_in receive
}// End of data_in receive
if let Some(byte_count) = Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer) { if let Some(byte_count) =
Self::get_data_stream(&mut raw_stream, &mut self.internal_data_buffer)
{
if byte_count > 0 { if byte_count > 0 {
if let Err(_e) = data_out.send(FullDuplexTcpState::DownStreamWrite(
if let Err(_e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) { self.internal_data_buffer.clone(),
)) {
//Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str()); //Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str());
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
} }
self.internal_data_buffer.clear(); self.internal_data_buffer.clear();
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
let _ = raw_stream.shutdown(Shutdown::Both); let _ = raw_stream.shutdown(Shutdown::Both);
return; return;
@ -317,23 +263,17 @@ impl UpStreamInner {
} }
fn handle_tls(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) { fn handle_tls(mut self, data_out: Sender<FullDuplexTcpState>, data_in: Receiver<DataPipe>) {
let mut tls_stream = match self.us_stream { let mut tls_stream = match self.us_stream {
DataStreamType::TLS(ref mut s) => s, DataStreamType::TLS(ref mut s) => s,
_ => return, _ => return,
}; };
loop { loop {
match data_in.recv_timeout(Duration::from_millis(50)) { match data_in.recv_timeout(Duration::from_millis(50)) {
Ok(data_received) => match data_received {
Ok(data_received) => {
match data_received {
DataPipe::DataWrite(data) => { DataPipe::DataWrite(data) => {
match tls_stream.write_all(&data) { match tls_stream.write_all(&data) {
Ok(()) => {}, Ok(()) => {}
Err(_e) => { Err(_e) => {
Self::handle_error("Failed to write data to UpStream tcp stream!"); Self::handle_error("Failed to write data to UpStream tcp stream!");
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
@ -342,38 +282,35 @@ impl UpStreamInner {
} }
} }
let _ = tls_stream.flush(); let _ = tls_stream.flush();
}, }
DataPipe::Shutdown => { DataPipe::Shutdown => {
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
} }
}
}, },
Err(e) => { Err(e) => match e {
match e { mpsc::RecvTimeoutError::Timeout => {}
mpsc::RecvTimeoutError::Timeout => {},
mpsc::RecvTimeoutError::Disconnected => { mpsc::RecvTimeoutError::Disconnected => {
Self::handle_error("UpStream data_in channel is disconnected!"); Self::handle_error("UpStream data_in channel is disconnected!");
return; return;
} }
} },
} } // End of data_in receive
}// End of data_in receive
if let Some(byte_count) = Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer) { if let Some(byte_count) =
Self::get_data_stream(&mut tls_stream, &mut self.internal_data_buffer)
{
if byte_count > 0 { if byte_count > 0 {
if let Err(_e) = data_out.send(FullDuplexTcpState::DownStreamWrite(
if let Err(_e) = data_out.send(FullDuplexTcpState::DownStreamWrite(self.internal_data_buffer.clone())) { self.internal_data_buffer.clone(),
)) {
//Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str()); //Self::handle_error(format!("Failed to send DownStreamWrite to main thread: {}", e).as_str());
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
} }
self.internal_data_buffer.clear(); self.internal_data_buffer.clear();
} else if byte_count == 0 || byte_count == -2 { } else if byte_count == 0 || byte_count == -2 {
let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown); let _ = data_out.send(FullDuplexTcpState::UpStreamShutDown);
let _ = tls_stream.shutdown(); let _ = tls_stream.shutdown();
return; return;
@ -386,52 +323,44 @@ impl UpStreamInner {
} }
fn get_data_stream<S: Read>(stream: &mut S, internal_data_buffer: &mut Vec<u8>) -> Option<i64> { fn get_data_stream<S: Read>(stream: &mut S, internal_data_buffer: &mut Vec<u8>) -> Option<i64> {
let mut data_length: i64 = 0; let mut data_length: i64 = 0;
loop { loop {
let mut r_buf = [0; 1024]; let mut r_buf = [0; 1024];
match stream.read(&mut r_buf) { match stream.read(&mut r_buf) {
Ok(bytes_read) => { Ok(bytes_read) => {
if bytes_read == 0 { if bytes_read == 0 {
break; break;
} else if bytes_read != 0 && bytes_read <= 1024 { } else if bytes_read != 0 && bytes_read <= 1024 {
/* /*
let mut tmp_buf = r_buf.to_vec(); let mut tmp_buf = r_buf.to_vec();
tmp_buf.truncate(bytes_read); tmp_buf.truncate(bytes_read);
*/ */
//let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap(); //let _bw = self.internal_data_buffer.write(&tmp_buf).unwrap();
let _bw = internal_data_buffer.write(r_buf.split_at(bytes_read).0).unwrap(); let _bw = internal_data_buffer
.write(r_buf.split_at(bytes_read).0)
.unwrap();
data_length += bytes_read as i64; data_length += bytes_read as i64;
} else { } else {
println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!"); println!("[+] Else hit!!!!!!!!!!!!!!!!!!!!!!");
} }
}, }
Err(e) => { Err(e) => match e.kind() {
match e.kind() {
io::ErrorKind::WouldBlock => { io::ErrorKind::WouldBlock => {
if data_length == 0 { if data_length == 0 {
data_length = -1; data_length = -1;
} }
break; break;
}, }
io::ErrorKind::ConnectionReset => { io::ErrorKind::ConnectionReset => {
data_length = -2; data_length = -2;
break; break;
}, }
_ => {println!("[!!!] Got error: {}",e);} _ => {
println!("[!!!upstream] Got error: {}", e);
} }
}, },
} }

View file

@ -4,26 +4,30 @@
//! This library allows you to implement callback functions for upstream and downstream traffic. //! This library allows you to implement callback functions for upstream and downstream traffic.
//! These callbacks can R/W the data from a stream(Blocking) or only R the data(Non-Blocking). //! These callbacks can R/W the data from a stream(Blocking) or only R the data(Non-Blocking).
//!``` //!```
//!pub trait HandlerCallbacks { //! pub trait HandlerCallbacks {
//! fn ds_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {CallbackRet::Relay(_in_data)} //! fn ds_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {
//! fn ds_nb_callback(&self, _in_data: Vec<u8>){} //! CallbackRet::Relay(_in_data)
//! fn us_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {CallbackRet::Relay(_in_data)} //! }
//! fn us_nb_callback(&self, _in_data: Vec<u8>){} //! fn ds_nb_callback(&self, _in_data: Vec<u8>) {}
//!} //! fn us_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {
//!``` //! CallbackRet::Relay(_in_data)
//! }
//! fn us_nb_callback(&self, _in_data: Vec<u8>) {}
//! }
//! ```
//! The blocking callbacks return an enum called CallbackRet with four different variants. //! The blocking callbacks return an enum called CallbackRet with four different variants.
//! The variants control the flow of the tcp stream. //! The variants control the flow of the tcp stream.
//!``` //!```
//! pub enum CallbackRet { //! pub enum CallbackRet {
//! Relay(Vec<u8>),// Relay data //! Relay(Vec<u8>), // Relay data
//! Spoof(Vec<u8>),// Skip relaying and send data back //! Spoof(Vec<u8>), // Skip relaying and send data back
//! Shutdown,// Shutdown TCP connection //! Shutdown, // Shutdown TCP connection
//! Freeze,// Dont send data (pretend as if stream never was recieved) //! Freeze, // Dont send data (pretend as if stream never was recieved)
//! } //! }
//! ``` //! ```
//! ## Example (basic.rs) //! ## Example (basic.rs)
//! ``` //! ```
//! use sslrelay::{self, RelayConfig, HandlerCallbacks, CallbackRet, TCPDataType, TLSConfig}; //! use sslrelay::{self, CallbackRet, HandlerCallbacks, RelayConfig, TCPDataType, TLSConfig};
//! //!
//! // Handler object //! // Handler object
//! #[derive(Clone)] // Must have Clone trait implemented. //! #[derive(Clone)] // Must have Clone trait implemented.
@ -34,7 +38,6 @@
//! into data upstream or downstream. //! into data upstream or downstream.
//! */ //! */
//! impl HandlerCallbacks for Handler { //! impl HandlerCallbacks for Handler {
//!
//! // DownStream non blocking callback (Read Only) //! // DownStream non blocking callback (Read Only)
//! fn ds_nb_callback(&self, _in_data: Vec<u8>) { //! fn ds_nb_callback(&self, _in_data: Vec<u8>) {
//! println!("[CALLBACK] Down Stream Non Blocking CallBack!"); //! println!("[CALLBACK] Down Stream Non Blocking CallBack!");
@ -59,7 +62,6 @@
//! } //! }
//! //!
//! fn main() { //! fn main() {
//!
//! // Create new SSLRelay object //! // Create new SSLRelay object
//! let mut relay = sslrelay::SSLRelay::new( //! let mut relay = sslrelay::SSLRelay::new(
//! Handler, //! Handler,
@ -68,13 +70,13 @@
//! upstream_data_type: TCPDataType::TLS, //! upstream_data_type: TCPDataType::TLS,
//! bind_host: "0.0.0.0".to_string(), //! bind_host: "0.0.0.0".to_string(),
//! bind_port: "443".to_string(), //! bind_port: "443".to_string(),
//! remote_host: "remote.com".to_string(), //! remote_host: |_| "remote.com",
//! remote_port: "443".to_string(), //! remote_port: "443".to_string(),
//! tls_config: TLSConfig::FILE{ //! tls_config: TLSConfig::FILE {
//! certificate_path: "./tls.crt".to_string(), //! certificate_path: "./tls.crt".to_string(),
//! private_key_path: "./tls.key".to_string(), //! private_key_path: "./tls.key".to_string(),
//! }, //! },
//! } //! },
//! ); //! );
//! //!
//! // Start listening //! // Start listening
@ -83,53 +85,26 @@
//! ``` //! ```
use openssl::{ use openssl::{
x509::X509,
pkey::PKey, pkey::PKey,
ssl::{ ssl::{SslAcceptor, SslConnector, SslFiletype, SslMethod, SslStream, SslVerifyMode},
SslVerifyMode, x509::X509,
SslConnector,
SslAcceptor,
SslStream,
SslFiletype,
SslMethod,
}
}; };
use std::net::{ use std::net::{Shutdown, TcpListener, TcpStream};
TcpListener,
TcpStream,
Shutdown
};
use std::sync::{ use std::sync::{Arc, Mutex};
Arc,
Mutex
};
use std::{ use std::thread;
thread
};
use std::{ use std::{path::Path, time::Duration};
path::Path,
time::Duration,
};
use std::io::{ use std::io::{self, Read, Write};
self,
Read,
Write
};
use std::sync::mpsc::{ use std::sync::mpsc::{self, Receiver, Sender};
self,
Receiver,
Sender
};
mod data; mod data;
mod tcp;
mod relay; mod relay;
mod tcp;
#[derive(Debug)] #[derive(Debug)]
enum FullDuplexTcpState { enum FullDuplexTcpState {
@ -163,8 +138,14 @@ pub enum TCPDataType {
/// NONE is for when you are not using TLS on the listening/downstream side of the relay. /// NONE is for when you are not using TLS on the listening/downstream side of the relay.
#[derive(Clone)] #[derive(Clone)]
pub enum TLSConfig { pub enum TLSConfig {
FILE {certificate_path: String, private_key_path: String}, FILE {
DATA {certificate: Vec<u8>, private_key: Vec<u8>}, certificate_path: String,
private_key_path: String,
},
DATA {
certificate: Vec<u8>,
private_key: Vec<u8>,
},
NONE, NONE,
} }
@ -175,7 +156,8 @@ pub struct RelayConfig {
pub upstream_data_type: TCPDataType, pub upstream_data_type: TCPDataType,
pub bind_host: String, pub bind_host: String,
pub bind_port: String, pub bind_port: String,
pub remote_host: String, pub remote_host: fn(Option<&str>) -> String,
//pub remote_host: String,
pub remote_port: String, pub remote_port: String,
pub tls_config: TLSConfig, pub tls_config: TLSConfig,
} }
@ -183,18 +165,23 @@ pub struct RelayConfig {
/// CallbackRet for blocking callback functions /// CallbackRet for blocking callback functions
#[derive(Debug)] #[derive(Debug)]
pub enum CallbackRet { pub enum CallbackRet {
Relay(Vec<u8>),// Relay data Relay(Vec<u8>), // Relay data
Spoof(Vec<u8>),// Skip relaying and send data back Spoof(Vec<u8>), // Skip relaying and send data back
Shutdown,// Shutdown TCP connection Shutdown, // Shutdown TCP connection
Freeze,// Dont send data (pretend as if stream never was recieved) Freeze, // Dont send data (pretend as if stream never was recieved)
} }
/// Callback functions a user may or may not implement. /// Callback functions a user may or may not implement.
pub trait HandlerCallbacks { pub trait HandlerCallbacks {
fn ds_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {CallbackRet::Relay(_in_data)} fn ds_b_callback(&mut self, _in_data: Vec<u8>, _conn_id: u64) -> CallbackRet {
fn ds_nb_callback(&self, _in_data: Vec<u8>){} CallbackRet::Relay(_in_data)
fn us_b_callback(&mut self, _in_data: Vec<u8>) -> CallbackRet {CallbackRet::Relay(_in_data)} }
fn us_nb_callback(&self, _in_data: Vec<u8>){} fn ds_nb_callback(&self, _in_data: Vec<u8>, _conn_id: u64) {}
fn us_b_callback(&mut self, _in_data: Vec<u8>, _conn_id: u64) -> CallbackRet {
CallbackRet::Relay(_in_data)
}
fn us_nb_callback(&self, _in_data: Vec<u8>, _conn_id: u64) {}
fn set_server_name(&mut self, _server_name: Option<&str>) {}
} }
/// The main SSLRelay object. /// The main SSLRelay object.
@ -217,6 +204,7 @@ where
ds_inner_m: Arc<Mutex<Option<DownStreamInner>>>, ds_inner_m: Arc<Mutex<Option<DownStreamInner>>>,
us_inner_m: Arc<Mutex<Option<UpStreamInner>>>, us_inner_m: Arc<Mutex<Option<UpStreamInner>>>,
inner_handlers: InnerHandlers<H>, inner_handlers: InnerHandlers<H>,
conn_id: u64,
} }
#[derive(Clone)] #[derive(Clone)]
@ -224,17 +212,15 @@ struct InnerHandlers<H>
where where
H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static, H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static,
{ {
cb: H cb: H,
} }
struct DownStreamInner struct DownStreamInner {
{
ds_stream: DataStreamType, ds_stream: DataStreamType,
internal_data_buffer: Vec<u8>, internal_data_buffer: Vec<u8>,
} }
struct UpStreamInner struct UpStreamInner {
{
us_stream: DataStreamType, us_stream: DataStreamType,
internal_data_buffer: Vec<u8> internal_data_buffer: Vec<u8>,
} }

View file

@ -1,137 +1,221 @@
//! SSLRelay //! SSLRelay
use crate::{ use crate::{
SSLRelay, thread, Arc, DataStreamType, FullDuplexTcp, HandlerCallbacks, InnerHandlers, PKey, Path,
HandlerCallbacks, RelayConfig, SSLRelay, SslAcceptor, SslFiletype, SslMethod, TCPDataType, TLSConfig,
InnerHandlers, TcpListener, X509,
TCPDataType,
TcpListener,
thread,
FullDuplexTcp,
DataStreamType,
RelayConfig,
Arc,
SslAcceptor,
Path,
SslMethod,
SslFiletype,
TLSConfig,
PKey,
X509,
}; };
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> SSLRelay<H> { impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> SSLRelay<H> {
/// Creates new SSLRelay instance. /// Creates new SSLRelay instance.
pub fn new(handlers: H, config: RelayConfig) -> Self { pub fn new(handlers: H, config: RelayConfig) -> Self {
SSLRelay { SSLRelay {
config, config,
handlers: Some(InnerHandlers{cb: handlers}), handlers: Some(InnerHandlers { cb: handlers }),
} }
} }
/// Starts the SSLRelay connection handling. /// Starts the SSLRelay connection handling.
pub fn start(&mut self) { pub fn start(&mut self) {
let rhost = self.config.remote_host.clone(); let rhost = self.config.remote_host.clone();
let rport = self.config.remote_port.clone(); let rport = self.config.remote_port.clone();
let upstream_data_stream_type = self.config.upstream_data_type; let upstream_data_stream_type = self.config.upstream_data_type;
let listener = TcpListener::bind(format!("{}:{}", self.config.bind_host.clone(), self.config.bind_port.clone())).unwrap(); let listener = TcpListener::bind(format!(
"{}:{}",
self.config.bind_host.clone(),
self.config.bind_port.clone()
))
.unwrap();
let mut conn_id = 0;
match self.config.downstream_data_type { match self.config.downstream_data_type {
TCPDataType::TLS => { TCPDataType::TLS => {
let acceptor = self.setup_ssl_config(&self.config.tls_config);
let acceptor = self.setup_ssl_config(self.config.tls_config.clone());
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Ok(stream) => { Ok(stream) => {
let acceptor = acceptor.clone(); let acceptor = acceptor.clone();
let handler_clone = self.handlers.as_ref().unwrap().clone(); let mut handler_clone = self.handlers.as_ref().unwrap().clone();
let r_host = rhost.clone(); let r_host = rhost.clone();
let r_port = rport.clone(); let r_port = rport.clone();
let this_conn_id = conn_id;
thread::spawn(move || { thread::spawn(move || {
match acceptor.accept(stream) { match acceptor.accept(stream) {
Ok(stream) => { Ok(stream) => {
// FULL DUPLEX OBJECT CREATION HERE let server_name = stream
match FullDuplexTcp::new(DataStreamType::TLS(stream), upstream_data_stream_type, r_host, r_port, handler_clone) { .ssl()
Ok(mut fdtcp) => fdtcp.handle(), .servername(openssl::ssl::NameType::HOST_NAME);
Err(_ec) => {println!("[SSLRelay Error] Failed to handle TCP(TLS) connection: {}", _ec)} handler_clone.cb.set_server_name(server_name);
} let remote_host = (r_host)(server_name);
},
Err(e) => {
// FULL DUPLEX OBJECT CREATION HERE
match FullDuplexTcp::new(
DataStreamType::TLS(stream),
upstream_data_stream_type,
remote_host,
r_port,
handler_clone,
this_conn_id,
) {
Ok(mut fdtcp) => fdtcp.handle(),
Err(_ec) => {
println!(
"[SSLRelay Error] Failed to handle TCP(TLS) \
connection: {}",
_ec
)
}
}
}
Err(e) => {
println!("[Error] {}", e); println!("[Error] {}", e);
} }
} }
}); });
}, conn_id += 1;
Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)} }
Err(e) => {
println!("[Error] Tcp Connection Failed: {}", e)
}
}
} }
} }
},
TCPDataType::RAW => { TCPDataType::RAW => {
for stream in listener.incoming() { for stream in listener.incoming() {
match stream { match stream {
Ok(stream) => { Ok(stream) => {
let handler_clone = self.handlers.as_ref().unwrap().clone(); let handler_clone = self.handlers.as_ref().unwrap().clone();
let r_host = rhost.clone(); let r_host = rhost.clone();
let r_port = rport.clone(); let r_port = rport.clone();
let this_conn_id = conn_id;
thread::spawn(move || { thread::spawn(move || {
// FULL DUPLEX OBJECT CREATION HERE // FULL DUPLEX OBJECT CREATION HERE
match FullDuplexTcp::new(DataStreamType::RAW(stream), upstream_data_stream_type, r_host, r_port, handler_clone) { match FullDuplexTcp::new(
DataStreamType::RAW(stream),
upstream_data_stream_type,
(r_host)(None),
r_port,
handler_clone,
this_conn_id,
) {
Ok(mut fdtcp) => fdtcp.handle(), Ok(mut fdtcp) => fdtcp.handle(),
Err(_ec) => println!("[SSLRelay Error] Failed to handle TCP connection: {}", _ec), Err(_ec) => println!(
"[SSLRelay Error] Failed to handle TCP connection: {}",
_ec
),
} }
}); });
}, conn_id += 1;
Err(e) => {println!("[Error] Tcp Connection Failed: {}", e)} }
Err(e) => {
println!("[Error] Tcp Connection Failed: {}", e)
}
} }
} }
} }
} }
} }
fn setup_ssl_config(&self, tls_config: TLSConfig) -> Arc<SslAcceptor> { fn setup_ssl_config(&self, tls_config: &TLSConfig) -> Arc<SslAcceptor> {
let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap(); let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
match tls_config { match tls_config {
TLSConfig::FILE{certificate_path, private_key_path} => { TLSConfig::FILE {
certificate_path,
private_key_path,
} => {
if !Path::new(&private_key_path).exists() { if !Path::new(&private_key_path).exists() {
panic!("[-] [{}] does not exist!", private_key_path); panic!("[-] [{}] does not exist!", private_key_path);
} }
if !Path::new(&certificate_path).exists() { if !Path::new(&certificate_path).exists() {
panic!("[-] [{}] does not exist!", certificate_path); panic!("[-] [{}] does not exist!", certificate_path);
} }
acceptor.set_private_key_file(private_key_path, SslFiletype::PEM).unwrap(); acceptor
acceptor.set_certificate_chain_file(certificate_path).unwrap(); .set_private_key_file(private_key_path, SslFiletype::PEM)
.unwrap();
acceptor
.set_certificate_chain_file(certificate_path)
.unwrap();
acceptor.check_private_key().unwrap(); acceptor.check_private_key().unwrap();
}, }
TLSConfig::DATA{certificate, private_key} => { TLSConfig::DATA {
certificate,
private_key,
} => {
let x_509_certificate = X509::from_pem(certificate.as_slice()).unwrap(); let x_509_certificate = X509::from_pem(certificate.as_slice()).unwrap();
let private_key = PKey::private_key_from_pem(private_key.as_slice()).unwrap(); let private_key = PKey::private_key_from_pem(private_key.as_slice()).unwrap();
acceptor.set_certificate(x_509_certificate.as_ref()).unwrap(); acceptor
.set_certificate(x_509_certificate.as_ref())
.unwrap();
acceptor.set_private_key(private_key.as_ref()).unwrap(); acceptor.set_private_key(private_key.as_ref()).unwrap();
}, }
TLSConfig::NONE => { TLSConfig::NONE => {
panic!("[SSLRelay Error] Specified NONE for TLSConfig and downstream_data_type as TLS."); panic!(
"[SSLRelay Error] Specified NONE for TLSConfig and downstream_data_type as \
TLS."
);
} }
} }
Arc::new(acceptor.build()) Arc::new(acceptor.build())
} }
}// SSLRelay
// fn setup_ssl_config(&self, tls_configs: &[(&[&str], TLSConfig)]) -> AcceptorDb {
// let mut acceptor_db = AcceptorDb {
// acceptors: HashMap::new(),
// };
// for (names, tls_config) in tls_configs {
// let mut acceptor = SslAcceptor::mozilla_intermediate(SslMethod::tls()).unwrap();
// match tls_config {
// TLSConfig::FILE {
// certificate_path,
// private_key_path,
// } => {
// if !Path::new(&private_key_path).exists() {
// panic!("[-] [{}] does not exist!", private_key_path);
// }
// if !Path::new(&certificate_path).exists() {
// panic!("[-] [{}] does not exist!", certificate_path);
// }
// acceptor
// .set_private_key_file(private_key_path, SslFiletype::PEM)
// .unwrap();
// acceptor
// .set_certificate_chain_file(certificate_path)
// .unwrap();
// acceptor.check_private_key().unwrap();
// }
// TLSConfig::DATA {
// certificate,
// private_key,
// } => {
// let x_509_certificate = X509::from_pem(certificate.as_slice()).unwrap();
// let private_key = PKey::private_key_from_pem(private_key.as_slice()).unwrap();
// acceptor
// .set_certificate(x_509_certificate.as_ref())
// .unwrap();
// acceptor.set_private_key(private_key.as_ref()).unwrap();
// }
// TLSConfig::NONE => {
// panic!(
// "[SSLRelay Error] Specified NONE for TLSConfig and downstream_data_type as \
// TLS."
// );
// }
// }
// let acceptor = Arc::new(acceptor.build());
// for name in *names {
// acceptor_db.acceptors.insert(name.to_string(), acceptor.clone());
// }
// }
// acceptor_db
// }
} // SSLRelay
// struct AcceptorDb {
// acceptors: HashMap<String, Arc<SslAcceptor>>,
// }

View file

@ -1,63 +1,76 @@
use crate::{ use crate::{
FullDuplexTcp, mpsc, thread, Arc, CallbackRet, DataPipe, DataStreamType, DownStreamInner, Duration,
HandlerCallbacks, FullDuplexTcp, FullDuplexTcpState, HandlerCallbacks, InnerHandlers, Mutex, Receiver, Sender,
DataStreamType, Shutdown, SslConnector, SslMethod, SslVerifyMode, TCPDataType, TcpStream, UpStreamInner,
TCPDataType,
Duration,
Arc,
Mutex,
DownStreamInner,
UpStreamInner,
InnerHandlers,
Shutdown,
Sender,
Receiver,
FullDuplexTcpState,
DataPipe,
mpsc,
thread,
CallbackRet,
TcpStream,
SslVerifyMode,
SslConnector,
SslMethod,
}; };
impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static> FullDuplexTcp<H> { impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'static>
FullDuplexTcp<H>
pub fn new(ds_tcp_stream: DataStreamType, us_tcp_stream_type: TCPDataType, remote_host: String, remote_port: String, handlers: InnerHandlers<H>) -> Result<Self, i8> { {
pub fn new(
ds_tcp_stream: DataStreamType,
us_tcp_stream_type: TCPDataType,
remote_host: String,
remote_port: String,
handlers: InnerHandlers<H>,
conn_id: u64,
) -> Result<Self, i8> {
match ds_tcp_stream { match ds_tcp_stream {
DataStreamType::RAW(ref s) => { let _ = s.set_read_timeout(Some(Duration::from_millis(50))); }, DataStreamType::RAW(ref s) => {
DataStreamType::TLS(ref s) => { let _ = s.get_ref().set_read_timeout(Some(Duration::from_millis(50))); }, let _ = s.set_read_timeout(Some(Duration::from_millis(50)));
}
DataStreamType::TLS(ref s) => {
let _ = s
.get_ref()
.set_read_timeout(Some(Duration::from_millis(50)));
}
} }
let us_tcp_stream = match Self::connect_endpoint(us_tcp_stream_type, remote_host.clone(), remote_port.clone()) { let us_tcp_stream = match Self::connect_endpoint(
us_tcp_stream_type,
remote_host.clone(),
remote_port.clone(),
) {
Ok(s) => s, Ok(s) => s,
Err(ec) => { Err(ec) => {
match ds_tcp_stream { match ds_tcp_stream {
DataStreamType::RAW(s) => { let _ = s.shutdown(Shutdown::Both); }, DataStreamType::RAW(s) => {
DataStreamType::TLS(mut s) => { let _ = s.shutdown(); }, let _ = s.shutdown(Shutdown::Both);
}
DataStreamType::TLS(mut s) => {
let _ = s.shutdown();
}
} }
return Err(ec); return Err(ec);
} }
}; };
Ok( Ok(FullDuplexTcp {
FullDuplexTcp {
remote_host, remote_host,
remote_port, remote_port,
ds_inner_m: Arc::new(Mutex::new(Some(DownStreamInner{ds_stream: ds_tcp_stream, internal_data_buffer: Vec::<u8>::new()}))), ds_inner_m: Arc::new(Mutex::new(Some(DownStreamInner {
us_inner_m: Arc::new(Mutex::new(Some(UpStreamInner{us_stream: us_tcp_stream, internal_data_buffer: Vec::<u8>::new()}))), ds_stream: ds_tcp_stream,
internal_data_buffer: Vec::<u8>::new(),
}))),
us_inner_m: Arc::new(Mutex::new(Some(UpStreamInner {
us_stream: us_tcp_stream,
internal_data_buffer: Vec::<u8>::new(),
}))),
inner_handlers: handlers, inner_handlers: handlers,
conn_id,
}) })
} }
pub fn handle(&mut self) { pub fn handle(&mut self) {
let conn_id = self.conn_id;
let (state_sender, state_receiver): (Sender<FullDuplexTcpState>, Receiver<FullDuplexTcpState>) = mpsc::channel(); let (state_sender, state_receiver): (
let (ds_data_pipe_sender, ds_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) = mpsc::channel(); Sender<FullDuplexTcpState>,
let (us_data_pipe_sender, us_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) = mpsc::channel(); Receiver<FullDuplexTcpState>,
) = mpsc::channel();
let (ds_data_pipe_sender, ds_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) =
mpsc::channel();
let (us_data_pipe_sender, us_data_pipe_receiver): (Sender<DataPipe>, Receiver<DataPipe>) =
mpsc::channel();
let ds_method_pointer = self.ds_inner_m.clone(); let ds_method_pointer = self.ds_inner_m.clone();
let ds_state_bc = state_sender.clone(); let ds_state_bc = state_sender.clone();
@ -66,22 +79,29 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
let us_state_bc = state_sender.clone(); let us_state_bc = state_sender.clone();
thread::spawn(move || { thread::spawn(move || {
ds_method_pointer.lock().unwrap().take().unwrap().ds_handler(ds_state_bc, ds_data_pipe_receiver); ds_method_pointer
.lock()
.unwrap()
.take()
.unwrap()
.ds_handler(ds_state_bc, ds_data_pipe_receiver);
}); });
thread::spawn(move || { thread::spawn(move || {
us_method_pointer.lock().unwrap().take().unwrap().us_handler(us_state_bc, us_data_pipe_receiver); us_method_pointer
.lock()
.unwrap()
.take()
.unwrap()
.us_handler(us_state_bc, us_data_pipe_receiver);
}); });
loop { loop {
match state_receiver.recv() { match state_receiver.recv() {
Ok(state_request) => { Ok(state_request) => {
match state_request { match state_request {
// DownStream Write Request // DownStream Write Request
FullDuplexTcpState::DownStreamWrite(data) => { FullDuplexTcpState::DownStreamWrite(data) => {
/* /*
Callbacks that work with data from UpStream go here Callbacks that work with data from UpStream go here
Add callback return types for blocking callback subroutines Add callback return types for blocking callback subroutines
@ -95,43 +115,70 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
let in_data = data.clone(); let in_data = data.clone();
thread::spawn(move || { thread::spawn(move || {
inner_handlers_clone.cb.us_nb_callback(in_data); inner_handlers_clone.cb.us_nb_callback(in_data, conn_id);
}); });
match self.inner_handlers.cb.us_b_callback(data) { match self.inner_handlers.cb.us_b_callback(data, conn_id) {
CallbackRet::Relay(retdata) => { CallbackRet::Relay(retdata) => {
match ds_data_pipe_sender.send(DataPipe::DataWrite(retdata)) { match ds_data_pipe_sender.send(DataPipe::DataWrite(retdata)) {
Ok(()) => {}, Ok(()) => {}
Err(e) => { Err(e) => {
Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send data write to DownStream \
thread: {}",
e
)
.as_str(),
);
return; return;
} }
} }
}, }
CallbackRet::Spoof(retdata) => { CallbackRet::Spoof(retdata) => {
match us_data_pipe_sender.send(DataPipe::DataWrite(retdata)) { match us_data_pipe_sender.send(DataPipe::DataWrite(retdata)) {
Ok(()) => {}, Ok(()) => {}
Err(e) => { Err(e) => {
Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send data write to DownStream \
thread: {}",
e
)
.as_str(),
);
return; return;
} }
} }
}, }
CallbackRet::Freeze => {}, CallbackRet::Freeze => {}
CallbackRet::Shutdown => { CallbackRet::Shutdown => {
if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to UpStream \
thread: {}",
e
)
.as_str(),
);
} }
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to DownStream \
thread: {}",
e
)
.as_str(),
);
} }
return; return;
} }
} }
}, }
// UpStream Write Request // UpStream Write Request
FullDuplexTcpState::UpStreamWrite(data) => { FullDuplexTcpState::UpStreamWrite(data) => {
/* /*
Callbacks that work with data from DownStream go here Callbacks that work with data from DownStream go here
*/ */
@ -140,92 +187,143 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
let in_data = data.clone(); let in_data = data.clone();
thread::spawn(move || { thread::spawn(move || {
inner_handlers_clone.cb.ds_nb_callback(in_data); inner_handlers_clone.cb.ds_nb_callback(in_data, conn_id);
}); });
match self.inner_handlers.cb.ds_b_callback(data) { match self.inner_handlers.cb.ds_b_callback(data, conn_id) {
CallbackRet::Relay(retdata) => { CallbackRet::Relay(retdata) => {
match us_data_pipe_sender.send(DataPipe::DataWrite(retdata)) { match us_data_pipe_sender.send(DataPipe::DataWrite(retdata)) {
Ok(()) => {}, Ok(()) => {}
Err(e) => { Err(e) => {
Self::handle_error(format!("Failed to send data write to UpStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send data write to UpStream \
thread: {}",
e
)
.as_str(),
);
return; return;
} }
} }
}, }
CallbackRet::Spoof(retdata) => { CallbackRet::Spoof(retdata) => {
match ds_data_pipe_sender.send(DataPipe::DataWrite(retdata)) { match ds_data_pipe_sender.send(DataPipe::DataWrite(retdata)) {
Ok(()) => {}, Ok(()) => {}
Err(e) => { Err(e) => {
Self::handle_error(format!("Failed to send data write to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send data write to DownStream \
thread: {}",
e
)
.as_str(),
);
return; return;
} }
} }
}, }
CallbackRet::Freeze => {}, CallbackRet::Freeze => {}
CallbackRet::Shutdown => { CallbackRet::Shutdown => {
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to DownStream \
thread: {}",
e
)
.as_str(),
);
} }
if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to UpStream \
thread: {}",
e
)
.as_str(),
);
} }
return; return;
} }
} }
}, }
// DownStreamShutDown Request // DownStreamShutDown Request
FullDuplexTcpState::DownStreamShutDown => { FullDuplexTcpState::DownStreamShutDown => {
if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to UpStream thread: {}",
e
)
.as_str(),
);
return; return;
} }
return; return;
}, }
// UpStreamShutDown Request // UpStreamShutDown Request
FullDuplexTcpState::UpStreamShutDown => { FullDuplexTcpState::UpStreamShutDown => {
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); Self::handle_error(
format!(
"Failed to send Shutdown signal to DownStream thread: {}",
e
)
.as_str(),
);
return; return;
} }
return; return;
},
} }
}, }
}
Err(_e) => { Err(_e) => {
Self::handle_error("State receiver communication channel has closed!"); Self::handle_error("State receiver communication channel has closed!");
if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = ds_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to DownStream thread: {}", e).as_str()); Self::handle_error(
format!("Failed to send Shutdown signal to DownStream thread: {}", e)
.as_str(),
);
} }
if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) { if let Err(e) = us_data_pipe_sender.send(DataPipe::Shutdown) {
Self::handle_error(format!("Failed to send Shutdown signal to UpStream thread: {}", e).as_str()); Self::handle_error(
format!("Failed to send Shutdown signal to UpStream thread: {}", e)
.as_str(),
);
} }
return; return;
} }
}// State Receiver } // State Receiver
} }
} }
fn connect_endpoint(stream_data_type: TCPDataType, remote_host: String, remote_port: String) -> Result<DataStreamType, i8> { fn connect_endpoint(
stream_data_type: TCPDataType,
remote_host: String,
remote_port: String,
) -> Result<DataStreamType, i8> {
match stream_data_type { match stream_data_type {
TCPDataType::RAW => { TCPDataType::RAW => {
let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) { let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
Self::handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", remote_host, remote_port), e).as_str()); Self::handle_error(
format!(
"Can't connect to remote host: {}\nErr: {}",
format!("{}:{}", remote_host, remote_port),
e
)
.as_str(),
);
return Result::Err(-1); return Result::Err(-1);
} }
}; };
let _ = s.set_read_timeout(Some(Duration::from_millis(50))); let _ = s.set_read_timeout(Some(Duration::from_millis(50)));
return Ok(DataStreamType::RAW(s)); return Ok(DataStreamType::RAW(s));
}
},
TCPDataType::TLS => { TCPDataType::TLS => {
let mut sslbuilder = SslConnector::builder(SslMethod::tls()).unwrap(); let mut sslbuilder = SslConnector::builder(SslMethod::tls()).unwrap();
sslbuilder.set_verify(SslVerifyMode::NONE); sslbuilder.set_verify(SslVerifyMode::NONE);
@ -234,7 +332,14 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) { let s = match TcpStream::connect(format!("{}:{}", remote_host, remote_port)) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
Self::handle_error(format!("Can't connect to remote host: {}\nErr: {}", format!("{}:{}", remote_host, remote_port), e).as_str()); Self::handle_error(
format!(
"Can't connect to remote host: {}\nErr: {}",
format!("{}:{}", remote_host, remote_port),
e
)
.as_str(),
);
return Result::Err(-1); return Result::Err(-1);
} }
}; };
@ -242,12 +347,16 @@ impl<H: HandlerCallbacks + std::marker::Sync + std::marker::Send + Clone + 'stat
let s = match connector.connect(remote_host.as_str(), s) { let s = match connector.connect(remote_host.as_str(), s) {
Ok(s) => s, Ok(s) => s,
Err(e) => { Err(e) => {
Self::handle_error(format!("Failed to accept TLS/SSL handshake: {}", e).as_str()); Self::handle_error(
format!("Failed to accept TLS/SSL handshake: {}", e).as_str(),
);
return Result::Err(-2); return Result::Err(-2);
} }
}; };
let _ = s.get_ref().set_read_timeout(Some(Duration::from_millis(50))); let _ = s
.get_ref()
.set_read_timeout(Some(Duration::from_millis(50)));
return Ok(DataStreamType::TLS(s)); return Ok(DataStreamType::TLS(s));
} }
} }